Skip to main content
Version: devel

State

The pipeline state is a Python dictionary that lives alongside your data; you can store values in it and, on the next pipeline run, request them back.

Read and write pipeline state in a resource

You read and write the state in your resources. Below, we use the state to create a list of chess game archives, which we then use to prevent requesting duplicates.

@dlt.resource(write_disposition="append")
def players_games(chess_url, player, start_month=None, end_month=None):
# create or request a list of archives from resource-scoped state
checked_archives = dlt.current.resource_state().setdefault("archives", [])
# get a list of archives for a particular player
archives = _get_players_archives(chess_url, player)
for url in archives:
if url in checked_archives:
print(f"skipping archive {url}")
continue
else:
print(f"getting archive {url}")
checked_archives.append(url)
# get the filtered archive
r = requests.get(url)
r.raise_for_status()
yield r.json().get("games", [])

Above, we request the resource-scoped state. The checked_archives list stored under the archives dictionary key is private and visible only to the players_games resource.

The pipeline state is stored locally in the pipeline working directory and, as a consequence, it cannot be shared with pipelines with different names. You must also make sure that data written into the state is JSON serializable. Except for standard Python types, dlt handles DateTime, Decimal, bytes, and UUID.

Share state across resources and read state in a source

You can also access the source-scoped state with dlt.current.source_state(), which can be shared across resources of a particular source and is also available read-only in the source-decorated functions. The most common use case for the source-scoped state is to store a mapping of custom fields to their displayable names. You can take a look at our pipedrive source for an example of state passed across resources.

tip

Decompose your source to, for example, run it on Airflow in parallel. If you cannot avoid that, designate one of the resources as the state writer and all others as state readers. This is exactly what the pipedrive pipeline does. With such a structure, you will still be able to run some of your resources in parallel.

caution

The dlt.state() is a deprecated alias to dlt.current.source_state() and will soon be removed.

Syncing state with destination

What if you run your pipeline on, for example, Airflow, where every task gets a clean filesystem and the pipeline working directory is always deleted? dlt loads your state into the destination along with all other data, and when faced with a clean start, it will try to restore the state from the destination.

The remote state is identified by the pipeline name, the destination location (as given by the credentials), and the destination dataset. To reuse the same state, use the same pipeline name and destination.

The state is stored in the _dlt_pipeline_state table at the destination and contains information about the pipeline, the pipeline run (to which the state belongs), and the state blob.

dlt has a dlt pipeline sync command where you can request the state back from that table.

💡 If you can keep the pipeline working directory across the runs, you can disable the state sync by setting restore_from_destination=false in your config.toml.

When to use pipeline state

Do not use pipeline state if it can grow to millions of records

Do not use dlt state when it may grow to millions of elements. Do you plan to store modification timestamps of all your millions of user records? This is probably a bad idea! In that case, you could:

  • Store the state in DynamoDB, Redis, etc., taking into account that if the extract stage fails, you'll end up with an invalid state.
  • Use your loaded data as the state. dlt exposes the current pipeline via dlt.current.pipeline() from which you can obtain sqlclient and load the data of interest. In that case, try at least to process your user records in batches.

Access data in the destination instead of pipeline state

In the example below, we load recent comments made by a given user_id. We access the user_comments table to select the maximum comment id for a given user.

import dlt

@dlt.resource(name="user_comments")
def comments(user_id: str):
current_pipeline = dlt.current.pipeline()
# find the last comment id for the given user_id by looking in the destination
max_id: int = 0
# on the first pipeline run, the user_comments table does not yet exist so do not check at all
# alternatively, catch DatabaseUndefinedRelation which is raised when an unknown table is selected
if not current_pipeline.first_run:
# get user comments table from pipeline dataset
user_comments = current_pipeline.dataset().user_comments
# get last user comment id with ibis expression, ibis-extras need to be installed
max_id_df = user_comments.filter(user_comments.user_id == user_id).select(user_comments["_id"].max()).df()
# if there are no comments for the user, max_id will be None, so we replace it with 0
max_id = max_id_df[0][0] if len(max_id_df.index) else 0

# use max_id to filter our results (we simulate an API query)
yield from [
{"_id": i, "value": letter, "user_id": user_id}
for i, letter in zip([1, 2, 3], ["A", "B", "C"])
if i > max_id
]

When the pipeline is first run, the destination dataset and user_comments table do not yet exist. We skip the destination query by using the first_run property of the pipeline. We also handle a situation where there are no comments for a user_id by replacing None with 0 as max_id.

Inspect the pipeline state

You can inspect the pipeline state with the dlt pipeline command:

dlt pipeline -v chess_pipeline info

This will display the source and resource state slots for all known sources.

Reset the pipeline state: full or partial

To fully reset the state:

To partially reset the state:

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.