SQL for Pipeline Monitoring — Detecting Stale Data, Late Arrivals, and Anomalous Loads

A data pipeline is not done when the first load succeeds. In production, pipelines fail silently all the time — a partition stops updating, row counts drop unexpectedly, timestamps stop moving forward. Without active monitoring, these problems go unnoticed until a dashboard shows wrong numbers and someone in finance calls.

This tutorial covers the SQL queries that data engineers use to monitor pipeline health continuously — detecting problems automatically before users notice them.


The Monitoring Dataset

CREATE SCHEMA IF NOT EXISTS pipeline;

-- Pipeline run metadata: your pipeline writes one row here after every run, successful or not
CREATE TABLE pipeline.run_log (
    run_id        BIGINT GENERATED ALWAYS AS IDENTITY,
    pipeline_name VARCHAR(100),
    target_table  VARCHAR(100),
    batch_date    DATE,
    rows_loaded   BIGINT,
    bytes_loaded  BIGINT,
    start_time    TIMESTAMP,
    end_time      TIMESTAMP,
    status        VARCHAR(20),    -- 'success', 'failed', 'partial'
    error_message VARCHAR(500)
);

-- Simulate 8 days of run history (2024-03-01 to 2024-03-08)
INSERT INTO pipeline.run_log (pipeline_name, target_table, batch_date, rows_loaded, bytes_loaded, start_time, end_time, status) VALUES
('orders_pipeline', 'silver.orders', '2024-03-01', 45230, 15200000, '2024-03-01 02:00:00', '2024-03-01 02:14:00', 'success'),
('orders_pipeline', 'silver.orders', '2024-03-02', 43100, 14800000, '2024-03-02 02:00:00', '2024-03-02 02:13:00', 'success'),
('orders_pipeline', 'silver.orders', '2024-03-03', 44800, 15100000, '2024-03-03 02:00:00', '2024-03-03 02:15:00', 'success'),
('orders_pipeline', 'silver.orders', '2024-03-04', 250,   85000,    '2024-03-04 02:00:00', '2024-03-04 02:01:00', 'success'),  -- suspiciously low
('orders_pipeline', 'silver.orders', '2024-03-05', 44200, 14900000, '2024-03-05 02:00:00', '2024-03-05 02:14:00', 'success'),
('orders_pipeline', 'silver.orders', '2024-03-06', 46100, 15500000, '2024-03-06 02:00:00', '2024-03-06 02:16:00', 'success'),
('orders_pipeline', 'silver.orders', '2024-03-07', 38900, 13100000, '2024-03-07 02:00:00', '2024-03-07 02:12:00', 'success'),
('orders_pipeline', 'silver.orders', '2024-03-08', 45500, 15300000, '2024-03-08 02:00:00', '2024-03-08 02:14:00', 'success');
-- No run on 2024-03-09 or beyond -- pipeline stopped running

Check 1 — Detect Missing Pipeline Runs (Stale Partitions)

The most critical check: is the pipeline still running every day? A gap means data consumers are looking at stale information.

WITH date_spine AS (
    SELECT CURRENT_DATE - n.n AS expected_date   -- date - integer yields a DATE, matching batch_date
    FROM GENERATE_SERIES(0, 13) AS n(n)
),
run_status AS (
    SELECT
        batch_date,
        status,
        rows_loaded
    FROM pipeline.run_log
    WHERE pipeline_name = 'orders_pipeline'
      AND status = 'success'
)
SELECT
    ds.expected_date,
    CASE
        WHEN rs.batch_date IS NOT NULL THEN 'OK'
        ELSE 'MISSING'
    END AS run_status,
    rs.rows_loaded
FROM date_spine ds
LEFT JOIN run_status rs ON ds.expected_date = rs.batch_date
ORDER BY ds.expected_date DESC;

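Output (run on 2024-03-14; first 8 of 14 rows):

expected_date | run_status | rows_loaded
--------------+------------+------------
2024-03-14    | MISSING    | NULL
2024-03-13    | MISSING    | NULL
2024-03-12    | MISSING    | NULL
2024-03-11    | MISSING    | NULL
2024-03-10    | MISSING    | NULL
2024-03-09    | MISSING    | NULL
2024-03-08    | OK         | 45500
2024-03-07    | OK         | 38900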

Six consecutive days are MISSING: the last successful batch was March 8, and the pipeline has not completed a run since.

Alert query: fire when the last successful batch is more than 1 day old:

SELECT
    pipeline_name,
    MAX(batch_date) AS last_successful_run,
    CURRENT_DATE - MAX(batch_date) AS days_since_last_run
FROM pipeline.run_log
WHERE status = 'success'
GROUP BY pipeline_name
HAVING CURRENT_DATE - MAX(batch_date) > 1
ORDER BY days_since_last_run DESC;
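The date spine only looks back 14 days, and the alert query only sees the most recent run. To list every gap in the full history, including outages that were later followed by successful runs, one option is to compare each successful batch_date with the previous one using LAG. This is a sketch that assumes exactly one expected batch per calendar day:

```sql
-- Every gap between consecutive successful batches, per pipeline
-- ASSUMPTION: one expected batch per calendar day
WITH ordered AS (
    SELECT
        pipeline_name,
        batch_date,
        LAG(batch_date) OVER (
            PARTITION BY pipeline_name ORDER BY batch_date
        ) AS prev_batch_date
    FROM pipeline.run_log
    WHERE status = 'success'
)
SELECT
    pipeline_name,
    prev_batch_date + 1              AS first_missing_date,
    batch_date - 1                   AS last_missing_date,
    batch_date - prev_batch_date - 1 AS missing_days
FROM ordered
WHERE batch_date - prev_batch_date > 1   -- retries on the same date give 0 and are ignored
ORDER BY pipeline_name, first_missing_date;
```

On the sample data this returns no rows: March 1 to 8 are contiguous, and the outage from March 9 onward is an open-ended gap that only the date spine or the alert query can see. The two approaches complement each other.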

Check 2 — Anomalous Row Counts (Statistical Detection)

March 4th loaded only 250 rows when the pipeline normally loads ~44,000. This is a silent failure — the pipeline reported success but the data was clearly wrong.

SELECT
    r.pipeline_name,
    r.batch_date,
    r.rows_loaded,
    ROUND(b.avg_rows)    AS typical_rows,
    ROUND(b.stddev_rows) AS stddev_rows,
    CASE
        WHEN r.rows_loaded < b.avg_rows - 3 * b.stddev_rows THEN 'ANOMALY: too few rows'
        WHEN r.rows_loaded > b.avg_rows + 3 * b.stddev_rows THEN 'ANOMALY: too many rows'
        ELSE 'OK'
    END AS anomaly_check
FROM pipeline.run_log r
CROSS JOIN LATERAL (
    -- Baseline: the OTHER successful runs of this pipeline in the last 30 days.
    -- Excluding the run being checked keeps an outlier from masking itself.
    SELECT
        AVG(h.rows_loaded)    AS avg_rows,
        STDDEV(h.rows_loaded) AS stddev_rows
    FROM pipeline.run_log h
    WHERE h.pipeline_name = r.pipeline_name
      AND h.status = 'success'
      AND h.batch_date >= CURRENT_DATE - 30
      AND h.run_id <> r.run_id
) b
WHERE r.status = 'success'
  AND r.batch_date >= CURRENT_DATE - 7
ORDER BY r.batch_date DESC;

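Output (sample data, run on 2024-03-11 so that the 7-day window starts at March 4):

pipeline_name   | batch_date | rows_loaded | typical_rows | stddev_rows | anomaly_check
----------------+------------+-------------+--------------+-------------+----------------------
orders_pipeline | 2024-03-08 |       45500 |        37511 |       16597 | OK
orders_pipeline | 2024-03-07 |       38900 |        38454 |       16874 | OK
orders_pipeline | 2024-03-06 |       46100 |        37426 |       16547 | OK
orders_pipeline | 2024-03-05 |       44200 |        37697 |       16691 | OK
orders_pipeline | 2024-03-04 |         250 |        43976 |        2438 | ANOMALY: too few rows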

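March 4th is flagged automatically, about 18 standard deviations below the other runs. Two details make that work. First, each run is measured against the other runs, never against a baseline that contains itself: if the 250-row run were included in its own baseline, the mean would fall to 38,510 and the standard deviation would rise to about 15,600, leaving 250 only about 2.4 standard deviations below the mean, so it would not be flagged. Second, 3 standard deviations is a common starting threshold, not a universal standard; adjust it based on how noisy your pipeline is.

The other rows show what this simple version still gets wrong: the 250-row run stays in their baselines and inflates the standard deviation to roughly 16,500 to 16,900, so for the rest of the 30-day window the check is close to blind.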
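Because a single bad run widens the standard deviation for every run whose baseline includes it, a more robust variant swaps the mean and standard deviation for the median and the median absolute deviation (MAD). The sketch below does this with PostgreSQL's PERCENTILE_CONT. The 0.6745 scaling and the 3.5 cutoff on the resulting "modified z-score" are a widely used rule of thumb, not values from this tutorial, so treat them as a starting point:

```sql
-- Robust row-count check: median and MAD instead of mean and standard deviation
WITH baseline AS (
    SELECT
        pipeline_name,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY rows_loaded) AS median_rows
    FROM pipeline.run_log
    WHERE status = 'success'
      AND batch_date >= CURRENT_DATE - 30
    GROUP BY pipeline_name
),
mad AS (
    SELECT
        b.pipeline_name,
        b.median_rows,
        -- Median of the absolute deviations from the median
        PERCENTILE_CONT(0.5) WITHIN GROUP (
            ORDER BY ABS(r.rows_loaded - b.median_rows)
        ) AS mad_rows
    FROM pipeline.run_log r
    JOIN baseline b ON r.pipeline_name = b.pipeline_name
    WHERE r.status = 'success'
      AND r.batch_date >= CURRENT_DATE - 30
    GROUP BY b.pipeline_name, b.median_rows
)
SELECT
    r.pipeline_name,
    r.batch_date,
    r.rows_loaded,
    ROUND(m.median_rows) AS median_rows,
    ROUND(m.mad_rows)    AS mad_rows,
    -- Modified z-score; NULLIF avoids division by zero (if MAD = 0 the row falls through to 'OK')
    ROUND((0.6745 * (r.rows_loaded - m.median_rows) / NULLIF(m.mad_rows, 0))::NUMERIC, 1) AS modified_z,
    CASE
        WHEN ABS(0.6745 * (r.rows_loaded - m.median_rows) / NULLIF(m.mad_rows, 0)) > 3.5
        THEN 'ANOMALY'
        ELSE 'OK'
    END AS anomaly_check
FROM pipeline.run_log r
JOIN mad m ON r.pipeline_name = m.pipeline_name
WHERE r.status = 'success'
  AND r.batch_date >= CURRENT_DATE - 7
ORDER BY r.batch_date DESC;
```

With all eight sample runs inside the 30-day window, the median is 44,500 and the MAD is 1,200. March 4 scores about -24.9, while March 7 (38,900 rows) scores about -3.1, just under the cutoff. The 250-row run barely moves either statistic, which is the reason for the swap.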


Check 3 — Detect Stale Data in the Target Table

Even when the pipeline runs, the data inside the target table might not be fresh. This checks the actual max timestamp in the Silver table.

SELECT
    'silver.orders'                           AS table_name,
    MAX(updated_at)                           AS latest_record_ts,
    CURRENT_TIMESTAMP                         AS check_time,
    CURRENT_TIMESTAMP - MAX(updated_at)       AS data_age,
    CASE
        WHEN MAX(updated_at) IS NULL THEN 'STALE: table is empty'
        WHEN CURRENT_TIMESTAMP - MAX(updated_at) > INTERVAL '25 hours'
        THEN 'STALE: data older than 25 hours'
        ELSE 'FRESH'
    END AS freshness_status
FROM silver.orders;

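Output when run at 2024-03-14 05:29 with the pipeline still down:

table_name    | latest_record_ts    | check_time          | data_age        | freshness_status
--------------+---------------------+---------------------+-----------------+--------------------------------
silver.orders | 2024-03-08 02:14:00 | 2024-03-14 05:29:00 | 6 days 03:15:00 | STALE: data older than 25 hours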

Run this check across all your key tables:

-- Multi-table freshness check
SELECT 'silver.orders'    AS tbl, MAX(updated_at) AS latest FROM silver.orders    UNION ALL
SELECT 'silver.events'    AS tbl, MAX(event_ts)   AS latest FROM silver.events    UNION ALL
SELECT 'silver.customers' AS tbl, MAX(updated_at) AS latest FROM silver.customers
ORDER BY latest ASC NULLS FIRST;   -- empty tables (NULL) first, then the oldest: the first problems to investigate
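A single 25-hour threshold rarely fits every table: an event stream may be expected within hours, a customer dimension only weekly. One way to extend the multi-table check is to pair each table with its own maximum acceptable age. The thresholds in the sla list below are hypothetical placeholders; set them from each table's actual SLA.

```sql
-- Multi-table freshness check with a per-table SLA
-- ASSUMPTION: the max_age values are placeholders, not recommendations
WITH latest AS (
    SELECT 'silver.orders'    AS tbl, MAX(updated_at) AS latest_ts FROM silver.orders
    UNION ALL
    SELECT 'silver.events'    AS tbl, MAX(event_ts)   AS latest_ts FROM silver.events
    UNION ALL
    SELECT 'silver.customers' AS tbl, MAX(updated_at) AS latest_ts FROM silver.customers
),
sla (tbl, max_age) AS (
    VALUES
        ('silver.orders',    INTERVAL '25 hours'),
        ('silver.events',    INTERVAL '2 hours'),
        ('silver.customers', INTERVAL '7 days')
)
SELECT
    l.tbl,
    l.latest_ts,
    CURRENT_TIMESTAMP - l.latest_ts AS data_age,
    s.max_age,
    CASE
        -- An empty table (NULL latest_ts) counts as stale
        WHEN l.latest_ts IS NULL
          OR CURRENT_TIMESTAMP - l.latest_ts > s.max_age THEN 'STALE'
        ELSE 'FRESH'
    END AS freshness_status
FROM latest l
JOIN sla s ON s.tbl = l.tbl
ORDER BY freshness_status DESC, data_age DESC NULLS FIRST;   -- 'STALE' sorts before 'FRESH'
```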

Check 4 — Pipeline Duration Trend (Detecting Slowdowns)

A pipeline that runs in 14 minutes today but takes 90 minutes next month needs attention. Catching the trend early prevents SLA breaches.

SELECT
    batch_date,
    rows_loaded,
    ROUND((EXTRACT(EPOCH FROM (end_time - start_time)) / 60)::NUMERIC, 1) AS duration_minutes,
    ROUND((rows_loaded / NULLIF(EXTRACT(EPOCH FROM (end_time - start_time)), 0))::NUMERIC, 1) AS rows_per_second,
    -- Moving average over the current run and the 6 before it (7 days for a daily pipeline)
    ROUND((AVG(EXTRACT(EPOCH FROM (end_time - start_time)) / 60)
        OVER (ORDER BY batch_date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW))::NUMERIC, 1) AS moving_avg_duration_7d
FROM pipeline.run_log
WHERE pipeline_name = 'orders_pipeline'
  AND status = 'success'
ORDER BY batch_date;

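Output:

batch_date | rows_loaded | duration_minutes | rows_per_second | moving_avg_duration_7d
-----------+-------------+------------------+-----------------+-----------------------
2024-03-01 |       45230 |             14.0 |            53.8 |                   14.0
2024-03-02 |       43100 |             13.0 |            55.3 |                   13.5
2024-03-03 |       44800 |             15.0 |            49.8 |                   14.0
2024-03-04 |         250 |              1.0 |             4.2 |                   10.8
2024-03-05 |       44200 |             14.0 |            52.6 |                   11.4
2024-03-06 |       46100 |             16.0 |            48.0 |                   12.2
2024-03-07 |       38900 |             12.0 |            54.0 |                   12.1
2024-03-08 |       45500 |             14.0 |            54.2 |                   12.1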

March 4th shows 4.2 rows/second vs the usual 50+ — another signal that the run was anomalous, even though it completed without error.
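The moving average is good for reading trends by eye. To turn it into an alert, one option is to compare each run's duration with the average of the runs before it, so a slow run does not dilute its own baseline. The 7-run window and the 1.5× factor below are illustrative choices, not values from this tutorial:

```sql
-- Flag runs that took more than 1.5x the average of the previous 7 runs
-- ASSUMPTION: window size (7) and factor (1.5) are illustrative
WITH durations AS (
    SELECT
        pipeline_name,
        batch_date,
        EXTRACT(EPOCH FROM (end_time - start_time)) / 60 AS duration_minutes
    FROM pipeline.run_log
    WHERE status = 'success'
),
with_baseline AS (
    SELECT
        *,
        AVG(duration_minutes) OVER (
            PARTITION BY pipeline_name
            ORDER BY batch_date
            ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING   -- excludes the current run
        ) AS prev_avg_minutes
    FROM durations
)
SELECT
    pipeline_name,
    batch_date,
    ROUND(duration_minutes::NUMERIC, 1) AS duration_minutes,
    ROUND(prev_avg_minutes::NUMERIC, 1) AS prev_7_run_avg,
    CASE
        WHEN duration_minutes > 1.5 * prev_avg_minutes THEN 'SLOW'
        ELSE 'OK'   -- also 'OK' for the first run, which has no baseline yet
    END AS duration_check
FROM with_baseline
WHERE batch_date >= CURRENT_DATE - 7
ORDER BY pipeline_name, batch_date DESC;
```

None of the eight sample runs exceeds 1.5× the average of the runs before it. March 4's one-minute run passes this check too; a sudden drop in duration is caught by the row-count and throughput checks instead.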


Check 5 — Partition Completeness by Hour

For pipelines that load hourly partitions, verify that every expected hour arrived.

WITH expected_hours AS (
    -- The 24 complete hours before the current one (the current hour is still filling up)
    SELECT DATE_TRUNC('hour', CURRENT_TIMESTAMP) - g.n * INTERVAL '1 hour' AS expected_hour
    FROM GENERATE_SERIES(1, 24) AS g(n)
)
SELECT
    eh.expected_hour,
    COUNT(o.order_id) AS row_count,
    CASE WHEN COUNT(o.order_id) = 0 THEN 'MISSING' ELSE 'OK' END AS status
FROM expected_hours eh
LEFT JOIN silver.orders o
    -- Range predicate instead of DATE_TRUNC on the column, so an index on loaded_at can be used
    ON  o.loaded_at >= eh.expected_hour
    AND o.loaded_at <  eh.expected_hour + INTERVAL '1 hour'
GROUP BY eh.expected_hour
ORDER BY eh.expected_hour DESC;

This shows which hourly partitions have data and which are empty — useful for near-real-time pipelines where a 2-hour gap needs immediate investigation.
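For alerting it is often more convenient to return only the missing hours. A variant of the same idea generates the hourly timestamps directly with GENERATE_SERIES and keeps the hours for which NOT EXISTS finds no row; like the query above, it assumes silver.orders has a loaded_at column.

```sql
-- Only the hourly partitions from the last 24 complete hours that have no rows at all
SELECT h.expected_hour
FROM GENERATE_SERIES(
         DATE_TRUNC('hour', CURRENT_TIMESTAMP) - INTERVAL '24 hours',
         DATE_TRUNC('hour', CURRENT_TIMESTAMP) - INTERVAL '1 hour',
         INTERVAL '1 hour'
     ) AS h(expected_hour)
WHERE NOT EXISTS (
    SELECT 1
    FROM silver.orders o
    WHERE o.loaded_at >= h.expected_hour
      AND o.loaded_at <  h.expected_hour + INTERVAL '1 hour'
)
ORDER BY h.expected_hour;
```

An empty result means every hour arrived, which makes the query easy to wire into an alert that fires on "any rows returned".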


Check 6 — Late-Arriving Data Detection

Some rows arrive with event timestamps from previous days — common in mobile apps that batch events, or systems with upload delays. This check shows how much data is arriving late.

SELECT
    loaded_date,
    event_date,
    loaded_date - event_date AS days_late,
    COUNT(*) AS row_count,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY loaded_date), 2) AS pct_of_daily_load
FROM (
    SELECT
        DATE(loaded_at) AS loaded_date,
        event_date
    FROM silver.events
    WHERE loaded_at >= CURRENT_DATE - 7
) sub
GROUP BY loaded_date, event_date
ORDER BY loaded_date DESC, days_late;

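Output (rows for the March 8 load):

loaded_date | event_date | days_late | row_count | pct_of_daily_load
------------+------------+-----------+-----------+------------------
2024-03-08  | 2024-03-08 |         0 |     41200 |             90.55
2024-03-08  | 2024-03-07 |         1 |      3100 |              6.81
2024-03-08  | 2024-03-06 |         2 |       900 |              1.98
2024-03-08  | 2024-03-05 |         3 |       300 |              0.66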

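About 90.5% of rows arrive the same day and 6.8% arrive one day late. For the lookback buffer, the tail is what matters: rows here arrive up to 3 days late, so a watermark lookback shorter than 3 days would silently miss the 0.7% in that bucket, and 4 days leaves a day of margin.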
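Rather than reading the lookback off the table by eye, a sketch like the following summarizes lateness over the last 30 loaded days with PERCENTILE_DISC, under the same assumption that silver.events has loaded_at and event_date columns. The 3-day value in the FILTER clause stands in for a hypothetical current lookback setting:

```sql
-- Lateness summary: how many days of lookback cover 99% of rows, and all of them?
SELECT
    PERCENTILE_DISC(0.99) WITHIN GROUP (
        ORDER BY DATE(loaded_at) - event_date
    )                                                          AS p99_days_late,
    MAX(DATE(loaded_at) - event_date)                          AS max_days_late,
    -- ASSUMPTION: 3 = current lookback in days; counts rows it would miss
    COUNT(*) FILTER (WHERE DATE(loaded_at) - event_date > 3)   AS rows_beyond_lookback
FROM silver.events
WHERE loaded_at >= CURRENT_DATE - 30;
```

Restricted to the March 8 load shown above, this would report a p99 of 2 days and a maximum of 3: covering 99% of rows is cheaper than covering all of them, and the gap between the two numbers is the trade-off to decide on.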


Building a Monitoring Dashboard Table

Combine all checks into a daily monitoring table that your alerting system reads:

INSERT INTO pipeline.monitoring_report (
    check_date, pipeline_name, check_name, status, detail, created_at
)

-- Check 1: Missing run (no successful run at all also counts as FAIL)
SELECT
    CURRENT_DATE,
    'orders_pipeline',
    'last_run_freshness',
    CASE
        WHEN MAX(batch_date) IS NULL OR CURRENT_DATE - MAX(batch_date) > 1 THEN 'FAIL'
        ELSE 'PASS'
    END,
    'Last run: ' || COALESCE(MAX(batch_date)::VARCHAR, 'never')
        || ', days ago: ' || COALESCE((CURRENT_DATE - MAX(batch_date))::VARCHAR, 'n/a'),
    CURRENT_TIMESTAMP
FROM pipeline.run_log
WHERE pipeline_name = 'orders_pipeline' AND status = 'success'

UNION ALL

-- Check 2: Row count anomaly (latest run vs. the other successful runs of the last 30 days)
SELECT
    CURRENT_DATE,
    'orders_pipeline',
    'row_count_anomaly',
    CASE
        WHEN ABS(latest.rows_loaded - hist.avg_rows) > 3 * hist.stddev_rows
        THEN 'FAIL'
        ELSE 'PASS'
    END,
    'Latest: ' || latest.rows_loaded::VARCHAR || ', Avg: ' || COALESCE(ROUND(hist.avg_rows)::VARCHAR, 'n/a'),
    CURRENT_TIMESTAMP
FROM (
    SELECT run_id, rows_loaded FROM pipeline.run_log
    WHERE pipeline_name = 'orders_pipeline' AND status = 'success'
    ORDER BY batch_date DESC LIMIT 1
) latest
CROSS JOIN LATERAL (
    -- Exclude the latest run from its own baseline so an outlier cannot mask itself
    SELECT AVG(rows_loaded) AS avg_rows, STDDEV(rows_loaded) AS stddev_rows
    FROM pipeline.run_log
    WHERE pipeline_name = 'orders_pipeline' AND status = 'success'
      AND batch_date >= CURRENT_DATE - 30
      AND run_id <> latest.run_id
) hist

UNION ALL

-- Check 3: Data freshness (an empty table also counts as FAIL)
SELECT
    CURRENT_DATE,
    'orders_pipeline',
    'data_freshness',
    CASE
        WHEN MAX(updated_at) IS NULL
          OR CURRENT_TIMESTAMP - MAX(updated_at) > INTERVAL '25 hours' THEN 'FAIL'
        ELSE 'PASS'
    END,
    'Latest record: ' || COALESCE(MAX(updated_at)::VARCHAR, 'none'),
    CURRENT_TIMESTAMP
FROM silver.orders;
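The INSERT assumes pipeline.monitoring_report already exists. A minimal definition could look like this; the column types and constraints are an assumption inferred from the values the INSERT writes, and the index is there for "latest result per check" dashboard queries:

```sql
CREATE SCHEMA IF NOT EXISTS pipeline;

-- One row per check per monitoring run (ASSUMPTION: types inferred from the INSERT)
CREATE TABLE IF NOT EXISTS pipeline.monitoring_report (
    report_id     BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    check_date    DATE         NOT NULL,
    pipeline_name VARCHAR(100) NOT NULL,
    check_name    VARCHAR(100) NOT NULL,
    status        VARCHAR(10)  NOT NULL CHECK (status IN ('PASS', 'FAIL')),
    detail        VARCHAR(500),
    created_at    TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Speeds up "latest result per check" lookups
CREATE INDEX IF NOT EXISTS monitoring_report_check_idx
    ON pipeline.monitoring_report (pipeline_name, check_name, created_at DESC);
```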

Query this table from Grafana, Apache Superset, or any alerting tool to see pipeline health as of the most recent monitoring run.
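A dashboard panel usually wants only the most recent result of each check. In PostgreSQL, DISTINCT ON is a compact way to get it; this sketch sorts failing checks to the top:

```sql
-- Latest result for every (pipeline, check) pair, failures first
SELECT *
FROM (
    SELECT DISTINCT ON (pipeline_name, check_name)
        pipeline_name,
        check_name,
        status,
        detail,
        created_at
    FROM pipeline.monitoring_report
    ORDER BY pipeline_name, check_name, created_at DESC   -- keeps the newest row per pair
) latest
ORDER BY (status = 'FAIL') DESC, pipeline_name, check_name;
```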


Common Mistakes

Mistake 1 — Monitoring the run log but not the target table

A pipeline can log “success” but load zero rows or wrong data. Always check the actual target table, not just the run log.
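One way to act on this is a reconciliation query that compares what the run log claims with what actually landed. The sketch below assumes, hypothetically, that every row a run writes to silver.orders gets a loaded_at timestamp between that run's start_time and end_time; if your tables carry a batch or run identifier instead, join on that.

```sql
-- Compare rows_loaded reported in the run log with rows actually present in the target
-- ASSUMPTION: loaded_at falls inside the run's [start_time, end_time] window
SELECT
    r.batch_date,
    r.rows_loaded                     AS reported_rows,
    COUNT(o.order_id)                 AS actual_rows,
    r.rows_loaded - COUNT(o.order_id) AS difference,
    CASE
        WHEN COUNT(o.order_id) = r.rows_loaded THEN 'MATCH'
        ELSE 'MISMATCH'
    END AS reconciliation
FROM pipeline.run_log r
LEFT JOIN silver.orders o
       ON o.loaded_at >= r.start_time
      AND o.loaded_at <= r.end_time
WHERE r.pipeline_name = 'orders_pipeline'
  AND r.status = 'success'
  AND r.batch_date >= CURRENT_DATE - 7
GROUP BY r.run_id, r.batch_date, r.rows_loaded
ORDER BY r.batch_date DESC;
```

If later runs overwrite loaded_at on upserted rows, older batches will drift into MISMATCH, so this check is most meaningful for the most recent runs.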

Mistake 2 — Fixed thresholds instead of dynamic baselines

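A hard-coded check such as WHERE rows_loaded < 40000 breaks as volumes grow and when seasonality causes legitimate dips (weekends, holidays). Derive thresholds from recent history instead, for example mean ± 3 standard deviations over the last 30 days.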
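A 30-day mean still mixes weekdays with weekends, so a pipeline with a strong weekly pattern can either miss weekday drops or alert every Sunday. One refinement, sketched here as an assumption rather than a prescription, is to build the baseline only from earlier runs on the same weekday. It needs history to be meaningful: eight weeks of daily runs give at most eight values per weekday, and with fewer than two the standard deviation is NULL and the row reports OK.

```sql
-- Same-weekday baseline: compare each recent run with earlier runs on the same day of week
-- ASSUMPTION: 56 days (8 weeks) of history is an illustrative window
SELECT
    r.batch_date,
    TO_CHAR(r.batch_date, 'Dy') AS weekday,
    r.rows_loaded,
    ROUND(b.avg_rows)           AS same_weekday_avg,
    ROUND(b.stddev_rows)        AS same_weekday_stddev,
    CASE
        WHEN r.rows_loaded < b.avg_rows - 3 * b.stddev_rows THEN 'ANOMALY: too few rows'
        WHEN r.rows_loaded > b.avg_rows + 3 * b.stddev_rows THEN 'ANOMALY: too many rows'
        ELSE 'OK'
    END AS anomaly_check
FROM pipeline.run_log r
CROSS JOIN LATERAL (
    SELECT
        AVG(h.rows_loaded)    AS avg_rows,
        STDDEV(h.rows_loaded) AS stddev_rows
    FROM pipeline.run_log h
    WHERE h.pipeline_name = r.pipeline_name
      AND h.status = 'success'
      AND h.batch_date <  r.batch_date            -- only earlier runs
      AND h.batch_date >= r.batch_date - 56       -- last 8 weeks
      AND EXTRACT(ISODOW FROM h.batch_date) = EXTRACT(ISODOW FROM r.batch_date)
) b
WHERE r.pipeline_name = 'orders_pipeline'
  AND r.status = 'success'
  AND r.batch_date >= CURRENT_DATE - 7
ORDER BY r.batch_date DESC;
```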

Mistake 3 — Not storing monitoring history

Without history, you cannot answer “how long has this been broken?” or “what was the trend before it broke?” Always INSERT monitoring results into a log table instead of running SELECT-only queries.
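With results stored in pipeline.monitoring_report, "how long has this been broken?" becomes a query. One sketch: for each check whose latest result is FAIL, find the most recent PASS and report the first failure after it (or the first failure ever, if the check has never passed).

```sql
-- For each currently failing check: when did the current failure streak start?
WITH latest AS (
    SELECT DISTINCT ON (pipeline_name, check_name)
        pipeline_name, check_name, status, created_at
    FROM pipeline.monitoring_report
    ORDER BY pipeline_name, check_name, created_at DESC
),
last_pass AS (
    SELECT pipeline_name, check_name, MAX(created_at) AS last_pass_at
    FROM pipeline.monitoring_report
    WHERE status = 'PASS'
    GROUP BY pipeline_name, check_name
)
SELECT
    l.pipeline_name,
    l.check_name,
    p.last_pass_at,
    MIN(m.created_at)                     AS failing_since,
    CURRENT_TIMESTAMP - MIN(m.created_at) AS failing_for
FROM latest l
LEFT JOIN last_pass p
       ON p.pipeline_name = l.pipeline_name
      AND p.check_name    = l.check_name
JOIN pipeline.monitoring_report m
  ON m.pipeline_name = l.pipeline_name
 AND m.check_name    = l.check_name
 AND m.status        = 'FAIL'
 AND (p.last_pass_at IS NULL OR m.created_at > p.last_pass_at)   -- failures after the last PASS
WHERE l.status = 'FAIL'
GROUP BY l.pipeline_name, l.check_name, p.last_pass_at
ORDER BY failing_since;
```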


What to Learn Next

Monitoring tells you when something is wrong. The next tutorial covers SQL patterns for debugging broken pipelines — how to trace a specific row from Gold back through Silver to Bronze, and how to use SQL to reproduce and explain unexpected results.
