Skip to main content

Custom destination: Reverse ETL

The dlt destination decorator allows you to receive all data passing through your pipeline in a simple function. This can be extremely useful for reverse ETL, where you are pushing data back to an API.

You can also use this for sending data to a queue or a simple database destination that is not yet supported by dlt, although be aware that you will have to manually handle your own migrations in this case.

It will also allow you to simply get a path to the files of your normalized data. So, if you need direct access to parquet or jsonl files to copy them somewhere or push them to a database, you can do this here too.

Install dlt for reverse ETLโ€‹

To install dlt without additional dependencies:

pip install dlt

Set up a destination function for your pipelineโ€‹

The custom destination decorator differs from other destinations in that you do not need to provide connection credentials, but rather you provide a function which gets called for all items loaded during a pipeline run or load operation. With the @dlt.destination, you can convert any function that takes two arguments into a dlt destination.

A very simple dlt pipeline that pushes a list of items into a destination function might look like this:

import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema

@dlt.destination(batch_size=10)
def my_destination(items: TDataItems, table: TTableSchema) -> None:
print(table["name"])
print(items)

pipeline = dlt.pipeline("custom_destination_pipeline", destination=my_destination)
pipeline.run([1, 2, 3], table_name="items")
tip
  1. You can also remove the typing information (TDataItems and TTableSchema) from this example. Typing is generally useful to know the shape of the incoming objects, though.
  2. There are a few other ways for declaring custom destination functions for your pipeline described below.

@dlt.destination, custom destination function, and signatureโ€‹

The full signature of the destination decorator plus its function is the following:

@dlt.destination(
batch_size=10,
loader_file_format="jsonl",
name="my_custom_destination",
naming_convention="direct",
max_table_nesting=0,
skip_dlt_columns_and_tables=True
)
def my_destination(items: TDataItems, table: TTableSchema) -> None:
...

Decorator argumentsโ€‹

  • The batch_size parameter on the destination decorator defines how many items per function call are batched together and sent as an array. If you set a batch-size of 0, instead of passing in actual data items, you will receive one call per load job with the path of the file as the items argument. You can then open and process that file in any way you like.
  • The loader_file_format parameter on the destination decorator defines in which format files are stored in the load package before being sent to the destination function. This can be jsonl or parquet.
  • The name parameter on the destination decorator defines the name of the destination that gets created by the destination decorator.
  • The naming_convention parameter on the destination decorator defines the name of the destination that gets created by the destination decorator. This controls how table and column names are normalized. The default is direct, which will keep all names the same.
  • The max_nesting_level parameter on the destination decorator defines how deep the normalizer will go to normalize complex fields on your data to create subtables. This overwrites any settings on your source and is set to zero to not create any nested tables by default.
  • The skip_dlt_columns_and_tables parameter on the destination decorator defines whether internal tables and columns will be fed into the custom destination function. This is set to True by default.
note

Settings above make sure that shape of the data you receive in the destination function is as close as possible to what you see in the data source.

  • The custom destination sets the max_nesting_level to 0 by default, which means no sub-tables will be generated during the normalization phase.
  • The custom destination also skips all internal tables and columns by default. If you need these, set skip_dlt_columns_and_tables to False.

Custom destination functionโ€‹

  • The items parameter on the custom destination function contains the items being sent into the destination function.
  • The table parameter contains the schema table the current call belongs to, including all table hints and columns. For example, the table name can be accessed with table["name"].
  • You can also add config values and secrets to the function arguments, see below!

Add configuration, credentials and other secret to the destination functionโ€‹

The destination decorator supports settings and secrets variables. If you, for example, plan to connect to a service that requires an API secret or a login, you can do the following:

@dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_destination")
def my_destination(items: TDataItems, table: TTableSchema, api_key: dlt.secrets.value) -> None:
...

You can then set a config variable in your .dlt/secrets.toml: like so:

[destination.my_destination]
api_key="<my-api-key>"

Custom destinations follow the same configuration rules as regular named destinations

Use the custom destination in dlt pipelineโ€‹

