Implementing Slowly Changing Dimension Type 2 (SCD2) for Real-Time Price Tracking in Delta Lake


Businesses often need to track historical changes in critical data while keeping the current state easy to query.

This blog post walks through implementing a Type 2 Slowly Changing Dimension (SCD2) table for tracking price changes in book sales, designed for a data lakehouse architecture using Delta Lake.

We’ll process data streams from the Bronze layer in real time and store the processed records in the Silver layer. The end result is a table that holds both historical and current prices for each book, which suits analytical use cases such as price trend analysis and financial calculations.

Problem Statement

In this scenario, we need to track historical price changes for books.

The goal is to:

  1. Maintain a historical record of all price modifications for each book.
  2. Ensure only one active record per book ID, representing the current price.
  3. Process updates in real time so that new price changes are captured immediately.

We’ll be implementing SCD Type 2 using Delta Lake’s MERGE command to handle these requirements efficiently.
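The first two goals can be phrased as invariants that any SCD2 table should satisfy. The plain-Python sketch below checks them on a list of dictionaries. It is only an illustration: the helper name check_scd2_invariants and the 49.99 price are made up for this example, and the current and end_date fields follow the books_silver columns defined in Step 0.

```python
from collections import Counter


def check_scd2_invariants(rows):
    """Return a list of violations; an empty list means the rows are consistent."""
    problems = []
    # Goal 2: exactly one active (current = true) row per book_id.
    active_counts = Counter(r["book_id"] for r in rows if r["current"])
    for book_id in sorted({r["book_id"] for r in rows}):
        if active_counts[book_id] != 1:
            problems.append(f"{book_id}: {active_counts[book_id]} active rows")
    # Active rows are open-ended; historical rows must carry an end_date.
    for r in rows:
        if r["current"] and r["end_date"] is not None:
            problems.append(f"{r['book_id']}: active row has an end_date")
        if not r["current"] and r["end_date"] is None:
            problems.append(f"{r['book_id']}: historical row has no end_date")
    return problems


# Hypothetical history: B01 changed price once, B02 never changed.
history = [
    {"book_id": "B01", "price": 45.99, "current": False, "end_date": "2023-06-01"},
    {"book_id": "B01", "price": 49.99, "current": True, "end_date": None},
    {"book_id": "B02", "price": 59.99, "current": True, "end_date": None},
]
print(check_scd2_invariants(history))
```

A check like this could run after each merge as a sanity test on a sample of the table.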

Data Flow Overview

  1. Bronze Layer: Ingests new or updated book data from an external source in real time.
  2. Silver Layer (books_silver table): Stores the SCD2-compliant records, tracking all historical price changes.
  3. current_books Table: A simplified table in the Silver layer, showing only the latest price record for each book.

Step-by-Step Guide to Implementing SCD Type 2

Step 0: Create & populate the books_silver Table

The books_silver table is the primary Silver layer table for storing book price changes. Each new update will result in a new row for that book, with the previous row marked as inactive.

CREATE TABLE IF NOT EXISTS books_silver (
    book_id STRING,            -- Unique identifier for each book
    title STRING,              -- Title of the book
    author STRING,             -- Author of the book
    price DOUBLE,              -- Price of the book
    current BOOLEAN,           -- Indicates if this record is the latest (current = true) or historical (current = false)
    effective_date TIMESTAMP,  -- Date and time the record became effective
    end_date TIMESTAMP         -- Date and time the record was superseded (NULL if current = true)
);

-- Insert initial records for books in books_silver table
INSERT INTO books_silver (book_id, title, author, price, current, effective_date, end_date)
VALUES
    ('B01', 'Data Engineering 101', 'John Doe', 45.99, true, '2023-01-01', NULL),
    ('B02', 'Machine Learning Guide', 'Jane Smith', 59.99, true, '2023-01-01', NULL),
    ('B03', 'Introduction to Data Science', 'Alice Brown', 39.99, true, '2023-01-01', NULL),
    ('B04', 'Advanced Big Data Analytics', 'Bob White', 79.99, true, '2023-01-01', NULL),
    ('B05', 'SQL for Data Analysis', 'Mary Green', 29.99, true, '2023-01-01', NULL);

book_id  title                         author       price  current  effective_date  end_date
B01      Data Engineering 101          John Doe     45.99  true     2023-01-01      NULL
B02      Machine Learning Guide        Jane Smith   59.99  true     2023-01-01      NULL
B03      Introduction to Data Science  Alice Brown  39.99  true     2023-01-01      NULL
B04      Advanced Big Data Analytics   Bob White    79.99  true     2023-01-01      NULL
B05      SQL for Data Analysis         Mary Green   29.99  true     2023-01-01      NULL

Explanation of Columns:

  • book_id, title, and author store book details.
  • price holds the current or historical price of the book.
  • current is a boolean flag (true if active, false if historical).
  • effective_date and end_date track the time range when each record was valid.
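To show how effective_date and end_date work together, here is a small plain-Python sketch of a point-in-time lookup. It treats each version as valid from effective_date (inclusive) up to end_date (exclusive), which matches a design where the old row's end_date equals the new row's effective_date. The function name price_as_of and the 49.99 price change are assumptions made for the example.

```python
from datetime import datetime


def price_as_of(rows, book_id, ts):
    """Return the price valid at `ts`, or None if no version covers it."""
    for r in rows:
        if r["book_id"] != book_id:
            continue
        started = r["effective_date"] <= ts
        # end_date is None for the active row, meaning "still valid".
        not_ended = r["end_date"] is None or ts < r["end_date"]
        if started and not_ended:
            return r["price"]
    return None


# Hypothetical history for B01 with one price change on 2023-06-01.
rows = [
    {"book_id": "B01", "price": 45.99,
     "effective_date": datetime(2023, 1, 1), "end_date": datetime(2023, 6, 1)},
    {"book_id": "B01", "price": 49.99,
     "effective_date": datetime(2023, 6, 1), "end_date": None},
]
print(price_as_of(rows, "B01", datetime(2023, 3, 15)))  # 45.99
print(price_as_of(rows, "B01", datetime(2023, 6, 1)))   # 49.99
```

The same predicate translates directly to a SQL WHERE clause on books_silver.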

Step 1: Define the Schema for Incoming Data

We define a schema to parse the incoming data from the Bronze layer. This data will come in JSON format and includes the latest book information from external sources.

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define the schema for Bronze layer stream data
book_schema = StructType([
    StructField("book_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("author", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("updated", TimestampType(), True)
])
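To make the schema concrete, the sketch below decodes one sample payload with the Python standard library only. The payload contents and the timestamp layout are assumptions; the post does not show what the external source actually sends. Fields that are absent from the JSON come out as None, which is similar in spirit to how a nullable schema field ends up null.

```python
import json
from datetime import datetime

FIELDS = ("book_id", "title", "author", "price", "updated")


def parse_book(value: bytes) -> dict:
    """Decode one Bronze `value` into the five fields of book_schema."""
    raw = json.loads(value.decode("utf-8"))
    record = {name: raw.get(name) for name in FIELDS}  # absent fields become None
    if record["price"] is not None:
        record["price"] = float(record["price"])
    if record["updated"] is not None:
        # Assumed timestamp layout; adjust it to the real feed.
        record["updated"] = datetime.strptime(record["updated"], "%Y-%m-%d %H:%M:%S")
    return record


# Hypothetical payload for a price change on book B01.
sample = (b'{"book_id": "B01", "title": "Data Engineering 101", '
          b'"author": "John Doe", "price": 49.99, "updated": "2023-06-01 10:00:00"}')
print(parse_book(sample))
```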

Step 2: Define the SCD Type 2 Upsert Logic

The following upsert function, type2_upsert, applies SCD2 logic by using Delta Lake’s MERGE command. This function:

  1. Updates the current record by setting current = false and populating end_date.
  2. Inserts a new record with current = true and no end_date, marking it as the active version.

def type2_upsert(microBatchDF, batchId):
    # Create a temporary view for the micro-batch
    microBatchDF.createOrReplaceTempView("updates")
    
    # SQL MERGE query to apply SCD2 logic
    merge_query = """
        MERGE INTO books_silver AS target
        USING (
            SELECT updates.book_id as merge_key, updates.*
            FROM updates
            UNION ALL
            SELECT NULL as merge_key, updates.*
            FROM updates
            JOIN books_silver ON updates.book_id = books_silver.book_id
            WHERE books_silver.current = true 
                  AND updates.price <> books_silver.price
        ) staged_updates
        ON target.book_id = staged_updates.merge_key
        
        -- Update the existing record as inactive
        WHEN MATCHED AND target.current = true 
             AND target.price <> staged_updates.price THEN
          UPDATE SET current = false, end_date = staged_updates.updated
        
        -- Insert new record as the current version with updated price
        WHEN NOT MATCHED THEN
          INSERT (book_id, title, author, price, current, effective_date, end_date)
          VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, 
                  staged_updates.price, true, staged_updates.updated, NULL)
    """
    
    # Execute the merge to update SCD2 records
    microBatchDF.sparkSession.sql(merge_query)
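The merge_key trick can be hard to follow in SQL, so here is a rough plain-Python simulation of the same idea on lists of dictionaries. It is a sketch, not Delta Lake's implementation: the helper names stage_updates and apply_merge are invented, title and author are omitted for brevity, the sample prices and book B06 are hypothetical, and it assumes at most one update per book_id in a batch.

```python
def stage_updates(target, updates):
    """Mimic the USING subquery of the MERGE."""
    # First half of the UNION ALL: every update, keyed by its book_id.
    staged = [{"merge_key": u["book_id"], **u} for u in updates]
    # Second half: a NULL-keyed copy of each update whose price differs
    # from the active row, so that it can never match and gets inserted.
    for u in updates:
        for t in target:
            if t["book_id"] == u["book_id"] and t["current"] and t["price"] != u["price"]:
                staged.append({"merge_key": None, **u})
    return staged


def apply_merge(target, staged):
    """Mimic the WHEN MATCHED / WHEN NOT MATCHED clauses on a list of dicts."""
    existing = list(target)  # only rows present before the merge can match
    for s in staged:
        matched = [t for t in existing if t["book_id"] == s["merge_key"]]
        for t in matched:
            # WHEN MATCHED: close the active row if the price changed.
            if t["current"] and t["price"] != s["price"]:
                t["current"] = False
                t["end_date"] = s["updated"]
        if not matched:
            # WHEN NOT MATCHED: a new book, or the NULL-keyed copy of a change.
            target.append({"book_id": s["book_id"], "price": s["price"], "current": True,
                           "effective_date": s["updated"], "end_date": None})
    return target


books = [
    {"book_id": "B01", "price": 45.99, "current": True, "effective_date": "2023-01-01", "end_date": None},
    {"book_id": "B02", "price": 59.99, "current": True, "effective_date": "2023-01-01", "end_date": None},
]
batch = [
    {"book_id": "B01", "price": 49.99, "updated": "2023-06-01"},  # price change
    {"book_id": "B02", "price": 59.99, "updated": "2023-06-01"},  # unchanged price
    {"book_id": "B06", "price": 19.99, "updated": "2023-06-01"},  # new book
]
apply_merge(books, stage_updates(books, batch))
for row in books:
    print(row)
```

The keyed copy of the B01 update closes the old row, while its NULL-keyed copy falls through to the insert branch and becomes the new active row. Without the second half of the UNION ALL, a price change would close the old row but never create its replacement.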

Step 3: Read Stream from Bronze Layer and Apply Upsert Logic

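We set up a streaming query to read from the Bronze layer, apply the type2_upsert function on each micro-batch, and write the results to the books_silver table. With the availableNow trigger used here, the query processes everything currently available in Bronze and then stops, so it needs to be rerun (or switched to a continuously running trigger) to pick up later updates.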

def process_books():
    # Read streaming data from the Bronze layer
    query = (
        spark.readStream
             .table("bronze")  # Source Bronze layer table
             .filter("topic = 'books'")  # Filter for book-related data
             .select(F.from_json(F.col("value").cast("string"), book_schema).alias("v"))  # Parse JSON
             .select("v.*")  # Extract fields
             .writeStream
             .foreachBatch(type2_upsert)  # Apply SCD Type 2 upsert function
             .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/books_silver")  # Set checkpoint
             .trigger(availableNow=True)  # Process all data available now, then stop
             .start()
    )
    
    query.awaitTermination()  # Block until the available data has been processed

# Run the pipeline once
process_books()
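One caveat worth keeping in mind: Delta Lake's MERGE raises an error when more than one source row tries to modify the same target row, which can happen if a single micro-batch carries several updates for the same book. A common workaround is to keep only the latest update per book before merging. The plain-Python sketch below shows that logic; the helper name latest_per_book and the sample batch are hypothetical, and in the streaming job the same idea would typically be expressed on the DataFrame before the temp view is created.

```python
def latest_per_book(updates):
    """Keep only the most recent update for each book_id."""
    latest = {}
    for u in updates:
        seen = latest.get(u["book_id"])
        # ISO-8601 strings sort chronologically, so plain comparison works here.
        if seen is None or u["updated"] > seen["updated"]:
            latest[u["book_id"]] = u
    return list(latest.values())


# Hypothetical micro-batch with two updates for B01.
batch = [
    {"book_id": "B01", "price": 47.99, "updated": "2023-06-01T09:00:00"},
    {"book_id": "B02", "price": 61.99, "updated": "2023-06-01T09:05:00"},
    {"book_id": "B01", "price": 49.99, "updated": "2023-06-01T09:30:00"},
]
deduped = latest_per_book(batch)
for row in deduped:
    print(row)
```

The trade-off is that intermediate prices within one batch never reach the history. If every change must be recorded, the batch would need to be split and merged in timestamp order instead.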

Step 4: Query the Updated Silver Table

Once the streaming query processes data, we can view the latest state of the books_silver table with:

books_df = spark.read.table("books_silver").orderBy("book_id", "effective_date")
display(books_df)

Step 5: Filter for Current Records in current_books

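To expose only the latest price for each book, we can copy the records where current = true into a current_books table. Because CREATE OR REPLACE TABLE ... AS SELECT takes a snapshot, rerun it after each merge to keep current_books in sync with books_silver.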

CREATE OR REPLACE TABLE current_books AS
SELECT book_id, title, author, price
FROM books_silver
WHERE current = true;

To verify, query the current_books table:

SELECT * FROM current_books ORDER BY book_id;

Conclusion

By implementing SCD2 in a Delta Lakehouse, this solution enables:

  • Historical Tracking: The books_silver table retains all historical price records.
  • Real-Time Updates: New price changes are picked up from the Bronze stream and merged incrementally, keeping the table up to date.
  • Efficient Current Data Access: The current_books table allows easy access to the latest data without querying historical records.

This setup is ideal for applications requiring price trend analysis, accurate financial calculations, and robust historical data tracking. With the SCD2 architecture, businesses can ensure both data accuracy and auditability, making it a powerful addition to any data lakehouse.
