Version: devel

Runner


This page is for dlt+, which requires a license. Join our early access program for a trial license.

dlt+ provides a production-ready runner for your pipelines. It offers robust error handling, retry mechanisms, and near-atomic trace storage to destinations of your choice.

Usage

The runner is used automatically when you execute dlt pipeline run inside a project, or you can call dlt_plus.runner() directly in the code where you define your pipeline and data.

```sh
dlt pipeline my_pipeline run
```

Configuration

The runner is configured through the run_config section of a pipeline in your dlt.yml file or by passing arguments to the dlt_plus.runner() function in your code. Configuration via environment variables or config.toml is still under development.

Complete configuration example

```yaml
pipelines:
  my_pipeline:
    source: my_source
    destination: duckdb
    run_config:
      store_trace_info: true
      run_from_clean_folder: true
      retry_policy:
        type: backoff
        max_attempts: 5
        multiplier: 2
        min: 1
        max: 10
      retry_pipeline_steps: ["load"]
```

Trace storage

The store_trace_info parameter enables automatic storage of the pipeline's runtime trace, which contains detailed information about a run, e.g., the timings of each step, schema changes, and load packages.

The runner will convert the trace into a dict and try loading it to the destination using a separate pipeline, which runs directly after each successful or failed attempt of the main pipeline. If any pending data is finalized before running the main pipeline, the trace of that finalization is also stored.

Setting store_trace_info=True will derive the configuration of the trace pipeline from the main pipeline. That trace pipeline will be named _trace_<pipeline_name> and will write to the same destination as the main pipeline.

```yaml
pipelines:
  my_pipeline:
    run_config:
      store_trace_info: true
```

Alternatively, you can explicitly define a trace pipeline, for example if you want to store your traces in a different filesystem than your production data:

```yaml
destinations:
  log_filesystem:
    type: filesystem
    bucket_url: "s3://my-bucket/logs/dlt_traces"

pipelines:
  my_pipeline:
    source: my_source
    destination: duckdb
    run_config:
      store_trace_info: trace_pipeline

  trace_pipeline:
    source: my_source # << this will not actually be used but cannot be empty
    destination: log_filesystem
```

Trace table naming

Traces are loaded into a table named <pipeline_name>_trace. This means you can use the same trace pipeline for multiple pipelines without conflicts.

Run from clean folder

When the run_from_clean_folder option is enabled, the pipeline working directory is removed before the pipeline runs. The state, schema, and all files from previous runs are deleted, and state and schema are synchronized from the destination (see dlt pipeline sync).

```yaml
pipelines:
  my_pipeline:
    run_config:
      run_from_clean_folder: true
```

note

The dlt+ runner behaves slightly differently from pipeline.run() when pending data exists in the pipeline's working directory: pipeline.run() loads only the pending data and must be invoked again, whereas the dlt+ runner first runs with the pending data and then automatically runs again with the given data.

Retry policies

The Runner supports configurable retry policies to handle errors during pipeline execution. Retry policies apply to both finalizing pending data from previous loads and running the pipeline with the given data.

For the Python interface, you can define any retry policy you want using the tenacity library. For the YAML interface, you can configure three different retry policies:

| Policy type | Description | Configuration |
| --- | --- | --- |
| None | No retry (single attempt) | `type: none` |
| Fixed | Fixed number of attempts with no backoff | `type: fixed`, `max_attempts: N` |
| Backoff | Exponential backoff with configurable parameters | `type: backoff`, `max_attempts: N`, `multiplier: N`, `min: N`, `max: N` |
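To make the backoff parameters concrete, here is a minimal sketch of an exponential schedule. It assumes the common convention wait = multiplier * 2^(attempt - 1), clamped to [min, max]; the runner's actual timing may differ in detail:

```python
def backoff_waits(max_attempts: int, multiplier: float, min_wait: float, max_wait: float) -> list[float]:
    """Wait times between attempts, assuming wait = multiplier * 2**(attempt - 1),
    clamped to [min_wait, max_wait]. Illustrative only, not the runner's exact schedule."""
    waits = []
    for attempt in range(1, max_attempts):  # no wait after the final attempt
        wait = multiplier * 2 ** (attempt - 1)
        waits.append(max(min_wait, min(max_wait, wait)))
    return waits

# With the example config above (max_attempts: 5, multiplier: 2, min: 1, max: 10):
print(backoff_waits(5, 2, 1, 10))  # [2, 4, 8, 10]
```

Note how the `max` parameter caps the last wait at 10 instead of 16.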
note

When storing the trace, the runner will alternate between trying to load the pipeline and loading the trace. So after the first attempt fails, it will try to load the trace of the first attempt. If the trace pipeline also fails, the trace file will be kept as traces/<transaction_id>_attempt_1_trace.json in the pipeline's working directory and the pipeline will be retried. Failures to load the trace will be logged as warnings, but do not affect the main pipeline's execution.

Retry pipeline steps

The runner automatically wraps each configured policy in a helper method that decides whether a retry should be attempted.

It takes into account the type of the exception and the pipeline step at which it occurred. For example, any exception inheriting from TerminalException, such as one caused by missing credentials, ends the run immediately, whereas a PipelineStepFailed exception, such as one caused by a connection timeout, can be retried.

For the latter, the retry_pipeline_steps parameter can be used to further control during which pipeline steps a retry will be attempted.

| Configuration | Behavior |
| --- | --- |
| `retry_pipeline_steps: ["load"]` | Only retry the load step (default) |
| `retry_pipeline_steps: ["normalize", "load"]` | Retry both the normalize and load steps |
| `retry_pipeline_steps: ["extract", "normalize", "load"]` | Retry all main steps |
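The decision rule described above can be sketched as follows. The exception classes here are local stand-ins for dlt's TerminalException and PipelineStepFailed, and the function is an illustration of the rule, not the runner's actual helper:

```python
class TerminalException(Exception):
    """Stand-in for dlt's terminal errors (e.g. missing credentials)."""

class PipelineStepFailed(Exception):
    """Stand-in for dlt's step failure; records which step failed."""
    def __init__(self, step: str):
        super().__init__(f"step {step!r} failed")
        self.step = step

def should_retry(exc: Exception, retry_pipeline_steps: list[str]) -> bool:
    # Terminal errors always end the run immediately.
    if isinstance(exc, TerminalException):
        return False
    # Step failures are retried only if the failed step is configured for retry.
    if isinstance(exc, PipelineStepFailed):
        return exc.step in retry_pipeline_steps
    return False

print(should_retry(PipelineStepFailed("load"), ["load"]))      # True
print(should_retry(PipelineStepFailed("extract"), ["load"]))   # False
print(should_retry(TerminalException("bad creds"), ["load"]))  # False
```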

Example configuration: This is how you can configure the runner to retry 5 times unless the error is terminal or occurs during the extract step:

```yaml
pipelines:
  my_pipeline:
    source: github_source
    run_config:
      retry_policy:
        type: fixed
        max_attempts: 5
      retry_pipeline_steps: ["normalize", "load"]
```

Callbacks

You can implement custom callbacks by inheriting from dlt+'s PlusLogCollector class. If such a collector is attached to a pipeline, the runner will detect it and will call its on_before, on_after and on_retry methods.
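A minimal sketch of such a collector follows. The base class is stubbed locally so the example is self-contained without a dlt+ license, and the no-argument hook signatures are an assumption for illustration, not dlt+'s actual API:

```python
class PlusLogCollector:
    """Local stub standing in for dlt+'s PlusLogCollector base class."""

class MyCollector(PlusLogCollector):
    """Records which runner hooks fired; the hook names come from the docs,
    their signatures are assumed for illustration."""
    def __init__(self):
        self.events = []

    def on_before(self):
        self.events.append("before")

    def on_retry(self):
        self.events.append("retry")

    def on_after(self):
        self.events.append("after")

# Simulate the order of hooks in a run with one retry:
collector = MyCollector()
collector.on_before()
collector.on_retry()
collector.on_after()
print(collector.events)  # ['before', 'retry', 'after']
```

In a real project, you would attach an instance of your collector to the pipeline so the runner can detect it.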