There are multiple ways to pass the custom destination function to dlt pipeline:

  • Directly reference the destination function

    @dlt.destination(batch_size=10)
    def local_destination_func(items: TDataItems, table: TTableSchema) -> None:
    ...

    # reference function directly
    p = dlt.pipeline("my_pipe", destination=local_destination_func)

    Like for regular destinations, you are allowed to pass configuration and credentials explicitly to destination function.

    @dlt.destination(batch_size=10, loader_file_format="jsonl", name="my_destination")
    def my_destination(items: TDataItems, table: TTableSchema, api_key: dlt.secrets.value) -> None:
    ...

    p = dlt.pipeline("my_pipe", destination=my_destination(api_key=os.getenv("MY_API_KEY")))
  • Directly via destination reference. In this case, don't use the decorator for the destination function.

    # file my_destination.py

    from dlt.common.destination import Destination

    # don't use the decorator
    def local_destination_func(items: TDataItems, table: TTableSchema) -> None:
    ...

    # via destination reference
    p = dlt.pipeline(
    "my_pipe",
    destination=Destination.from_reference(
    "destination", destination_callable=local_destination_func
    )
    )
  • Via a fully qualified string to function location (can be used from config.toml or ENV vars). The destination function should be located in another file.

    # file my_pipeline.py

    from dlt.common.destination import Destination

    # fully qualified string to function location
    p = dlt.pipeline(
    "my_pipe",
    destination=Destination.from_reference(
    "destination", destination_callable="my_destination.local_destination_func"
    )
    )

Adjust batch size and retry policy for atomic loadsโ€‹

The destination keeps a local record of how many DataItems were processed, so if you, for example, use the custom destination to push DataItems to a remote API, and this API becomes unavailable during the load resulting in a failed dlt pipeline run, you can repeat the run of your pipeline at a later moment and the custom destination will restart from the whole batch that failed. We are preventing any data from being lost, but you can still get duplicated data if you committed half of the batch ie. to a database and then failed. Keeping the batch atomicity is on you. For this reason it makes sense to choose a batch size that you can process in one transaction (say one api request or one database transaction) so that if this request or transaction fail repeatedly you can repeat it at the next run without pushing duplicate data to your remote location. For systems that are not transactional and do not tolerate duplicated data, you can use batch of size 1.

Destination functions that raise exceptions are retried 5 times before giving up (load.raise_on_max_retries config option). If you run the pipeline again, it will resume loading before extracting new data.

If your exception derives from DestinationTerminalException, the whole load job will be marked as failed and not retried again.

caution

If you wipe out the pipeline folder (where job files and destination state are saved) you will not be able to restart from the last failed batch. However, it is fairly easy to backup and restore the pipeline directory, see details below.

Increase or decrease loading parallelismโ€‹

Calls to the destination function by default will be executed on multiple threads, so you need to make sure you are not using any non-thread-safe nonlocal or global variables from outside your destination function. If you need to have all calls be executed from the same thread, you can set the workers config variable of the load step to 1.

tip

For performance reasons, we recommend keeping the multithreaded approach and making sure that you, for example, are using threadsafe connection pools to a remote database or queue.

Write dispositionโ€‹

@dlt.destination will forward all normalized DataItems encountered during a pipeline run to the custom destination function, so there is no notion of "write dispositions".

Staging supportโ€‹

@dlt.destination does not support staging files in remote locations before being called at this time. If you need this feature, please let us know.

Manage pipeline state for incremental loadingโ€‹

Custom destinations do not have a general mechanism to restore pipeline state. This will impact data sources that rely on the state being kept ie. all incremental resources. If you wipe the pipeline directory (ie. by deleting a folder or running on AWS lambda / Github Actions where you get a clean runner) the progress of the incremental loading is lost. On the next run you will re-acquire the data from the beginning.

While we are working on a pluggable state storage you can fix the problem above by:

  1. Not wiping the pipeline directory. For example if you run your pipeline on an EC instance periodically, the state will be preserved.
  2. By doing a restore/backup of the pipeline directory before/after it runs. This is way easier than it sounds and here's a script you can reuse.

What's nextโ€‹

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub โ€“ it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.