
Lag / Attribution window

In many cases, certain data should be reacquired during incremental loading. For example, you may want to always capture the last 7 days of data when fetching daily analytics reports, or refresh Slack message replies with a moving window of 7 days. This is where the concept of "lag" or "attribution window" comes into play.

The lag parameter is a float that can be used with several types of incremental cursors: datetime, date, integer, and float. It is only supported when last_value_func is set to min or max (the default is max).
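For instance, lag is passed directly to dlt.sources.incremental inside a resource definition. The sketch below is illustrative only: the resource name, the updated_at cursor field, and the fetch_reports helper are hypothetical placeholders, not part of the example further down.

import dlt

@dlt.resource(name="reports")
def reports(
    # "updated_at" is a hypothetical datetime cursor field; lag=3600 re-fetches
    # the last hour of data. lag is honored only when last_value_func is min or max.
    updated_at=dlt.sources.incremental("updated_at", lag=3600, last_value_func=max)
):
    # fetch_reports is a hypothetical helper that returns records newer than `since`
    yield from fetch_reports(since=updated_at.last_value)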

How lag works

  • Datetime cursors: lag is the number of seconds added or subtracted from the last_value loaded.
  • Date cursors: lag represents days.
  • Numeric cursors (integer or float): lag respects the given unit of the cursor.

This flexibility allows lag to adapt to different data contexts.
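As a quick illustration (a sketch; the cursor field names are placeholders, and the cursor type follows from the data stored in that field):

import dlt

# If "created_at" holds datetimes, lag counts seconds: re-fetch the last hour.
last_hour = dlt.sources.incremental("created_at", lag=3600)

# If "report_date" holds dates, lag counts days: re-fetch the last 7 days,
# e.g. for the daily analytics reports mentioned above.
last_week = dlt.sources.incremental("report_date", lag=7)

# If "id" holds integers, lag uses the cursor's own unit: re-fetch the last 1000 ids.
last_ids = dlt.sources.incremental("id", lag=1000)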

Example using datetime incremental cursor with merge as write_disposition

This example demonstrates how to use a datetime cursor with a lag parameter, applying merge as the write_disposition. The setup runs twice, and during the second run, the lag parameter re-fetches recent entries to capture updates.

  1. First Run: Loads initial_entries.
  2. Second Run: Loads second_run_events with the specified lag, refreshing previously loaded entries.

This setup demonstrates how lag ensures that a defined period of data remains refreshed, capturing updates or changes within the attribution window.

import dlt
import duckdb

pipeline = dlt.pipeline(
    destination=dlt.destinations.duckdb(credentials=duckdb.connect(":memory:")),
)

# Flag to indicate the second run
is_second_run = False

@dlt.resource(name="events", primary_key="id", write_disposition="merge")
def events_resource(
    _=dlt.sources.incremental("created_at", lag=3600, last_value_func=max)
):
    global is_second_run

    # Data for the initial run
    initial_entries = [
        {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1"},
        {"id": 2, "created_at": "2023-03-03T02:00:00Z", "event": "2"},  # lag applied during second run
    ]

    # Data for the second run
    second_run_events = [
        {"id": 1, "created_at": "2023-03-03T01:00:00Z", "event": "1_updated"},
        {"id": 2, "created_at": "2023-03-03T02:00:01Z", "event": "2_updated"},
        {"id": 3, "created_at": "2023-03-03T03:00:00Z", "event": "3"},
    ]

    # Yield data based on the current run
    yield from second_run_events if is_second_run else initial_entries

# Run the pipeline twice
pipeline.run(events_resource)
is_second_run = True  # Update flag for second run
pipeline.run(events_resource)
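After the second run, you can check the merged result directly in DuckDB. The query below is a sketch using dlt's sql_client; it assumes the resource's default table name, events.

# Inspect the merged table after both runs
with pipeline.sql_client() as client:
    rows = client.execute_sql("SELECT id, event FROM events ORDER BY id")
    print(rows)
    # Events 1 and 2 should now show their "_updated" values and event 3 is new,
    # because lag=3600 re-fetched rows within an hour of the stored last_value.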
