Operational Health: Auditing data freshness with dlt metadata
Aman Gupta, Data Engineer
Introduction
A "Success" exit code only tells you the pipeline ran. It says nothing about whether the data inside is up to date.
The fix: Use load_id to join _dlt_loads with your source table and compare timestamps. If they're close, the source is fresh. If they're hours apart, the source stopped updating and the pipeline ran green on stale data.
Here's how to build a freshness check using dlt metadata.
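Before wiring it into a pipeline, the core join can be sketched with plain Python structures standing in for the two tables (the table contents below are mocked, hypothetical values, not real dlt output):

```python
from datetime import datetime, timedelta, timezone

# Mock stand-ins for _dlt_loads and a source table. The idea: find the
# latest successful load, join its load_id to the rows it wrote, then
# compare dlt's write time against the source's own timestamp.
dlt_loads = [
    {"load_id": "1700000000.1", "status": 0,
     "inserted_at": datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)},
]
source_rows = [
    {"_dlt_load_id": "1700000000.1",
     "sold_at": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)},
]

# Latest successful load (status == 0)
latest = max((l for l in dlt_loads if l["status"] == 0),
             key=lambda l: l["inserted_at"])
# Join on load_id: keep only rows written by that load
rows = [r for r in source_rows if r["_dlt_load_id"] == latest["load_id"]]
# Lag between when dlt wrote and when the source last produced data
lag = latest["inserted_at"] - max(r["sold_at"] for r in rows)
print(lag > timedelta(hours=1))  # True: the pipeline ran green on stale data
```

The rest of the article builds this same comparison on top of real dlt metadata.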

The setup: a source with a native timestamp
We use a mock lemonade stand as the data source. An is_fresh toggle controls whether the data is current or 48 hours old, simulating a source that has silently stopped updating.
import dlt
import duckdb
from datetime import datetime, timedelta, timezone

# Define the data source
@dlt.resource(write_disposition="append")
def lemonade_stand(is_fresh: bool = True):
    # If fresh, data is from "now". If not, it's from 2 days ago.
    # Use timezone-aware UTC timestamps so later comparisons against
    # dlt's UTC metadata don't mix aware and naive datetimes.
    if is_fresh:
        timestamp = datetime.now(timezone.utc)
    else:
        timestamp = datetime.now(timezone.utc) - timedelta(days=2)
    yield [
        {"id": 1, "flavor": "Lemon", "amount": 5.00, "sold_at": timestamp},
        {"id": 2, "flavor": "Lime", "amount": 4.50, "sold_at": timestamp},
        {"id": 3, "flavor": "Berry", "amount": 6.00, "sold_at": timestamp},
    ]
# Define the pipeline
pipeline = dlt.pipeline(
    pipeline_name="operational_health",
    destination="duckdb",
    dataset_name="freshness_dataset",
)
info = pipeline.run(lemonade_stand(is_fresh=True))
print(info)
Step 1: Check when the pipeline ran - not what it loaded
Query _dlt_loads for the latest successful load (status = 0). The inserted_at column tells you when it ran.
import pandas as pd

def check_pipeline_freshness(pipeline):
    loads = pipeline.dataset()._dlt_loads.df()
    last_load = loads.query("status == 0")['inserted_at'].max()
    # max() on an empty selection returns NaT, not None
    if pd.isna(last_load):
        print("❌ No successful syncs found.")
        return
    # Compare inserted_at against current UTC time to determine staleness
    age = (pd.Timestamp.now(tz='UTC') - last_load).total_seconds()
    if age < 3600:
        print(f"✅ FRESH: Updated {age/60:.0f} mins ago")
    else:
        print(f"❌ STALE: Updated {age/3600:.1f} hours ago")
Step 2: Load stale data and watch the pipeline check miss it
Run the pipeline again with is_fresh=False.
pipeline.run(lemonade_stand(is_fresh=False))
print("Run 2 (Stale Data) completed.")
print("--- MONITOR REPORT ---")
print("1. PIPELINE CHECK:")
check_pipeline_freshness(pipeline)
The pipeline check reports fresh. It ran recently - that part is true. But the data inside is 48 hours old.
Step 3: Check data freshness by joining on 'load_id'
Get the latest load_id from _dlt_loads, match it to the rows in your source table, and compare the source's own timestamp (sold_at) against the time dlt wrote it (inserted_at).
# 1. Get the latest load_id and inserted_at from `_dlt_loads`
latest_load = (pipeline.dataset()._dlt_loads.df()
               .query("status == 0")
               .sort_values("inserted_at")
               .iloc[-1])
load_id = latest_load['load_id']
inserted_at = latest_load['inserted_at']

# 2. Get the latest sold_at among the rows that belong to that load
latest_sale = (pipeline.dataset().lemonade_stand.df()
               .query("_dlt_load_id == @load_id")
               .sort_values("sold_at")
               .iloc[-1])
sold_at = latest_sale['sold_at']

# 3. Quick staleness check: more than an hour between the sale and the write?
is_stale = (inserted_at - sold_at).total_seconds() > 3600
print(f"Verdict: {'❌ STALE' if is_stale else '✅ FRESH'}")
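The verdict logic above hardcodes a one-hour window. A minimal sketch of pulling it into a reusable helper with a configurable threshold (the function name and parameters here are hypothetical, not part of dlt):

```python
from datetime import datetime, timedelta, timezone

# Data is stale when the source timestamp lags dlt's write time
# by more than the allowed window.
def freshness_verdict(inserted_at: datetime, sold_at: datetime,
                      max_lag: timedelta = timedelta(hours=1)) -> str:
    return "STALE" if inserted_at - sold_at > max_lag else "FRESH"

now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
print(freshness_verdict(now, now - timedelta(minutes=5)))  # FRESH
print(freshness_verdict(now, now - timedelta(days=2)))     # STALE
```

Different sources tolerate different lag: a five-minute window suits a streaming source, while a daily batch source might reasonably pass with max_lag=timedelta(hours=25).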
The takeaway
Pipeline status and data freshness are two separate signals. A pipeline can run successfully on data that stopped updating hours ago, and _dlt_loads will show green either way.
The difference is what you join on. inserted_at alone tells you when dlt ran. inserted_at minus the source's native timestamp tells you whether the source is still alive.
Treat them separately.
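Treating them separately can be as simple as reporting both signals side by side. A sketch, with hypothetical function and threshold names:

```python
from datetime import datetime, timedelta, timezone

# Two independent signals, each with its own threshold:
# did the pipeline run recently, and is the source still producing data?
def monitor_report(last_run: datetime, last_source_ts: datetime,
                   now: datetime,
                   run_threshold: timedelta = timedelta(hours=1),
                   data_threshold: timedelta = timedelta(hours=1)) -> dict:
    return {
        "pipeline_fresh": now - last_run <= run_threshold,
        "data_fresh": now - last_source_ts <= data_threshold,
    }

now = datetime(2024, 1, 3, 12, 0, tzinfo=timezone.utc)
# Pipeline ran 5 minutes ago, but the source stopped updating 2 days ago
report = monitor_report(now - timedelta(minutes=5), now - timedelta(days=2), now)
print(report)  # {'pipeline_fresh': True, 'data_fresh': False}
```

An alert that fires only when pipeline_fresh is true but data_fresh is false catches exactly the silent failure this article describes.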
Full demo: Operational Health: Freshness via dlt loads
Further reading: the data quality lifecycle