Streaming Deduplication in Delta Lake: A Hands-On Approach
In modern data architectures, managing real-time data streams effectively is essential for ensuring data integrity and accuracy.
A common challenge is dealing with duplicate records, which can lead to skewed analytics and wasted resources.
This blog post focuses on a practical solution for streaming deduplication in the Silver layer of a Delta Lake architecture: the Bronze layer retains raw data, while the Silver layer holds deduplicated records for accurate downstream processing.
Objective
Our goal is to remove duplicate records from streaming data in the orders Silver table using two key methods:
- The dropDuplicates() function.
- An insert-only merge strategy.
By applying deduplication at the Silver level, we maintain the integrity of the entire dataset in the Bronze layer for historical purposes.
Key Concepts
Bronze vs. Silver Deduplication:
- The Bronze layer stores raw, unprocessed data, which includes duplicates. This ensures no data is lost during ingestion and minimizes latency.
- Deduplication happens in the Silver layer, where the data is cleaned up and duplicates are removed before any further processing occurs.
Identifying Duplicates in Static Data:
- For static data, the dropDuplicates() function is used to eliminate duplicate records based on specific columns.
- When applied to the orders topic, this function revealed that around 20% of the records were duplicates (a quick way to check that ratio is sketched after the example below).
Static Deduplication Example:
from pyspark.sql import functions as F

# Schema of the JSON payload stored in the bronze table's "value" column
json_schema = "order_id STRING, order_timestamp TIMESTAMP, customer_id STRING, quantity BIGINT, total BIGINT, books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>"

# Parse the orders topic from bronze and count distinct (order_id, order_timestamp) pairs
batch_total = (spark.read
    .table("bronze")
    .filter("topic = 'orders'")
    .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
    .select("v.*")
    .dropDuplicates(["order_id", "order_timestamp"])
    .count()
)
print(batch_total)
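To see where the roughly 20% figure comes from, here is a minimal sketch that compares the raw record count against the deduplicated count. It assumes the same bronze table and batch_total variable as above; raw_total is a name introduced here purely for illustration.

# Count raw orders records in bronze, duplicates included
raw_total = (spark.read
    .table("bronze")
    .filter("topic = 'orders'")
    .count()
)

# Share of records that are duplicates; on this dataset it comes out to roughly 20%
duplicate_ratio = 1 - (batch_total / raw_total)
print(f"Duplicate ratio: {duplicate_ratio:.1%}")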
Deduplication in Structured Streaming
When dealing with streaming data, deduplication becomes more complex because records arrive continuously in micro-batches.
To handle this, Spark maintains state across micro-batches, allowing it to track which keys it has already seen and to drop duplicates that arrive in later batches.
Watermarking
Watermarking is used to limit the amount of state Spark needs to manage. It sets a time window (e.g., 30 seconds) during which duplicates are tracked, after which the state information is discarded, reducing overhead.
Streaming Deduplication Example:
# Streaming read of the orders topic, with a watermark to bound deduplication state
deduped_df = (spark.readStream
    .table("bronze")
    .filter("topic = 'orders'")
    .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
    .select("v.*")
    .withWatermark("order_timestamp", "30 seconds")
    .dropDuplicates(["order_id", "order_timestamp"])
)
In this example, dropDuplicates() removes duplicates both within and across micro-batches, while the 30-second watermark tells Spark how long to retain deduplication state before discarding it.
Insert-Only Merge for Deduplication
To write the deduplicated stream into the Silver table, we can use an insert-only merge strategy: incoming records are matched against the Silver table on unique keys, and only new, non-duplicate records are inserted.
Custom Merge Function Example:
def upsert_data(microBatchDF, batch_id):
    # Expose the micro-batch as a temporary view so it can be referenced in SQL
    microBatchDF.createOrReplaceTempView("tv_orders_microbatch")
    sql_query = """
        MERGE INTO orders_silver a
        USING tv_orders_microbatch b
        ON a.order_id = b.order_id AND a.order_timestamp = b.order_timestamp
        WHEN NOT MATCHED THEN INSERT *
    """
    # Run the merge on the same Spark session that owns the micro-batch DataFrame
    microBatchDF.sparkSession.sql(sql_query)
This function:
- Stores the incoming batch records in a temporary view.
- Uses a MERGE query to insert only new, unique records into the Silver table (a sketch for creating that table follows below).
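The MERGE assumes the orders_silver Delta table already exists. The post does not show its creation, so here is a minimal sketch, assuming its columns simply mirror the parsed JSON schema used above; adjust it to your actual table layout.

# Assumption: orders_silver mirrors the parsed JSON schema shown earlier
spark.sql("""
    CREATE TABLE IF NOT EXISTS orders_silver (
        order_id STRING,
        order_timestamp TIMESTAMP,
        customer_id STRING,
        quantity BIGINT,
        total BIGINT,
        books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>
    )
    USING DELTA
""")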
Streaming Write with foreachBatch()
To apply this custom deduplication logic to streaming data, we use the foreachBatch() method, which lets us run the merge function on each micro-batch so that only unique records are inserted into the Silver table.
Example:
# Apply the insert-only merge to every micro-batch of the deduplicated stream
query = (deduped_df.writeStream
    .foreachBatch(upsert_data)
    .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/orders_silver")
    .trigger(availableNow=True)
    .start()
)
query.awaitTermination()
This approach ensures that each microbatch of the stream is processed, duplicates are removed, and only unique records are inserted into the Silver table.
Step-by-Step: Streaming Deduplication
Initial Setup:
- Ensure the Bronze table has raw data (including duplicates).
- Use dropDuplicates() to identify and remove duplicates in the Silver layer.
Streaming Read and Deduplication:
- Set up a streaming read from the Bronze table using Structured Streaming.
- Apply watermarking to manage state efficiently over time and limit the tracking of duplicates.
Define Insert-Only Merge:
- Create a custom function to perform an insert-only merge into the Silver table. This ensures that only new records are inserted, and duplicates are filtered out.
Apply foreachBatch() for Streaming Write:
- Use the foreachBatch() method to apply the deduplication logic to each micro-batch. This ensures that the deduplication process is applied in real time.
Validation:
- Validate the number of unique records in the Silver table after running the streaming job. The results from batch deduplication and streaming deduplication should be consistent (a quick check is sketched below).
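As a minimal validation sketch (assuming the batch_total variable from the static example is still in scope), we can compare the Silver table's row count against the deduplicated batch count:

# Count rows in the Silver table after the streaming job completes
streaming_total = spark.read.table("orders_silver").count()
print(streaming_total)

# If deduplication worked end to end, this should match the earlier static count
assert streaming_total == batch_total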
Conclusion
Streaming deduplication is a crucial step in building real-time, reliable data pipelines. By applying deduplication in the Silver layer, we ensure that:
- The Bronze layer retains raw data with duplicates, allowing for historical completeness.
- The Silver layer contains only deduplicated, clean records ready for further processing.
This balance between data integrity and efficiency is essential, especially in scenarios where duplicates are common in real-time data ingestion systems. By leveraging Spark Structured Streaming, dropDuplicates(), watermarking, and insert-only merges, you can implement a scalable solution for streaming deduplication in your Delta Lake architecture.
Stay tuned for future posts where we dive deeper into advanced streaming operations and state management in real-time data processing!