
11 Pythonic Data Quality Recipes for every day

  • Aman Gupta,
    Data Engineer

Data quality isn’t a single type of test, check, or contract. It’s a whole set of practices that keep our data clean, our development safe, and our assumptions true.

The Data Quality Lifecycle breaks this into three checkpoints (in-flight, staging, destination) and five pillars (structural, semantic, uniqueness, privacy, operational). But knowing the framework isn't the same as implementing it.

This post is a small recipe book. Each recipe is a copy-paste pattern for a specific data quality job:

  • Recipe 1: Data quality with Pydantic
  • Recipe 2: The schema freeze
  • Recipe 3: Data quality with bad data filter
  • Recipe 4: Data quality with silent value cleaner
  • Recipe 5: Data quality with nested schema lock
  • Recipe 6: Data quality with precision enforcer
  • Recipe 7: Data quality through primary key deduplication
  • Recipe 8: Data quality with schema evolution history table
  • Recipe 9: Data quality with dynamic schema contracts
  • Recipe 10: Data quality with schema evolution alerts
  • Recipe 11: Data quality with contract violation alerts

Let’s jump in!

(or read more about the data quality lifecycle here.)

Recipe 1: Data quality with Pydantic

Pydantic lets you describe what valid data looks like using plain Python types. This helps catch bad data early instead of fixing issues later.

In a dlt pipeline, this becomes an early quality gate. Data is validated at extraction time, and if it doesn’t match the expected shape, the pipeline stops immediately.

from pydantic import BaseModel
from typing import List, Optional
import dlt

class User(BaseModel):
    id: int
    email: str
    is_active: bool

@dlt.resource(columns=User)
def users(data: Optional[List[dict]] = None):
    if data:
        yield data
        return

    yield [{"id": 1, "email": "alice@example.com", "is_active": True}]
    
# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_RSoLQ7MdZTYEwb8Qtj4rqh
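
To see the gate trip, you can run the resource through a pipeline and feed it a record that breaks the model. This is a minimal sketch: the pipeline name, destination, and bad record are illustrative, and dlt wraps the underlying Pydantic error in its own pipeline exception, hence the broad except.

pipeline = dlt.pipeline(
    pipeline_name="users_quality",
    destination="duckdb",
    dataset_name="raw",
)

# None of these values can be coerced to the types declared on User.
bad_batch = [{"id": "not-an-int", "email": None, "is_active": "maybe"}]

try:
    pipeline.run(users(data=bad_batch))
except Exception as exc:
    # The validation failure surfaces as a pipeline error, so nothing
    # from this batch reaches the destination.
    print(f"Load rejected: {exc}")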

Recipe 2: The schema freeze

Schema freeze is useful when you want your downstream schema to stay as it is, even as the upstream schema changes.

Once this contract is active, schema changes don’t slip through. New columns, datatype changes, or new tables fail the pipeline immediately.


@dlt.resource(schema_contract="freeze")
def production_source(data: Optional[List[dict]] = None):
    yield from (
        data
        if data is not None
        else [{"id": 1, "email": "alice@example.com", "is_active": True}]
    )

# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_KCrGQV9bXUTWSyyygCHgMW
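
A quick way to verify the freeze is to load a baseline first and then push a record with an extra column; the second run should fail rather than widen the table. The pipeline setup and the drifted record below are illustrative.

pipeline = dlt.pipeline(
    pipeline_name="prod_freeze",
    destination="duckdb",
    dataset_name="raw",
)

# First run establishes the schema: id, email, is_active.
pipeline.run(production_source())

# Second run sneaks in a new column; with the contract frozen, the run
# fails instead of silently adding signup_source to the table.
drifted = [
    {"id": 2, "email": "bob@example.com", "is_active": False, "signup_source": "ads"}
]
try:
    pipeline.run(production_source(data=drifted))
except Exception as exc:
    print(f"Schema freeze blocked the load: {exc}")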

Recipe 3: Data quality with bad data filter

API drift is common and usually quiet. A new column sneaks in. A number turns into a string.

Your dashboards look fine. Pipelines keep running. But underneath, assumptions start to erode. By the time you notice, the damage is already done.

One way to deal with this is to drop bad data early. In dlt, you do that with discard_row, which can be applied at the table, column, or datatype level.


@dlt.resource(
    schema_contract={"columns": "discard_row"}
)
def users(data: Optional[List[dict]] = None):
    if data:
        yield data
        return

    yield [
        {"id": 1, "email": "alice@example.com"},
        {"id": 2, "email": "jean@example.com"},
    ]
    
# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_wPBz39Hb5Ryrhj6ZqjDCjc
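
Here is a minimal sketch of the filter at work (pipeline name and sample rows are illustrative): after a baseline run fixes the schema, a row that drifts with an unexpected column is dropped while the conforming row still lands.

pipeline = dlt.pipeline(
    pipeline_name="users_discard_row",
    destination="duckdb",
    dataset_name="raw",
)

# Baseline run locks in the {id, email} shape.
pipeline.run(users())

# One row conforms, one carries an unexpected column. With
# {"columns": "discard_row"}, the drifted row is dropped entirely.
mixed = [
    {"id": 3, "email": "carol@example.com"},
    {"id": 4, "email": "dave@example.com", "plan": "enterprise"},
]
pipeline.run(users(data=mixed))

with pipeline.sql_client() as client:
    # Expect ids 1-3; the row for id 4 never made it in.
    print(client.execute_sql("SELECT id, email FROM users ORDER BY id"))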

Recipe 4: Data quality with silent value cleaner

API drift is common. Fields appear, disappear, or quietly change, and you usually find out long after it happened. Most of the time, we don’t want a single bad value to take the entire row down. That’s what discard_value is for.

It drops only the bad value and keeps the rest of the row intact. For example, when an API suddenly adds an extra field or returns a string where a number used to be, discard_value lets the pipeline keep moving without breaking. You can apply it at the datatype, column, or table level.



@dlt.resource(schema_contract={"columns": "discard_value"})
def customers(data: Optional[List[dict]] = None):
    if data:
        yield data
        return

    yield [{"id": 1, "email": "alice@example.com"}]
   
   
# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_g8E3hT4ZYjDzYBj46ifpQ7
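
As a quick check (illustrative pipeline and records), a drifted field is simply stripped while the rest of the row loads as usual:

pipeline = dlt.pipeline(
    pipeline_name="customers_discard_value",
    destination="duckdb",
    dataset_name="raw",
)

# First run locks in the {id, email} shape.
pipeline.run(customers())

# The drifted record keeps its row, but the unexpected loyalty_tier
# value is dropped before it reaches the destination.
drifted = [{"id": 2, "email": "jean@example.com", "loyalty_tier": "gold"}]
pipeline.run(customers(data=drifted))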

Recipe 5: Data quality with nested schema lock

With hierarchical data, it’s often helpful to let parent tables evolve while keeping nested tables locked down.

With nested hints applied, parent objects can evolve over time, while contract-critical fields like transaction amounts remain consistent.

from typing import Any, Dict

@dlt.resource(
    nested_hints={
        "transactions": dlt.mark.make_nested_hints(
            schema_contract={"columns": "freeze", "data_type": "freeze"},
            columns=[{"name": "amount", "data_type": "decimal"}],
        )
    }
)
def bank_accounts(data: Optional[Dict[str, Any]] = None):
    # Parameterized resource for easy testing
    if data is not None:
        yield data
        return

    yield {
        "id": 1,
        "account_name": "Checking",
        "transactions": [
            {"amount": 50.00, "date": "2025-01-01"},
            {"amount": 25.50, "date": "2025-01-02"},
        ],
    }

# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_pncVcGh5SEWQACBzqSyDG8
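
A small sketch of the asymmetry (pipeline setup and drifted payload are illustrative): the parent table happily picks up a new column, while a new column inside transactions trips the nested freeze.

pipeline = dlt.pipeline(
    pipeline_name="bank_nested_lock",
    destination="duckdb",
    dataset_name="raw",
)

# Baseline load creates bank_accounts and the nested transactions table.
pipeline.run(bank_accounts())

# The parent gains a new branch field (free to evolve), but the nested
# transactions rows try to add a fee column, which the freeze rejects.
drifted = {
    "id": 2,
    "account_name": "Savings",
    "branch": "Berlin",
    "transactions": [{"amount": 10.00, "date": "2025-01-03", "fee": 0.25}],
}
try:
    pipeline.run(bank_accounts(data=drifted))
except Exception as exc:
    print(f"Nested contract violation: {exc}")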

Recipe 6: Data quality with precision enforcer

Decimal precision is easy to get wrong, especially with financial data. Databases like BigQuery and Snowflake rely on explicit precision and scale to store values correctly.

Column hints let you enforce those constraints during the pipeline run, before the data hits the destination. Values that exceed the allowed precision are rejected instead of being silently rounded.

from typing import Any, Dict, Iterable

@dlt.resource(
    columns={
        "exchange_rate": {
            "data_type": "decimal",
            "precision": 4,
            "scale": 2,
        }
    }
)
def rates(data: Iterable[Dict[str, Any]] | None = None):
    yield from (
        data
        if data is not None
        else [
            {"currency": "EUR", "exchange_rate": 1.05},
            {"currency": "GBP", "exchange_rate": 0.79},
            {"currency": "JPY", "exchange_rate": 99.99},
        ]
    )
# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_qEj6Z7behY825arxcAcxMD
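
To exercise the hint, feed in a rate that needs more digits than DECIMAL(4, 2) allows. This sketch assumes a DuckDB destination, which enforces the declared precision at load time, so the run fails rather than rounding the value.

pipeline = dlt.pipeline(
    pipeline_name="rates_precision",
    destination="duckdb",
    dataset_name="raw",
)

# 123.45 needs five significant digits, one more than precision=4 allows.
try:
    pipeline.run(rates(data=[{"currency": "USD", "exchange_rate": 123.45}]))
except Exception as exc:
    print(f"Precision violation: {exc}")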

Recipe 7: Data quality through primary key deduplication

As data grows and pipelines rerun, uniqueness becomes a core part of keeping results trustworthy.

Using write_disposition="merge" with a primary_key ensures each record remains unique over time. Existing rows are updated, new ones are inserted, and the destination stays free of duplicates.

This makes pipelines safe to rerun and gives downstream consumers a single, consistent version of each record.

@dlt.resource(
    primary_key="order_id",
    write_disposition="merge",
)
def orders(data: Iterable[Dict] | None = None):
    yield from (
        data
        if data is not None
        else [
            {"order_id": 123, "status": "pending", "amount": 99.99},
            {"order_id": 124, "status": "shipped", "amount": 149.99},
        ]
    )

# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_fdtvcMJMQwfj9nQeD4zZJ1
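
Rerunning the resource with an updated record shows the effect (pipeline setup is illustrative): the existing row is replaced rather than duplicated.

pipeline = dlt.pipeline(
    pipeline_name="orders_merge",
    destination="duckdb",
    dataset_name="raw",
)

# Initial load inserts both orders.
pipeline.run(orders())

# Order 123 ships; the rerun updates the existing row instead of
# appending a duplicate, thanks to merge + primary_key.
pipeline.run(orders(data=[{"order_id": 123, "status": "shipped", "amount": 99.99}]))

with pipeline.sql_client() as client:
    # Still exactly two rows, one per order_id.
    print(client.execute_sql("SELECT order_id, status FROM orders ORDER BY order_id"))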

Recipe 8: Data quality with schema evolution history table

Schemas rarely stay still. Columns appear, types change, and structures evolve. That’s normal.

What matters is tracking those changes over time. In dlt, each pipeline run produces load metadata as load_info, which captures how the schema evolves. By storing this metadata as a history table, you get a clear, queryable record of what changed and when.

This gives you better context, faster debugging, and stronger lineage tracking, while also supporting audit and compliance requirements such as HIPAA and GDPR.

def report_schema_changes(load_info):
    schema_updates = [
        pkg.schema_update
        for pkg in load_info.load_packages
        if pkg.schema_update
    ]

    if schema_updates:
        print("📐 Schema changes recorded for this run")
        for update in schema_updates:
            for table, change in update.items():
                print(f"Table: {table}")
                for col in change.get("columns", {}):
                    print(f"  - Column: {col}")
    else:
        print("✅ No schema changes detected")

# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_LayqdS37YPJft3GBxRrBp7
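
To persist the same metadata instead of only printing it, one option is to flatten the schema updates into rows and load them back through the pipeline. The table name and row shape below are illustrative, not a dlt convention.

from datetime import datetime, timezone

def store_schema_changes(pipeline, load_info, table_name="schema_change_history"):
    # Flatten each load package's schema updates into one row per column.
    rows = []
    for pkg in load_info.load_packages:
        for table, change in (pkg.schema_update or {}).items():
            for col in change.get("columns", {}):
                rows.append(
                    {
                        "recorded_at": datetime.now(timezone.utc),
                        "load_id": pkg.load_id,
                        "table_name": table,
                        "column_name": col,
                    }
                )

    if rows:
        # Appends to a queryable history table in the same dataset.
        pipeline.run(rows, table_name=table_name)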

Recipe 9: Data quality with dynamic schema contracts

Data quality rules evolve with the pipeline. Production runs are what reveal how strict your rules actually need to be.

With dlt, you can apply new rules or adjust existing ones at run time, without changing the resource itself. This lets data contracts evolve with the data.

# Establish base schema
pipeline_evolve.run(dynamic_data())

try:
    pipeline_evolve.run(
        dynamic_data(data=new_data),
        schema_contract="evolve",
    )
    print("Evolve contract applied:")
    print("Schema updated to include new column")

except Exception as e:
    print("Unexpected failure:")
    msg_updtd = str(e).splitlines()[0]
    print(textwrap.fill(msg_updtd, width=70))

show_loaded_data(pipeline_evolve, "dynamic_data")
    
# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_rZ4rXwL8YwHpa6Bt95uWUW
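
The same mechanism works in the other direction: once the rules settle, you can tighten the contract at run time without touching the resource. The newer_data batch below is hypothetical, standing in for yet another drift from upstream, and its fields are just for illustration.

# Hypothetical follow-up batch that drifts again with another new column.
newer_data = [{"id": 3, "value": 30, "unexpected_flag": True}]

try:
    pipeline_evolve.run(
        dynamic_data(data=newer_data),
        schema_contract="freeze",  # tightened at run time, resource unchanged
    )
except Exception as e:
    print("Freeze contract blocked the drift:")
    print(textwrap.fill(str(e).splitlines()[0], width=70))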

Recipe 10: Data quality with schema evolution alerts

Schema changes are not always bad, but silent schema changes are.

This recipe shows how to catch schema changes as they happen and notify teams through tools they already use, like Slack. Instead of silent drift, you get a clear signal you can act on.

from dlt.common.runtime.slack import send_slack_message

hook = "please set me up"  # set the slack webhook url

def notify_schema_changes(load_info, hook, owners_map=None):
    owners_map = owners_map or {}

    for pkg in load_info.load_packages:
        tables, cols, owners = [], [], set()

        for t, tbl in (pkg.schema_update or {}).items():
            added = [
                f"- `{n}` ({c.get('data_type', 'unknown')})"
                for n, c in tbl.get("columns", {}).items()
                if not n.startswith("_dlt_")
            ]
            if added:
                owners |= set(owners_map.get(t, []))
                tables.append(f"`{t}`")
                cols += added

        if not tables:
            continue

        users = " ".join(f"@{o.lstrip('@')}" for o in sorted(owners))
        msg = (
            f"🚨 Schema change detected {users}\n\n"
            f"Tables changed:\n{', '.join(tables)}\n\n"
            f"Columns added:\n" + "\n".join(cols)
        )

        send_slack_message(hook, message=msg) if "http" in hook else print(msg)

# The full example lives in the marimo notebook below.
Runnable example: https://molab.marimo.io/notebooks/nb_YcocxLWJNHGNAAatkqLqGj
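
Wiring the alert in is a single call after the run. The pipeline, sample row, and owners here are illustrative; with the placeholder hook, the message falls back to a print so you can see what would be posted.

pipeline = dlt.pipeline(
    pipeline_name="schema_alert_demo",
    destination="duckdb",
    dataset_name="raw",
)

load_info = pipeline.run(
    [{"id": 1, "email": "alice@example.com", "signup_source": "ads"}],
    table_name="users",
)

# Map table names to the people who should be pinged when they change.
notify_schema_changes(load_info, hook, owners_map={"users": ["aman", "adrian"]})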

Recipe 11: Data quality with contract violation alerts

Recipe 10 showed how to observe schema changes as they happen. This recipe takes the next step and actively prevents them. When a schema is frozen, any unexpected change should fail fast and notify the right people immediately.

This pattern blocks breaking changes at the source and turns contract violations into clear, actionable alerts in Slack, before bad data spreads downstream.

hook = "please set me up"# set the slack webhook url

CONTRACT_OWNERS = {
    "production_source": ["Adrian", "Aman"],
    "production_dev": ["Adrian", "Aman"],
}

def handle_violation(e):
    # unwrap until we reach the root validation error
    while hasattr(e, "__cause__") and not isinstance(e, DataValidationError):
        e = e.__cause__

    if not isinstance(e, DataValidationError):
        return

    table = getattr(e, "table_name", "?")
    column = getattr(e, "column_name", "general")
    owners = " ".join(f"@{o}" for o in CONTRACT_OWNERS.get(table, []))

    msg = (
        "🚨 Schema change detected\n"
        f"Owners: {owners}\n"
        f"Table: `{table}`\n"
        f"Column: `{column}`"
    )

    send_slack_message(hook, msg) if "http" in hook else print(msg)

print("Alert successfully set.")
Runnable example: https://molab.marimo.io/notebooks/nb_WWVsVnu6jxPDJppmuU1JsN
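
A sketch of wiring it into a run, reusing the frozen production_source resource from Recipe 2 (pipeline setup and the drifted record are illustrative):

pipeline = dlt.pipeline(
    pipeline_name="contract_alerts",
    destination="duckdb",
    dataset_name="raw",
)

# Baseline load under the frozen contract.
pipeline.run(production_source())

try:
    # This record drifts with a new plan column, violating the freeze.
    pipeline.run(
        production_source(data=[{"id": 3, "email": "eve@example.com", "plan": "pro"}])
    )
except Exception as exc:
    # Unwrap the pipeline error and alert the contract owners.
    handle_violation(exc)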

Why it matters

Data quality isn’t one check or one rule. It’s a set of small, intentional choices you make at each stage of the pipeline. These recipes are meant to be practical building blocks you can mix, match, and apply as your data and teams evolve.

Start simple. Add guardrails where they matter most. Over time, your pipelines become easier to trust, easier to change, and easier to run in production.

Still here? Give it a try

Pick one recipe and try it in your pipeline today. Open the runnable notebooks, experiment, and adapt them to your use case. Each one is bite-sized and only takes a few clicks to run.

If you want to go deeper, explore the data quality lifecycle in the dlt docs.

Let’s ship cleaner data in the new year!