Implementing Slowly Changing Dimension Type 2 (SCD2) for Real-Time Price Tracking in Delta Lake
In today’s data-driven world, businesses need to track historical changes in critical data while maintaining an accessible record of the current data state.
This blog post walks through implementing a Type 2 Slowly Changing Dimension (SCD2) table for tracking price changes in book sales, designed for a data lakehouse architecture using Delta Lake.
Here, we’ll process data streams from a Bronze layer in real time and store the processed records in the Silver layer. The end result is a table that reflects both historical and current prices for each book, ideal for analytical use cases such as price trend analysis and financial calculations.
Problem Statement
In this scenario, we need to track historical price changes for books.
The goal is to:
- Maintain a historical record of all price modifications for each book.
- Ensure only one active record per book ID, representing the current price.
- Process updates in real time so new price changes are captured immediately.
We’ll be implementing SCD Type 2 using Delta Lake’s `MERGE` command to handle these requirements efficiently.
Data Flow Overview
- Bronze Layer: Ingests new or updated book data from an external source in real time (a sample record is sketched after this list).
- Silver Layer (`books_silver` table): Stores the SCD2-compliant records, tracking all historical price changes.
- `current_books` Table: A simplified table in the Silver layer, showing only the latest price record for each book.
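The exact layout of the Bronze table isn’t the focus of this post, but the streaming read in Step 3 assumes a Kafka-style table with at least a `topic` column and a JSON string in a `value` column. As a minimal, purely illustrative sketch (the payload values are made up), a single incoming record might look like this:

# Hypothetical Bronze-layer record, assuming a Kafka-style layout with
# `topic` and `value` columns; the `value` payload is parsed with the
# schema defined in Step 1.
sample_bronze_record = {
    "topic": "books",
    "value": '{"book_id": "B01", "title": "Data Engineering 101", '
             '"author": "John Doe", "price": 49.99, '
             '"updated": "2023-02-15T10:30:00"}',
}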
Step-by-Step Guide to Implementing SCD Type 2
Step 0: Create & Populate the books_silver Table
The `books_silver` table is the primary Silver layer table for storing book price changes. Each new update will result in a new row for that book, with the previous row marked as inactive.
CREATE TABLE IF NOT EXISTS books_silver (
book_id STRING, -- Unique identifier for each book
title STRING, -- Title of the book
author STRING, -- Author of the book
price DOUBLE, -- Price of the book
current BOOLEAN, -- Indicates if this record is the latest (current = true) or historical (current = false)
effective_date TIMESTAMP, -- Date and time the record became effective
end_date TIMESTAMP -- Date and time the record was superseded (NULL if current = true)
);
-- Insert initial records for books in books_silver table
INSERT INTO books_silver (book_id, title, author, price, current, effective_date, end_date)
VALUES
('B01', 'Data Engineering 101', 'John Doe', 45.99, true, '2023-01-01', NULL),
('B02', 'Machine Learning Guide', 'Jane Smith', 59.99, true, '2023-01-01', NULL),
('B03', 'Introduction to Data Science', 'Alice Brown', 39.99, true, '2023-01-01', NULL),
('B04', 'Advanced Big Data Analytics', 'Bob White', 79.99, true, '2023-01-01', NULL),
('B05', 'SQL for Data Analysis', 'Mary Green', 29.99, true, '2023-01-01', NULL);
| book_id | title | author | price | current | effective_date | end_date |
|---|---|---|---|---|---|---|
| B01 | Data Engineering 101 | John Doe | 45.99 | true | 2023-01-01 | NULL |
| B02 | Machine Learning Guide | Jane Smith | 59.99 | true | 2023-01-01 | NULL |
| B03 | Introduction to Data Science | Alice Brown | 39.99 | true | 2023-01-01 | NULL |
| B04 | Advanced Big Data Analytics | Bob White | 79.99 | true | 2023-01-01 | NULL |
| B05 | SQL for Data Analysis | Mary Green | 29.99 | true | 2023-01-01 | NULL |
Explanation of Columns:
- `book_id`, `title`, and `author` store book details.
- `price` holds the current or historical price of the book.
- `current` is a boolean flag (`true` if active, `false` if historical).
- `effective_date` and `end_date` track the time range when each record was valid (see the query sketch below).
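Because each price change closes the previous row’s validity window, `effective_date` and `end_date` together let you reconstruct what a book cost at any point in time. As a quick illustration against the table defined above (a sketch, with `B01` chosen arbitrarily):

# Full price history for one book: each row's [effective_date, end_date)
# window shows when that price applied; the row with end_date = NULL and
# current = true is the price in effect right now.
spark.sql("""
    SELECT book_id, price, current, effective_date, end_date
    FROM books_silver
    WHERE book_id = 'B01'
    ORDER BY effective_date
""").show()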
Step 1: Define the Schema for Incoming Data
We define a schema to parse the incoming data from the Bronze layer. This data arrives in JSON format and includes the latest book information from external sources.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define the schema for Bronze layer stream data
book_schema = StructType([
    StructField("book_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("author", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("updated", TimestampType(), True)
])
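Before wiring this schema into the stream, it can be handy to sanity-check it against a hand-written payload. The snippet below is a minimal sketch with made-up sample data, not rows taken from the Bronze table:

# Parse one hypothetical JSON payload with book_schema to confirm the field
# names and types line up with what the Bronze layer is expected to deliver.
sample = spark.createDataFrame(
    [('{"book_id": "B01", "title": "Data Engineering 101", "author": "John Doe", '
      '"price": 49.99, "updated": "2023-02-15T10:30:00"}',)],
    ["value"],
)
sample.select(F.from_json(F.col("value"), book_schema).alias("v")).select("v.*").show(truncate=False)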
Step 2: Define the SCD Type 2 Upsert Logic
The following upsert function, `type2_upsert`, applies SCD2 logic using Delta Lake’s `MERGE` command. This function:
- Updates the existing current record by setting `current = false` and populating `end_date`.
- Inserts a new record with `current = true` and no `end_date`, marking it as the active version.
def type2_upsert(microBatchDF, batchId):
    # Create a temporary view for the micro-batch
    microBatchDF.createOrReplaceTempView("updates")

    # SQL MERGE query to apply SCD2 logic
    merge_query = """
        MERGE INTO books_silver AS target
        USING (
            -- Rows keyed by book_id: match existing records so changed prices can be closed out
            SELECT updates.book_id as merge_key, updates.*
            FROM updates
            UNION ALL
            -- NULL merge_key rows for changed prices: they never match, so they are inserted as the new current version
            SELECT NULL as merge_key, updates.*
            FROM updates
            JOIN books_silver ON updates.book_id = books_silver.book_id
            WHERE books_silver.current = true
            AND updates.price <> books_silver.price
        ) staged_updates
        ON target.book_id = staged_updates.merge_key
        -- Mark the existing record as inactive
        WHEN MATCHED AND target.current = true
            AND target.price <> staged_updates.price THEN
            UPDATE SET current = false, end_date = staged_updates.updated
        -- Insert the new record as the current version with the updated price
        WHEN NOT MATCHED THEN
            INSERT (book_id, title, author, price, current, effective_date, end_date)
            VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author,
                    staged_updates.price, true, staged_updates.updated, NULL)
    """

    # Execute the merge against the books_silver Delta table
    microBatchDF.sparkSession.sql(merge_query)
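Although this function is designed to be called by Structured Streaming’s foreachBatch, it can also be exercised directly against a small static DataFrame, which is a convenient way to verify the MERGE behaviour before wiring up the stream. The sample row below is hypothetical, and the `batchId` argument is unused by the function, so any value will do:

from datetime import datetime

# Hypothetical price update for B01: the existing row should be closed out
# (current = false, end_date populated) and a new current row inserted.
test_batch = spark.createDataFrame(
    [("B01", "Data Engineering 101", "John Doe", 49.99, datetime(2023, 2, 15, 10, 30))],
    schema=book_schema,
)
type2_upsert(test_batch, batchId=0)

# Both the closed historical row and the new current row should now be visible.
spark.table("books_silver").filter("book_id = 'B01'").orderBy("effective_date").show()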
Step 3: Read Stream from Bronze Layer and Apply Upsert Logic
We set up a streaming query to read from the Bronze layer, apply the `type2_upsert` function on each micro-batch, and write the results to the `books_silver` table.
def process_books():
    # Read streaming data from the Bronze layer
    query = (
        spark.readStream
            .table("bronze")                # Source Bronze layer table
            .filter("topic = 'books'")      # Filter for book-related data
            .select(F.from_json(F.col("value").cast("string"), book_schema).alias("v"))  # Parse JSON
            .select("v.*")                  # Extract fields
            .writeStream
            .foreachBatch(type2_upsert)     # Apply the SCD Type 2 upsert on each micro-batch
            .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/books_silver")  # Set checkpoint
            .trigger(availableNow=True)     # Process all data available now, then stop
            .start()
    )

    query.awaitTermination()  # Wait for the streaming query to complete
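With everything defined, a single call runs one incremental pass: the `availableNow=True` trigger reads whatever has accumulated in the Bronze table since the last checkpoint, applies the upsert, and then stops, so the function can be scheduled as a periodic job. A processing-time trigger such as `.trigger(processingTime="30 seconds")` is a common alternative if the query should instead stay running continuously; the interval here is illustrative.

# Run one incremental pass over the Bronze layer and upsert into books_silver.
process_books()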
Step 4: Query the Updated Silver Table
Once the streaming query processes data, we can view the latest state of the `books_silver` table with:
books_df = spark.read.table("books_silver").orderBy("book_id", "effective_date")
display(books_df)
Step 5: Filter for Current Records in current_books
To create a simplified table with only the latest price for each book, we can filter the records where `current = true` into a `current_books` table.
CREATE OR REPLACE TABLE current_books AS
SELECT book_id, title, author, price
FROM books_silver
WHERE current = true;
To verify, query the `current_books` table:
SELECT * FROM current_books ORDER BY book_id;
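Since `current_books` is fully derived from `books_silver`, an alternative design is to expose it as a view rather than a materialized table, so it always reflects the latest state of `books_silver` without being rebuilt after each streaming run. A minimal sketch (the view name `current_books_vw` is just an example):

# Alternative: a view over books_silver that always shows the current records,
# without materializing a separate table after every run.
spark.sql("""
    CREATE OR REPLACE VIEW current_books_vw AS
    SELECT book_id, title, author, price
    FROM books_silver
    WHERE current = true
""")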
Conclusion
By implementing SCD2 in a Delta Lakehouse, this solution enables:
- Historical Tracking: The `books_silver` table retains all historical price records.
- Real-Time Updates: Streaming data is processed and tracked in real time for up-to-date insights.
- Efficient Current Data Access: The `current_books` table allows easy access to the latest data without querying historical records.
This setup is ideal for applications requiring price trend analysis, accurate financial calculations, and robust historical data tracking. With the SCD2 architecture, businesses can ensure both data accuracy and auditability, making it a powerful addition to any data lakehouse.