
Data quality 🧪

warning

🚧 This feature is under development. Interested in becoming an early tester? Join dltHub early access.

dltHub data quality features include metrics for monitoring dataset properties over time, and checks to validate them against expectations. Together, they offer visibility and let you catch data issues early. Metrics and checks are defined via Python code. The extensive configuration allows you to specify what to monitor and validate, when, how, and where to store results.

This page covers the basics of metrics and checks. You should notice a lot of symmetry between them (e.g., with_metrics() and with_checks()). The later sections of this page cover notions that apply to both.

Metrics

A data quality metric, or metric, is a function applied to data that returns a scalar value describing a property of the data. A metric can take as input a column, a table, or the full dataset (i.e., all tables and historical metrics).
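As a plain-Python illustration of the concept (this is not the dq API), a column-level metric reduces a single column to one scalar, while a table-level metric describes the table as a whole:

```python
# Conceptual sketch of metrics in plain Python (not the dq API).
rows = [
    {"amount": 10.0, "email": "a@example.com"},
    {"amount": 30.0, "email": None},
    {"amount": 20.0, "email": "c@example.com"},
]

def column_mean(rows, col):
    """Column-level metric: mean of a numeric column (nulls excluded)."""
    values = [r[col] for r in rows if r[col] is not None]
    return sum(values) / len(values)

def column_null_count(rows, col):
    """Column-level metric: number of null values in a column."""
    return sum(1 for r in rows if r[col] is None)

def table_row_count(rows):
    """Table-level metric: total number of rows in the table."""
    return len(rows)

print(column_mean(rows, "amount"))       # 20.0
print(column_null_count(rows, "email"))  # 1
print(table_row_count(rows))             # 3
```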

Define metrics

Static

You can define metrics on your @dlt.resource (and @dlt.transformer, @dlt.hub.transformation) via the new decorator @with_metrics. It is available under the dlt.hub.data_quality module, commonly imported as dq. Inside the decorator, you set the individual metrics available through dq.metrics.column.*, dq.metrics.table.*, or dq.metrics.dataset.*.

The next snippet defines three metrics on the customers resource: the mean of the amount column, the number of null values in the email column, and the total number of rows in the table.

note

Only column-level and table-level metrics can be defined on a @dlt.resource. To set dataset-level metrics, use @with_metrics on the @dlt.source.

import dlt
from dlt.hub import data_quality as dq

@dq.with_metrics(
    dq.metrics.column.mean("amount"),
    dq.metrics.column.null_count("email"),
    dq.metrics.table.row_count()
)
@dlt.resource
def customers():
    yield data

The next snippet shows how to add dataset-level metrics to a source. The total_row_count is added on the crm source.

import dlt
from dlt.hub import data_quality as dq

@dq.with_metrics(
    dq.metrics.column.mean("amount"),
    dq.metrics.column.null_count("email"),
    dq.metrics.table.row_count()
)
@dlt.resource
def customers():
    yield data


@dq.with_metrics(
    dq.metrics.dataset.total_row_count()
)
@dlt.source
def crm():
    return [customers]

Dynamic

Similar to the static approach, you can add metrics to an instantiated resource or source object using with_metrics. This is particularly useful when using built-in sources and resources like filesystem, rest_api, or sql_database.

import dlt
from dlt.hub import data_quality as dq

@dlt.resource
def customers():
    yield from [...]

# later; this mutates the resource object and sets metrics
dq.with_metrics(
    customers,
    dq.metrics.column.mean("amount"),
    dq.metrics.column.null_count("email"),
    dq.metrics.table.row_count()
)

Available metrics

Here's the list of built-in metrics:

from dlt.hub import data_quality as dq

# column-level
dq.metrics.column.maximum("col")
dq.metrics.column.minimum("col")
dq.metrics.column.mean("col")
dq.metrics.column.median("col")
dq.metrics.column.mode("col")
dq.metrics.column.sum("col")
dq.metrics.column.standard_deviation("col")
dq.metrics.column.quantile("col", quantile=0.95)
dq.metrics.column.null_count("col")
dq.metrics.column.null_rate("col")
dq.metrics.column.unique_count("col")
dq.metrics.column.average_length("col")
dq.metrics.column.minimum_length("col")
dq.metrics.column.maximum_length("col")

# table-level
dq.metrics.table.row_count() # Number of rows in table
dq.metrics.table.unique_count() # Number of distinct / unique rows in table
dq.metrics.table.null_row_count() # Number of rows where all columns are null

# dataset-level
dq.metrics.dataset.total_row_count() # Total number of rows
dq.metrics.dataset.load_row_count() # Rows added in latest load
dq.metrics.dataset.latest_loaded_at() # Timestamp of most recent load

note

If you have built-in metrics requests, let us know. Custom metrics are planned.

Compute metrics

Pass the pipeline to dq.enable_data_quality() to enable metrics. This sets a flag on the pipeline state to compute metrics and write results to the destination after each pipeline.run() call.

pipeline = dlt.pipeline("my_pipeline", destination="duckdb")

dq.enable_data_quality(pipeline)

pipeline.run(customers)

Since the flag is stored on the pipeline state, instantiating this pipeline from another script or notebook will remember that metrics are enabled and compute metrics when pipeline.run() is called.

# another_file.py
pipeline = dlt.attach("my_pipeline")

# this will compute metrics too
pipeline.run(customers)

Read metrics

The convenience function dq.read_metric() allows you to retrieve stored metrics along with some metadata. This makes it easy to build reporting, dashboards, or analytics over this data.

The function produces a dlt.Relation, which can be converted to a list, a pandas DataFrame, an Arrow table, etc.

dataset = pipeline.dataset()
# column-level `mean` as pandas.DataFrame
dq.read_metric(
    dataset,
    table="customers",
    column="amount",
    metric="mean"
).df()

# table-level `row_count` as list of tuples
dq.read_metric(
    dataset,
    table="customers",
    metric="row_count"
).fetchall()

# dataset-level `total_row_count` as pyarrow.Table
dq.read_metric(
    dataset,
    metric="total_row_count"
).arrow()

Checks

A data quality check, or check, is a function applied to data that returns a check result, or result (a boolean, integer, float, etc.). The result is converted to a success / fail check outcome, or outcome (a boolean), based on a decision rule.
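As a hedged, plain-Python sketch of this result-to-outcome conversion (not the dq API), a check can return the number of violating rows as its result, and a decision rule maps that result to a boolean outcome:

```python
# Conceptual sketch: check result vs. check outcome (plain Python, not the dq API).
rows = [{"payment_method": m} for m in ["card", "cash", "crypto"]]

def is_in_check(rows, col, allowed):
    """Returns the check *result*: the number of rows with a disallowed value."""
    return sum(1 for r in rows if r[col] not in allowed)

result = is_in_check(rows, "payment_method", ["card", "cash", "voucher"])
# Decision rule: the check *outcome* succeeds only if no row violates the expectation.
outcome = result == 0

print(result, outcome)  # 1 False
```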

info

A test verifies that code behaves as expected. A check verifies that data meets some expectations. Code tests enable you to make changes with confidence, and data checks help you monitor your live systems.

Define checks

Static

You can define checks on your @dlt.resource (and @dlt.transformer, @dlt.hub.transformation) via the new decorator @with_checks, available under the dlt.hub.data_quality module. Inside the decorator, you set the individual checks available through dq.checks.*.

This snippet shows a single is_in() check being run against the orders table.

import dlt
from dlt.hub import data_quality as dq

@dq.with_checks(  # type: ignore[attr-defined]
    dq.checks.is_in("payment_methods", ["card", "cash", "voucher"]),
)
@dlt.resource
def orders():
    yield from [...]

Dynamic

Similar to the static approach, you can add checks to an instantiated resource or source object using with_checks. This is particularly useful when using built-in sources and resources like filesystem, rest_api, or sql_database.

import dlt
from dlt.hub import data_quality as dq

@dlt.resource
def orders():
    yield data

# later; this mutates the resource object and sets checks
dq.with_checks(
    orders,
    dq.checks.is_in("payment_methods", ["card", "cash", "voucher"]),
)

Available checks

Here's the list of built-in checks:

from dlt.hub import data_quality as dq

dq.checks.is_unique("col")
dq.checks.is_not_null("col")
dq.checks.is_primary_key("col") # valid primary key
dq.checks.is_in("foo", ["bar", "baz"]) # valid values
dq.checks.case("col < 0") # row-wise check
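The case() check above is row-wise: each row is evaluated against the predicate individually. A plain-Python sketch of row-wise semantics (not the dq API; the exact pass/fail convention of case() is an assumption here):

```python
# Conceptual sketch of a row-wise check like dq.checks.case("col < 0")
# (plain Python, not the dq API; assumes the predicate is the per-row expectation).
rows = [{"col": -1}, {"col": 5}, {"col": -3}]

def case_check(rows, predicate):
    """Evaluates the predicate per row; returns one boolean per row."""
    return [predicate(r) for r in rows]

per_row = case_check(rows, lambda r: r["col"] < 0)
print(per_row)       # [True, False, True]
print(all(per_row))  # False: at least one row violates the expectation
```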

Compute checks

Pass the pipeline to dq.enable_data_quality() to enable checks. This sets a flag on the pipeline state to compute checks and write results to the destination after each pipeline.run() call.

pipeline = dlt.pipeline("my_pipeline", destination="duckdb")

dq.enable_data_quality(pipeline)

pipeline.run(customers)

Since the flag is stored on the pipeline state, instantiating this pipeline from another script or notebook will remember that data quality is enabled and compute checks when pipeline.run() is called.

# another_file.py
pipeline = dlt.attach("my_pipeline")

# this will compute checks too
pipeline.run(customers)

Read checks

The convenience function dq.read_check() allows you to retrieve stored check results along with some metadata. This makes it easy to build reporting, dashboards, or analytics over this data.

The function produces a dlt.Relation, which can be converted to a list, a pandas DataFrame, an Arrow table, etc.

dataset = pipeline.dataset()
dq.read_check(
    dataset,
    table="orders",
    column="payment_method",
).df()

Lifecycle

Data quality (both metrics and checks) can be executed at different stages of the pipeline lifecycle. This impacts several aspects, including:

  • available input data
  • compute resources used
  • actions available after a failed check (e.g., prevent invalid data load)

Post-load

The post-load execution is the simplest option. The pipeline goes through Extract -> Normalize -> Load as usual. Then, the checks are executed on the destination.

Properties:

  • Failed records can't be dropped or quarantined before load. All records must be written, checked, and then handled. This only works with write_disposition="append" or destinations supporting snapshots (e.g. iceberg, ducklake).
  • Checks have access to the full dataset. This includes current and past loads + internal dlt tables.
  • Computed directly on the destination. This scales well with the size of the data and the complexity of the checks.
  • Results and outcome are directly stored on the dataset. No data movement is required.
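Conceptually, a post-load check compiles to a query executed directly on the destination. The following sketch uses the standard-library sqlite3 as a stand-in destination (illustrative only, not dlt internals) to show what an is_in check might compute after data has been loaded:

```python
import sqlite3

# Stand-in "destination" with already-loaded data (not actual dlt internals).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (payment_method TEXT)")
con.executemany(
    "INSERT INTO orders VALUES (?)",
    [("card",), ("cash",), ("crypto",)],
)

# The check result: count of rows violating the expectation, computed in SQL.
result = con.execute(
    "SELECT count(*) FROM orders "
    "WHERE payment_method NOT IN ('card', 'cash', 'voucher')"
).fetchone()[0]
outcome = result == 0  # success only if no row violates the expectation

print(result, outcome)  # 1 False
```

Because the query runs where the data lives, no data movement is required and the work scales with the destination's compute, matching the properties listed above.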

Pre-load (staging)

warning

Work in progress. Currently unavailable.

The pre-load execution via staging dataset allows you to execute checks on the destination and trigger actions before data is loaded into the dataset. This is effectively running post-load checks before a second load phase.

info

dlt uses staging datasets for other features such as merge and replace write dispositions.

Properties:

  • Failed records can be dropped or quarantined before load. This works with all write_disposition values.
  • Requires a destination that supports staging datasets.
  • Checks have access to the current load.
    • If the staging dataset is on the same destination, checks can access the full dataset.
    • If the staging dataset is on a different destination, checking against the full dataset requires communication between the staging destination and the main destination.
  • Computed on the staging destination. This scales well with the size of the data and the complexity of the checks.
  • Data and checks results & outcome can be safely stored on the staging dataset until review. This helps human-in-the-loop workflows without reprocessing the full pipeline.

Pre-load (in-memory)

warning

Work in progress. Currently unavailable.

The pre-load execution in-memory will execute checks using duckdb against the load packages (i.e., temporary files) stored on the machine that runs dlt. This allows you to trigger actions before data is loaded into the destination.

note

This is equivalent to using a staging destination that is the local filesystem. This section highlights the trade-offs of this choice.

Properties:

  • Failed records can be dropped or quarantined before load. This works with all write_disposition values.
  • Checks only have access to the current load. Checking against the full dataset requires communication between the staging destination and the main destination.
  • Computed on the machine running the pipeline. The machine's resources need to match the compute requirements.
  • Data and checks results & outcome may be lost if the runtime is ephemeral (e.g., AWS Lambda timeout). In this case, the pipeline must process the data again.

Roadmap

  • Define checks that depend on metrics (this should reduce verbosity)
  • Support user-defined group-by metrics
  • Support completely custom checks via @dlt.hub.transformation (e.g., SQL, SQLGlot, Ibis, Narwhals, Polars)
  • Trigger actions based on check result or outcome (e.g., send Slack notification)
  • Track metrics and checks changes via dlt.Schema versioning
