Version: devel

extract.incremental

Incremental Objects

@configspec
class Incremental(ItemTransform[TDataItem], BaseConfiguration,
Generic[TCursorValue])

Adds incremental extraction for a resource by storing a cursor value in persistent state.

The cursor could for example be a timestamp for when the record was created and you can use this to load only new records created since the last run of the pipeline.

To use this, the resource function should have an argument that is either type annotated with Incremental or has a default Incremental instance. For example:

import dlt

@dlt.resource(primary_key='id')
def some_data(created_at=dlt.sources.incremental('created_at', '2023-01-01T00:00:00Z')):
    # request_data stands in for your own API call
    yield from request_data(created_after=created_at.last_value)

When the resource has a primary_key specified, it is used to deduplicate overlapping items with the same cursor value.

Alternatively, you can use this class as a transform step and add it to any resource. For example:

@dlt.resource
def some_data():
    last_value = dlt.sources.incremental.from_existing_state("some_data", "item.ts")
    ...

# now and p are assumed to be defined elsewhere (a start value and a dlt pipeline)
r = some_data().add_step(dlt.sources.incremental("item.ts", initial_value=now, primary_key="delta"))
info = p.run(r, destination="duckdb")
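To picture how primary-key deduplication of overlapping items might work, here is a minimal pure-Python sketch. It is not dlt's implementation: `filter_new`, the state layout (`last_value`, `seen_keys`) and the sample rows are assumptions made for illustration. The idea is to remember the keys of rows seen at the stored cursor value and to skip those rows when they reappear on the next run.

```python
from typing import Any, Dict, Iterable, List


def filter_new(rows: Iterable[Dict[str, Any]], state: Dict[str, Any],
               cursor: str = "created_at", key: str = "id") -> List[Dict[str, Any]]:
    """Hypothetical helper: keep rows at or after the stored cursor value,
    dropping rows already seen at exactly that value (matched by primary key)."""
    last_value = state.get("last_value")
    seen = set(state.get("seen_keys", []))
    kept = []
    for row in rows:
        value = row[cursor]
        if last_value is not None:
            if value < last_value:
                continue  # older than the stored cursor: already loaded
            if value == last_value and row[key] in seen:
                continue  # same cursor value and same key: a duplicate
        kept.append(row)
    # Advance the state: new maximum plus the keys observed at that maximum.
    candidates = [r[cursor] for r in kept]
    if last_value is not None:
        candidates.append(last_value)
    if candidates:
        new_last = max(candidates)
        new_seen = {r[key] for r in kept if r[cursor] == new_last}
        if new_last == last_value:
            new_seen |= seen  # cursor did not move, so keep the old keys too
        state["last_value"] = new_last
        state["seen_keys"] = sorted(new_seen)
    return kept


state: Dict[str, Any] = {}
first = filter_new([{"id": 1, "created_at": "2023-01-01"},
                    {"id": 2, "created_at": "2023-01-02"}], state)
# The second run overlaps with the first on id=2 and adds id=3 at the same cursor value.
second = filter_new([{"id": 2, "created_at": "2023-01-02"},
                     {"id": 3, "created_at": "2023-01-02"}], state)
```

In this sketch the second run returns only the row with `id` 3, because the row with `id` 2 shares both the stored cursor value and a key that was already recorded.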

Arguments:

  • cursor_path - The name of, or a JSON path to, a cursor field. Uses the field names as they appear in your JSON document, before they are normalized for storage in the database.
  • initial_value - Optional value used for last_value when no state is available, e.g. on the first run of the pipeline. If not provided, last_value will be None on the first run.
  • last_value_func - Callable used to determine which cursor value to save in state. It is called with a list containing the stored state value and all cursor values from the items currently being processed. Default is max.
  • primary_key - Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass an empty tuple to disable unique checks.
  • end_value - Optional value used to load a limited range of records between initial_value and end_value. Use in conjunction with initial_value, e.g. to load records from a given month: incremental(initial_value="2022-01-01T00:00:00Z", end_value="2022-02-01T00:00:00Z"). Note that when this is set, the incremental filtering is stateless and initial_value always supersedes any previous incremental value in state.
  • row_order - Declares that the data source returns rows in descending (desc) or ascending (asc) order as defined by last_value_func. If the row order is known, the Incremental class is able to stop requesting new rows by closing the pipe generator. This prevents getting more data from the source. Defaults to None, which means that the row order is not known.
  • allow_external_schedulers - If set to True, allows dlt to look for external schedulers from which it will take "initial_value" and "end_value", resulting in loading only the specified range of data. Currently the Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed to the Incremental class. The values passed explicitly to Incremental will be ignored. Note that if a logical "end date" is present, "end_value" will also be set, which means that resource state is not used and exactly this range of dates will be loaded.
  • on_cursor_value_missing - Specify what happens when the cursor_path does not exist in a record or a record has None at the cursor_path: raise, include, or exclude.
  • lag - Optional value used to define a lag or attribution window. For datetime cursors, this is interpreted as seconds. For other types, it uses the + or - operator depending on the last_value_func.
  • range_start - Decide whether the incremental filtering range is open or closed on the start value side. Default is closed. Setting this to open means that items with the same cursor value as the last value from the previous run (or initial_value) are excluded from the result. The open range disables deduplication logic so it can serve as an optimization when you know cursors don't overlap between pipeline runs.
  • range_end - Decide whether the incremental filtering range is open or closed on the end value side. Default is open (exact end_value is excluded). Setting this to closed means that items with the exact same cursor value as the end_value are included in the result.
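
The interplay of last_value_func, range_start and range_end can be sketched in plain Python. This is an illustration under assumptions, not dlt's code: `in_range` and `next_last_value` are hypothetical helpers, and `in_range` assumes an ascending cursor, that is, the default last_value_func of max.

```python
from typing import Any, Callable, Optional, Sequence


def in_range(value: Any, start: Optional[Any], end: Optional[Any],
             range_start: str = "closed", range_end: str = "open") -> bool:
    """Hypothetical range check for an ascending cursor (last_value_func=max)."""
    if start is not None:
        if range_start == "closed" and value < start:
            return False
        if range_start == "open" and value <= start:
            return False  # open start: the start value itself is excluded
    if end is not None:
        if range_end == "open" and value >= end:
            return False  # open end: the end value itself is excluded
        if range_end == "closed" and value > end:
            return False
    return True


def next_last_value(stored: Optional[Any], cursor_values: Sequence[Any],
                    last_value_func: Callable[[Sequence[Any]], Any] = max) -> Optional[Any]:
    """Call last_value_func with the stored value plus the new cursor values."""
    candidates = list(cursor_values)
    if stored is not None:
        candidates.insert(0, stored)
    return last_value_func(candidates) if candidates else None


values = [1, 2, 3, 4, 5]
# Defaults: closed start, open end.
default_bounds = [v for v in values if in_range(v, start=2, end=4)]
# Flipped: open start, closed end.
flipped_bounds = [v for v in values
                  if in_range(v, start=2, end=4, range_start="open", range_end="closed")]
new_state = next_last_value(2, default_bounds)
```

With the default bounds the sketch keeps 2 and 3. With the flipped bounds it keeps 3 and 4.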

placement_affinity

stick to end

from_existing_state

@classmethod
def from_existing_state(cls, resource_name: str,
cursor_path: str) -> "Incremental[TCursorValue]"

Create Incremental instance from existing state.

merge

def merge(other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue]"

Create a new incremental instance which merges the two instances. Only properties which are not None from other override the current instance properties.

This supports use cases with partial overrides, such as:

def my_resource(updated=incremental('updated', initial_value='1970-01-01')):
    ...

my_resource(updated=incremental(initial_value='2023-01-01', end_value='2023-02-01'))
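
The override rule can be illustrated with a small stand-in class. `IncrementalSketch` is hypothetical and models only three of the arguments. It shows the rule that non-None values from other take precedence, as described above, and is not dlt's actual merge code.

```python
from dataclasses import dataclass, fields, replace
from typing import Any, Optional


@dataclass(frozen=True)
class IncrementalSketch:
    """Hypothetical stand-in for Incremental with three of its arguments."""
    cursor_path: Optional[str] = None
    initial_value: Optional[Any] = None
    end_value: Optional[Any] = None

    def merge(self, other: "IncrementalSketch") -> "IncrementalSketch":
        # Collect only the properties that `other` actually sets.
        overrides = {f.name: getattr(other, f.name)
                     for f in fields(other)
                     if getattr(other, f.name) is not None}
        # Return a new instance; neither operand is modified.
        return replace(self, **overrides)


default = IncrementalSketch("updated", initial_value="1970-01-01")
override = IncrementalSketch(initial_value="2023-01-01", end_value="2023-02-01")
merged = default.merge(override)
```

Here `merged` keeps the cursor path from the default instance and takes both date bounds from the override.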

get_cursor_column_name

def get_cursor_column_name() -> Optional[str]

Return the name of the cursor column if the cursor path resolves to a single column

get_state

def get_state() -> IncrementalColumnState

Returns an Incremental state for a particular cursor column

get_incremental_value_type

def get_incremental_value_type() -> Type[Any]

Infers the type of incremental value from a class of an instance if those preserve the Generic arguments information.

bind

def bind(pipe: SupportsPipe) -> "Incremental[TCursorValue]"

Called by pipe just before evaluation

can_close

def can_close() -> bool

Checks if incremental is out of range and can be closed.

Returns True only when row_order was set and either:

  1. results are ordered ascending and are above the upper bound (end_value), or
  2. results are ordered descending and are below or equal to the lower bound (start_value).
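
The two conditions could be expressed roughly as follows. The sketch follows the wording above literally and assumes a cursor compared with the ordinary `<` and `>` operators. `can_close_sketch` and its `seen_value` parameter (the cursor value of the most recently processed row) are hypothetical names, not part of the dlt API.

```python
from typing import Any, Optional


def can_close_sketch(row_order: Optional[str], seen_value: Any,
                     start_value: Optional[Any] = None,
                     end_value: Optional[Any] = None) -> bool:
    """Hypothetical restatement of the two conditions listed above."""
    if row_order is None:
        return False  # unknown order: later rows may still be in range
    if row_order == "asc":
        # Rows only grow, so once past the upper bound nothing else can match.
        return end_value is not None and seen_value > end_value
    if row_order == "desc":
        # Rows only shrink, so once at or below the lower bound we can stop.
        return start_value is not None and seen_value <= start_value
    raise ValueError(f"unexpected row_order: {row_order!r}")
```

For example, `can_close_sketch("asc", 6, start_value=1, end_value=5)` reports that the generator can be closed, while the same call with `row_order=None` never does.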

IncrementalResourceWrapper Objects

class IncrementalResourceWrapper(ItemTransform[TDataItem])

placement_affinity

stick to end

__init__

def __init__(
primary_key: Optional[TTableHintTemplate[TColumnNames]] = None
) -> None

Creates a wrapper over a resource function that accepts Incremental instance in its argument to perform incremental loading.

The wrapper delays instantiation of the Incremental to the moment of actual execution and is currently used by the dlt.resource decorator. The wrapper explicitly binds the Incremental state to a resource state via the resource_name parameter. Note that the wrapper implements the FilterItem transform interface and functions as a processing step in the aforementioned resource pipe.

Arguments:

  • primary_key TTableHintTemplate[TColumnKey], optional - A primary key to be passed to Incremental Instance at execution. Defaults to None.

inject_implicit_incremental_arg

@staticmethod
def inject_implicit_incremental_arg(
incremental: Optional[Union[Incremental[Any],
"IncrementalResourceWrapper"]],
sig: inspect.Signature,
func_args: Tuple[Any],
func_kwargs: Dict[str, Any],
fallback: Optional[Incremental[Any]] = None
) -> Tuple[Tuple[Any], Dict[str, Any], Optional[Incremental[Any]]]

Inject the incremental instance into function arguments if the function has an incremental argument without default in its signature and it is not already set in the arguments.

Returns:

Tuple of the new args, kwargs and the incremental instance that was injected (if any)
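
A rough sketch of this kind of injection using `inspect.Signature` follows. `Marker`, `inject_sketch` and the rule for spotting the incremental parameter (by its annotation) are assumptions for illustration. dlt's own detection logic may differ, and the fallback argument is left out.

```python
import inspect
from typing import Any, Dict, Optional, Tuple


class Marker:
    """Hypothetical stand-in for an Incremental instance."""
    def __init__(self, cursor_path: str) -> None:
        self.cursor_path = cursor_path


def inject_sketch(incremental: Optional[Marker], sig: inspect.Signature,
                  func_args: Tuple[Any, ...], func_kwargs: Dict[str, Any]
                  ) -> Tuple[Tuple[Any, ...], Dict[str, Any], Optional[Marker]]:
    if incremental is None:
        return func_args, func_kwargs, None
    # Look for a parameter annotated with Marker that has no default value.
    target = next((p for p in sig.parameters.values()
                   if p.annotation is Marker and p.default is inspect.Parameter.empty),
                  None)
    if target is None:
        return func_args, func_kwargs, None
    bound = sig.bind_partial(*func_args, **func_kwargs)
    if target.name in bound.arguments:
        return func_args, func_kwargs, None  # caller already supplied it
    bound.arguments[target.name] = incremental
    return bound.args, bound.kwargs, incremental


def resource(page_size: int, updated_at: Marker):
    return page_size, updated_at.cursor_path


args, kwargs, injected = inject_sketch(Marker("updated_at"),
                                       inspect.signature(resource), (50,), {})
result = resource(*args, **kwargs)
```

When the caller already passes a value for the annotated parameter, the sketch leaves the arguments untouched and reports that nothing was injected.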

wrap

def wrap(sig: inspect.Signature, func: TFun) -> TFun

Wrap the callable to inject an Incremental object configured for the resource.

set_incremental

def set_incremental(incremental: Optional[TIncrementalConfig],
from_hints: bool = False) -> None

Sets the incremental. If the incremental was set with from_hints, it can only be changed in the same manner.

allow_external_schedulers

@property
def allow_external_schedulers() -> bool

Allows the Incremental instance to get its initial and end values from external schedulers like Airflow
