11 Pythonic Data Quality Recipes for every day
Aman Gupta,
Data Engineer
Data quality isn’t a single type of test, check, or contract; it’s a collection of practices that keep our data clean, our development safe, and our assumptions true.
The Data Quality Lifecycle breaks this into three checkpoints (in-flight, staging, destination) and five pillars (structural, semantic, uniqueness, privacy, operational). But knowing the framework isn't the same as implementing it.

This post is a small recipe book. Each recipe is a copy-paste pattern for a specific data quality job:
- Recipe 1: Data quality with Pydantic
- Recipe 2: The schema freeze
- Recipe 3: Data quality with bad data filter
- Recipe 4: Data quality with silent value cleaner
- Recipe 5: Data quality with nested schema lock
- Recipe 6: Data quality with precision enforcer
- Recipe 7: Data quality through primary key deduplication
- Recipe 8: Data quality with schema evolution history table
- Recipe 9: Data quality with dynamic schema contracts
- Recipe 10: Data quality with schema evolution alerts
- Recipe 11: Data quality with contract violation alerts
Let’s jump in!
(or read more about the data quality lifecycle here)
Recipe 1: Data quality with Pydantic
Pydantic lets you describe what valid data looks like using plain Python types. This helps catch bad data early instead of fixing issues later.
In a dlt pipeline, this becomes an early quality gate. Data is validated at extraction time, and if it doesn’t match the expected shape, the pipeline stops immediately.
from pydantic import BaseModel
from typing import List, Optional

import dlt

class User(BaseModel):
    id: int
    email: str
    is_active: bool

@dlt.resource(columns=User)
def users(data: Optional[List[dict]] = None):
    if data:
        yield data
        return
    yield [{"id": 1, "email": "alice@example.com", "is_active": True}]

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_RSoLQ7MdZTYEwb8Qtj4rqh
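As a quick, hypothetical usage sketch (the pipeline name and the bad row below are made up for illustration), a row that doesn’t match the model stops the run at extraction time:

pipeline = dlt.pipeline(pipeline_name="users_quality_demo", destination="duckdb")

# A conforming row loads normally
pipeline.run(users([{"id": 1, "email": "alice@example.com", "is_active": True}]))

# A row whose id can't be coerced to int should fail validation and stop the load
try:
    pipeline.run(users([{"id": "not-a-number", "email": "bob@example.com", "is_active": True}]))
except Exception as e:
    print(f"Validation stopped the load: {type(e).__name__}")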
Recipe 2: The schema freeze
Schema freeze is useful when you want your downstream schema to stay as it is, even as the upstream schema changes.
Once this contract is active, schema changes don’t slip through. New columns, datatype changes, or new tables fail the pipeline immediately.
@dlt.resource
def production_source(data: Optional[List[dict]] = None):
    if data:
        yield data
        return
    yield [{"id": 1, "email": "alice@example.com", "is_active": True}]

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_KCrGQV9bXUTWSyyygCHgMW
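A minimal sketch of activating the freeze, assuming dlt’s schema_contract argument on pipeline.run (the pipeline name and the drifted row are hypothetical):

pipeline = dlt.pipeline(pipeline_name="frozen_schema_demo", destination="duckdb")

# First run establishes the baseline schema
pipeline.run(production_source())

# Later runs freeze it: an unexpected column should now fail the load
drifted = [{"id": 2, "email": "bob@example.com", "is_active": True, "signup_source": "ads"}]
try:
    pipeline.run(production_source(drifted), schema_contract="freeze")
except Exception as e:
    print(f"Schema freeze blocked the load: {type(e).__name__}")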
Recipe 3: Data quality with bad data filter
API drift is common. A new column sneaks in. A number turns into a string.
Your dashboards look fine. Pipelines keep running. But underneath, assumptions start to disagree. And by the time you notice, the damage is already done.
One way to deal with this is to drop bad data early. In dlt, you do that with discard_row, which can be applied at the table, column, or datatype level.
@dlt.resource(
    schema_contract={"columns": "discard_row"}
)
def users(data: Optional[List[dict]] = None):
    if data:
        yield data
        return
    yield [
        {"id": 1, "email": "alice@example.com"},
        {"id": 2, "email": "jean@example.com"},
    ]

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_wPBz39Hb5Ryrhj6ZqjDCjc
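A hedged sketch of the filter in action (the pipeline name and drifted rows are made up): once the baseline schema exists, a row carrying an unknown column is dropped while clean rows keep loading:

pipeline = dlt.pipeline(pipeline_name="discard_row_demo", destination="duckdb")
pipeline.run(users())  # establish the baseline schema (id, email)

drifted = [
    {"id": 3, "email": "carol@example.com"},                         # loads
    {"id": 4, "email": "dave@example.com", "loyalty_tier": "gold"},  # dropped: unknown column
]
pipeline.run(users(drifted))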
Recipe 4: Data quality with silent value cleaner
API drift is common and usually quiet. Fields appear or disappear, and you often find out long after it happened. Most of the time, you don’t want a single bad value to take the entire row down. That’s what discard_value is for.
For example, HubSpot’s age field might return "twenty-five" instead of 25. Left unchecked, that can crash your analytics.
With discard_value, you drop only the bad value and keep the rest of the row intact. You can apply it at the datatype, column, or table level.
@dlt.resource(schema_contract={"columns": "discard_value"})
def customers(data: Optional[List[dict]] = None):
    if data:
        yield data
        return
    yield [{"id": 1, "email": "alice@example.com"}]

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_g8E3hT4ZYjDzYBj46ifpQ7
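A small illustrative sketch (the pipeline name and drifted row are hypothetical): with the column contract set to discard_value, a row carrying an unexpected column still loads and only the unexpected value is dropped:

pipeline = dlt.pipeline(pipeline_name="discard_value_demo", destination="duckdb")
pipeline.run(customers())  # establish the baseline schema (id, email)

# The row for id=2 still loads; only the unexpected `nickname` value is discarded
drifted = [{"id": 2, "email": "bob@example.com", "nickname": "bobby"}]
pipeline.run(customers(drifted))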
Recipe 5: Data quality with nested schema lock
With hierarchical data, it’s often helpful to let parent tables evolve while keeping nested tables locked down.
For example, parent objects can evolve as needed, but nested fields like transaction amounts are typically contract-bound and should remain consistent.
@dlt.resource(
    nested_hints={
        "transactions": dlt.mark.make_nested_hints(
            schema_contract={"columns": "freeze", "data_type": "freeze"},
            columns=[{"name": "amount", "data_type": "decimal"}],
        )
    }
)
def bank_accounts(data: Optional[Dict[str, Any]] = None):
    # Parameterized resource for easy testing
    yield data or {
        "id": 1,
        "account_name": "Checking",
        "transactions": [
            {"amount": 50.00, "date": "2024-01-01"},
            {"amount": 25.50, "date": "2024-01-02"},
        ],
    }

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_pncVcGh5SEWQACBzqSyDG8
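A hedged sketch of the expected behavior (the pipeline name and drifted payload are made up): once the nested contract is in place, an unexpected field inside a transaction should fail the load, even though the parent table is free to evolve:

pipeline = dlt.pipeline(pipeline_name="nested_lock_demo", destination="duckdb")
pipeline.run(bank_accounts())  # creates bank_accounts and its nested transactions table

drifted = {
    "id": 2,
    "account_name": "Savings",
    "transactions": [{"amount": 10.00, "date": "2024-01-03", "memo": "coffee"}],
}
try:
    pipeline.run(bank_accounts(drifted))
except Exception as e:
    print(f"Nested contract blocked the load: {type(e).__name__}")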
Recipe 6: Data quality with precision enforcer
Decimal precision is easy to get wrong, especially with financial data. Databases like BigQuery and Snowflake rely on explicit precision and scale to store values correctly.
Column hints let you enforce those constraints during the pipeline run, before the data hits the destination. Values that exceed the allowed precision are rejected instead of being silently rounded.
@dlt.resource(
    columns={
        "exchange_rate": {
            "data_type": "decimal",
            "precision": 4,
            "scale": 2,
        }
    }
)
def rates(data: Iterable[Dict[str, Any]] | None = None):
    yield from (
        data
        if data is not None
        else [
            {"currency": "EUR", "exchange_rate": 1.05},
            {"currency": "GBP", "exchange_rate": 0.79},
            {"currency": "JPY", "exchange_rate": 99.99},
        ]
    )

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_qEj6Z7behY825arxcAcxMD
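As a hedged sketch of the enforcement described above (the pipeline name and out-of-range value are hypothetical): precision 4 with scale 2 allows at most 99.99, so a five-digit value should be rejected during the run rather than rounded:

pipeline = dlt.pipeline(pipeline_name="precision_demo", destination="duckdb")
pipeline.run(rates())  # the sample rates all fit within precision 4, scale 2

# 123.45 needs five significant digits and cannot fit DECIMAL(4, 2)
try:
    pipeline.run(rates([{"currency": "XYZ", "exchange_rate": 123.45}]))
except Exception as e:
    print(f"Precision check rejected the value: {type(e).__name__}")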
Recipe 7: Data quality through primary key deduplication
As data grows and pipelines rerun, uniqueness becomes a core part of keeping results trustworthy.
Using write_disposition="merge" with a primary_key ensures each record remains unique over time. Existing rows are updated, new ones are inserted, and the destination stays free of duplicates.
This makes pipelines safe to rerun and gives downstream consumers a single, consistent version of each record.
@dlt.resource(
    primary_key="order_id",
    write_disposition="merge",
)
def orders(data: Iterable[Dict] | None = None):
    yield from (
        data
        if data is not None
        else [
            {"order_id": 123, "status": "pending", "amount": 99.99},
            {"order_id": 124, "status": "shipped", "amount": 149.99},
        ]
    )

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_fdtvcMJMQwfj9nQeD4zZJ1
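A quick sketch of the rerun behavior (the pipeline name, updated rows, and the query are illustrative, assuming a duckdb destination and dlt’s sql_client):

pipeline = dlt.pipeline(pipeline_name="orders_merge_demo", destination="duckdb")
pipeline.run(orders())

# Rerun with one updated and one new order: 123 is updated in place, 125 is inserted
update = [
    {"order_id": 123, "status": "shipped", "amount": 99.99},
    {"order_id": 125, "status": "pending", "amount": 42.00},
]
pipeline.run(orders(update))

# Each order_id should now appear exactly once in the destination
with pipeline.sql_client() as client:
    print(client.execute_sql("SELECT order_id, status FROM orders ORDER BY order_id"))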
Recipe 8: Data quality with schema evolution history table
Schemas rarely stay still. Columns appear, types change, and structures evolve. That’s normal. What matters is tracking those changes.

In dlt, each pipeline run returns load info with metadata about what was loaded. Storing this in a history table gives you a clear, queryable record of what changed and when. Pair this with data contracts and violations stop being mysterious: you get faster fixes, fewer disruptions, and better data quality, while the same history supports audit trails and compliance needs like HIPAA and GDPR.
def report_schema_changes(load_info):
    schema_updates = [
        pkg.schema_update
        for pkg in load_info.load_packages
        if pkg.schema_update
    ]
    if schema_updates:
        print("⚠️ Schema evolved during this load")
        for update in schema_updates:
            for table, change in update.items():
                print(f"Table: {table}")
                for col in change.get("columns", {}):
                    print(f" - New column: {col}")
    else:
        print("✅ Initial load detected")
        print("Baseline schema established from incoming data")

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_LayqdS37YPJft3GBxRrBp7
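Reporting is useful; persisting is better. A minimal sketch of appending each run’s schema updates to a history table so they stay queryable (the schema_history table and its fields are hypothetical; pipeline and load_info come from the notebook context):

from datetime import datetime, timezone

def schema_update_rows(load_info):
    # Flatten the schema updates from a load into rows for a history table
    for pkg in load_info.load_packages:
        for table, change in (pkg.schema_update or {}).items():
            for col, hints in change.get("columns", {}).items():
                yield {
                    "recorded_at": datetime.now(timezone.utc),
                    "load_id": pkg.load_id,
                    "table_name": table,
                    "column_name": col,
                    "data_type": hints.get("data_type"),
                }

pipeline.run(schema_update_rows(load_info), table_name="schema_history", write_disposition="append")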
Recipe 9: Data quality with dynamic schema contracts
Not all data quality rules are known upfront. Sometimes you only know how strict to be once the pipeline is running. Dynamic schema contracts let you apply or change contract behavior at runtime, without touching the resource definition. This gives you flexibility to adapt data quality rules to context, environment, or source behavior as it changes.
# Establish base schema
pipeline_evolve.run(dynamic_data())

try:
    source = dynamic_data(data=new_data)
    pipeline_evolve.run(source, schema_contract="evolve")
    print("Evolve contract applied:")
    print("Schema updated to include new column")
except Exception as e:
    print("Unexpected failure:")
    msg = str(e).splitlines()[0]
    print(textwrap.fill(msg, width=70))

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_rZ4rXwL8YwHpa6Bt95uWUW
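The same idea extends to environment-driven strictness. A hypothetical sketch that reuses the notebook’s pipeline_evolve and dynamic_data (the ENV variable is made up):

import os

# Strict in production, permissive everywhere else
contract = "freeze" if os.getenv("ENV") == "prod" else "evolve"
pipeline_evolve.run(dynamic_data(data=new_data), schema_contract=contract)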
Recipe 10: Data quality with schema evolution alerts
Schema changes are not always bad, but silent schema changes are. This recipe shows how to detect schema evolution as it happens and surface it immediately where teams already work. By sending lightweight alerts to Slack, you turn schema drift into a visible, actionable signal instead of a downstream surprise.
# Slack webhook URL from the environment (alerts print locally if unset)
hook = os.getenv("SLACK_WEBHOOK_URL")

for pkg in load_info.load_packages:
    tables, cols, owners = [], [], set()
    for t, tbl in (pkg.schema_update or {}).items():
        new_cols = [
            f"- `{n}` ({c.get('data_type', 'unknown')})"
            for n, c in tbl.get("columns", {}).items()
            if not n.startswith("_dlt_")
        ]
        if not new_cols:
            continue
        owners.update(CONTRACT_OWNERS.get(t, []))
        tables.append(f"`{t}`")
        cols.extend(new_cols)
    if not tables:
        continue
    users = " ".join(f"@{u.lstrip('@')}" for u in sorted(owners))
    msg = (
        f"🚨 Schema change detected {users}\n\n"
        f"Tables changed:\n{', '.join(tables)}\n\n"
        f"Columns added:\n" + "\n".join(cols)
    )
    if hook:
        send_slack_message(hook, message=msg)
        print("\n\n📣 Schema change alert sent to Slack")
    else:
        print(msg)

Full example in the runnable marimo notebook: https://molab.marimo.io/notebooks/nb_YcocxLWJNHGNAAatkqLqGj
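For context, a hedged sketch of what the snippet assumes is defined beforehand (the owner mapping and pipeline are hypothetical; send_slack_message is assumed to be dlt’s built-in Slack helper):

import os
import dlt
from dlt.common.runtime.slack import send_slack_message

# Hypothetical mapping from table names to the people who own their contracts
CONTRACT_OWNERS = {"users": ["@alice"], "orders": ["@bob"]}

pipeline = dlt.pipeline(pipeline_name="schema_alerts_demo", destination="duckdb")
load_info = pipeline.run(users())  # e.g. the users resource from Recipe 1; the alert loop runs right after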
Recipe 11: Data quality with contract violation alerts
Recipe 10 showed how to observe schema changes as they happen. This recipe takes the next step and actively prevents them. When a schema is frozen, any unexpected change should fail fast and notify the right people immediately.
This pattern blocks breaking changes at the source and turns contract violations into clear, actionable alerts in Slack, before bad data spreads downstream.
# Slack webhook (prints locally if unset)
hook = os.getenv("SLACK_WEBHOOK_URL")

def handle_violation(e):
    # Unwrap chained exceptions until we reach the root validation error
    while hasattr(e, "__cause__") and not isinstance(e, DataValidationError):
        e = e.__cause__
    if not isinstance(e, DataValidationError):
        return
    table = getattr(e, "table_name", "?")
    column = getattr(e, "column_name", "general")
    msg = (
        "🚨 Contract violation detected\n"
        f"Table: `{table}`\n"
        f"Column: `{column}`"
    )
    if hook and "http" in hook:
        send_slack_message(hook, message=msg)
    else:
        print(msg)
Runnable example: https://molab.marimo.io/notebooks/nb_WWVsVnu6jxPDJppmuU1JsN
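A hypothetical wiring sketch (the pipeline, resource, and drifted payload stand in for the ones defined in the notebook): freeze the contract, alert on the violation, and still fail the run so your orchestrator sees it:

try:
    pipeline.run(users(drifted), schema_contract="freeze")
except Exception as e:
    handle_violation(e)  # unwraps to DataValidationError and notifies Slack
    raise                # re-raise so the run is still marked as failed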
Why it matters
Data quality isn’t one check or one rule. It’s a set of small, intentional choices you make at each stage of the pipeline. These recipes are meant to be practical building blocks you can mix, match, and apply as your data and teams evolve. Start simple. Add guardrails where they matter most. Over time, your pipelines become easier to trust, easier to change, and easier to run in production.
Still here? Give it a try
Pick one recipe and try it in your pipeline today. Open the runnable notebooks, experiment, and adapt them to your use case. Each one is bite-sized and only takes a few clicks to run.
If you want to go deeper, explore the data quality lifecycle and schema contracts in the dlt docs. And if you spot a gap, jump into the dlt repo and open a PR. Data quality gets better when it’s shared.
Let’s ship cleaner data tomorrow.