extract.incremental
Incremental Objects
@configspec
class Incremental(ItemTransform[TDataItem], BaseConfiguration,
Generic[TCursorValue])
Adds incremental extraction for a resource by storing a cursor value in persistent state.
The cursor could be, for example, a timestamp recording when each record was created; you can use it to load only the records created since the last run of the pipeline.
To use this, the resource function should have an argument that is either type-annotated with Incremental or has an Incremental instance as its default.
For example:
@dlt.resource(primary_key='id')
def some_data(created_at=dlt.sources.incremental('created_at', '2023-01-01T00:00:00Z')):
    yield from request_data(created_after=created_at.last_value)
When the resource has a primary_key specified, it is used to deduplicate overlapping items with the same cursor value.
Alternatively, you can use this class as a transform step and add it to any resource. For example:
@dlt.resource
def some_data():
    last_value = dlt.sources.incremental.from_existing_state("some_data", "item.ts")
    ...
r = some_data().add_step(dlt.sources.incremental("item.ts", initial_value=now, primary_key="delta"))
info = p.run(r, destination="duckdb")
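The deduplication of overlapping items that share a cursor value can be pictured with a small standalone sketch. It does not import dlt and is not the library's implementation: filter_new_rows, seen_keys and the row layout are hypothetical names chosen for illustration, and it assumes an ascending cursor with a closed start range, so rows equal to the stored last value reappear on the next run.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple

Row = Dict[str, Any]


def filter_new_rows(
    rows: Iterable[Row],
    last_value: Any,
    seen_keys: Set[Any],
    cursor: str = "created_at",
    primary_key: str = "id",
) -> Tuple[List[Row], Any, Set[Any]]:
    """Keep rows at or after last_value, skipping boundary rows already loaded.

    Returns the kept rows, the new last value, and the keys seen at that value.
    """
    kept: List[Row] = []
    new_last = last_value
    new_seen = set(seen_keys)
    for row in rows:
        value = row[cursor]
        if last_value is not None and value < last_value:
            continue  # older than the stored cursor: loaded in an earlier run
        if value == last_value and row[primary_key] in seen_keys:
            continue  # same cursor value and same key: duplicate of a loaded row
        kept.append(row)
        if new_last is None or value > new_last:
            # The cursor advanced, so only this row's key sits at the boundary.
            new_last, new_seen = value, {row[primary_key]}
        elif value == new_last:
            new_seen.add(row[primary_key])
    return kept, new_last, new_seen


# First run: no state yet, everything is kept.
run1 = [{"id": 1, "created_at": 1}, {"id": 2, "created_at": 2}]
first, last, seen = filter_new_rows(run1, None, set())

# Second run: the source returns row 2 again because it sits on the boundary.
run2 = [
    {"id": 2, "created_at": 2},
    {"id": 3, "created_at": 2},
    {"id": 4, "created_at": 3},
]
second, last2, seen2 = filter_new_rows(run2, last, seen)
```

In the second run only the rows with id 3 and 4 are kept: row 2 has the boundary cursor value and a key that was already recorded.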
Arguments:
cursor_path - The name of, or a JSON path to, a cursor field. Uses the same field names as your JSON document, before they are normalized for storage in the database.
initial_value - Optional value used for last_value when no state is available, e.g. on the first run of the pipeline. If not provided, last_value will be None on the first run.
last_value_func - Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor values from the items currently being processed. Default is max.
primary_key - Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass an empty tuple to disable unique checks.
end_value - Optional value used to load a limited range of records between initial_value and end_value. Use in conjunction with initial_value, e.g. to load records from a given month: incremental(initial_value="2022-01-01T00:00:00Z", end_value="2022-02-01T00:00:00Z"). Note that when this is set, the incremental filtering is stateless and initial_value always supersedes any previous incremental value in state.
row_order - Declares that the data source returns rows in descending (desc) or ascending (asc) order as defined by last_value_func. If the row order is known, the Incremental class is able to stop requesting new rows by closing the pipe generator. This prevents getting more data from the source. Defaults to None, which means that the row order is not known.
allow_external_schedulers - If set to True, allows dlt to look for external schedulers from which it will take "initial_value" and "end_value", resulting in loading only the specified range of data. Currently the Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed to the Incremental class. The values passed explicitly to Incremental will be ignored. Note that if a logical "end date" is present, "end_value" will also be set, which means that the resource state is not used and exactly this range of dates will be loaded.
on_cursor_value_missing - Specifies what happens when the cursor_path does not exist in a record or a record has None at the cursor_path: raise, include, or exclude.
lag - Optional value used to define a lag or attribution window. For datetime cursors, this is interpreted as seconds. For other types, it uses the + or - operator depending on the last_value_func.
range_start - Decides whether the incremental filtering range is open or closed on the start value side. Default is closed. Setting this to open means that items with the same cursor value as the last value from the previous run (or initial_value) are excluded from the result. The open range disables deduplication logic, so it can serve as an optimization when you know cursors don't overlap between pipeline runs.
range_end - Decides whether the incremental filtering range is open or closed on the end value side. Default is open (the exact end_value is excluded). Setting this to closed means that items with the exact same cursor value as the end_value are included in the result.
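How range_start, range_end, last_value_func and lag fit together can be pictured with a small standalone sketch. It does not import dlt: in_range, next_last_value and apply_lag are hypothetical helpers, not library functions. The range check assumes an ascending cursor, and the sign rule for lag (subtract with max, add otherwise) is an assumption drawn from the description above.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Iterable, List, Optional


def in_range(
    value: Any,
    start: Optional[Any],
    end: Optional[Any],
    range_start: str = "closed",
    range_end: str = "open",
) -> bool:
    """Return True if value falls inside the filtering range."""
    if start is not None:
        # An open start also rejects values equal to the start.
        if value < start or (range_start == "open" and value == start):
            return False
    if end is not None:
        # An open end also rejects values equal to the end.
        if value > end or (range_end == "open" and value == end):
            return False
    return True


def next_last_value(
    stored: Optional[Any],
    cursor_values: Iterable[Any],
    last_value_func: Callable[[List[Any]], Any] = max,
) -> Optional[Any]:
    """Call last_value_func on the stored value plus the cursor values just seen."""
    candidates = list(cursor_values)
    if stored is not None:
        candidates.insert(0, stored)
    return last_value_func(candidates) if candidates else None


def apply_lag(
    last_value: Any,
    lag: float,
    last_value_func: Callable[..., Any] = max,
) -> Any:
    """Shift the start of the range by lag (seconds for datetime cursors)."""
    delta = timedelta(seconds=lag) if isinstance(last_value, datetime) else lag
    # Assumption: with max the cursor grows, so the window is widened by
    # subtracting the lag; with any other function the lag is added.
    return last_value - delta if last_value_func is max else last_value + delta
```

With the defaults, in_range(5, 5, 10) is True and in_range(10, 5, 10) is False; passing range_start="open" or range_end="closed" flips each result.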
placement_affinity
stick to end
from_existing_state
@classmethod
def from_existing_state(cls, resource_name: str,
cursor_path: str) -> "Incremental[TCursorValue]"
Create Incremental instance from existing state.
merge
def merge(other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue]"
Create a new incremental instance which merges the two instances.
Only properties which are not None from other override the current instance properties.
This supports use cases with partial overrides, such as:
def my_resource(updated=incremental('updated', initial_value='1970-01-01')):
    ...
my_resource(updated=incremental(initial_value='2023-01-01', end_value='2023-02-01'))
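The partial-override rule can be sketched with a plain dataclass. IncrementalSettings is a hypothetical stand-in that holds only three of the properties; it is not the dlt class, and the real merge may treat some properties specially.

```python
from dataclasses import dataclass, fields, replace
from typing import Any, Optional


@dataclass(frozen=True)
class IncrementalSettings:
    """Hypothetical stand-in holding a few Incremental properties."""

    cursor_path: Optional[str] = None
    initial_value: Optional[Any] = None
    end_value: Optional[Any] = None

    def merge(self, other: "IncrementalSettings") -> "IncrementalSettings":
        # Collect only the properties that `other` actually sets (not None).
        overrides = {
            f.name: getattr(other, f.name)
            for f in fields(other)
            if getattr(other, f.name) is not None
        }
        # Build a new instance; neither input is modified.
        return replace(self, **overrides)


default = IncrementalSettings(cursor_path="updated", initial_value="1970-01-01")
override = IncrementalSettings(initial_value="2023-01-01", end_value="2023-02-01")
merged = default.merge(override)
```

Here merged keeps cursor_path="updated" from the default and takes initial_value and end_value from the override, which mirrors the my_resource call above.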
get_cursor_column_name
def get_cursor_column_name() -> Optional[str]
Return the name of the cursor column if the cursor path resolves to a single column
get_state
def get_state() -> IncrementalColumnState
Returns an Incremental state for a particular cursor column
get_incremental_value_type
def get_incremental_value_type() -> Type[Any]
Infers the type of the incremental value from the class of the instance, provided the instance preserves the Generic argument information.
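One way such inference can work in plain Python is through the __orig_class__ attribute that the typing module sets on instances created from a subscripted generic class. The sketch below is an assumption about the general mechanism, not the dlt implementation; Cursor and value_type are hypothetical names.

```python
from typing import Any, Generic, TypeVar, get_args

T = TypeVar("T")


class Cursor(Generic[T]):
    """Hypothetical generic class standing in for Incremental."""

    def value_type(self) -> Any:
        # typing records the subscripted alias (e.g. Cursor[int]) on the
        # instance after construction; plain Cursor() has no such attribute.
        orig = getattr(self, "__orig_class__", None)
        if orig is None:
            return Any
        args = get_args(orig)
        return args[0] if args else Any


typed = Cursor[int]()
untyped = Cursor()
```

typed.value_type() yields int, while untyped.value_type() falls back to Any because no generic argument was preserved.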
bind
def bind(pipe: SupportsPipe) -> "Incremental[TCursorValue]"
Called by pipe just before evaluation
can_close
def can_close() -> bool
Checks if incremental is out of range and can be closed.
Returns True only when row_order was set and either:
- results are ordered ascending and are above the upper bound (end_value)
- results are ordered descending and are below or equal to the lower bound (start_value)
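The decision above can be sketched as a small function over two flags. The flag names start_out_of_range and end_out_of_range, and the read_ascending helper, are assumptions made for this illustration rather than confirmed dlt attributes.

```python
from typing import Iterator, Optional


def can_close(
    row_order: Optional[str],
    start_out_of_range: bool,
    end_out_of_range: bool,
) -> bool:
    """Return True when no further rows can fall inside the range."""
    if row_order == "asc":
        # Ascending rows only grow, so passing the upper bound is final.
        return end_out_of_range
    if row_order == "desc":
        # Descending rows only shrink, so reaching the lower bound is final.
        return start_out_of_range
    # Unknown row order: later rows may still be in range.
    return False


def read_ascending(rows: Iterator[int], end_value: int) -> Iterator[int]:
    """Yield ascending rows and stop pulling from the source once closable."""
    for value in rows:
        end_out_of_range = value > end_value  # "above the upper bound"
        if can_close("asc", False, end_out_of_range):
            break
        yield value


source = iter([1, 2, 3, 4, 5])
loaded = list(read_ascending(source, end_value=3))
remaining = list(source)
```

After the run, loaded holds 1, 2 and 3, and the value 5 was never requested from the source, which is the saving that a declared row_order makes possible.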
IncrementalResourceWrapper Objects
class IncrementalResourceWrapper(ItemTransform[TDataItem])
placement_affinity
stick to end
__init__
def __init__(
primary_key: Optional[TTableHintTemplate[TColumnNames]] = None
) -> None
Creates a wrapper over a resource function that accepts an Incremental instance in its arguments to perform incremental loading.
The wrapper delays instantiation of the Incremental until the moment of actual execution and is currently used by the dlt.resource decorator.
The wrapper explicitly (via the resource_name parameter) binds the Incremental state to a resource state.
Note that the wrapper implements the FilterItem transform interface and functions as a processing step in the aforementioned resource pipe.
Arguments:
primary_key (TTableHintTemplate[TColumnKey], optional) - A primary key to be passed to the Incremental instance at execution. Defaults to None.
inject_implicit_incremental_arg
@staticmethod
def inject_implicit_incremental_arg(
incremental: Optional[Union[Incremental[Any],
"IncrementalResourceWrapper"]],
sig: inspect.Signature,
func_args: Tuple[Any],
func_kwargs: Dict[str, Any],
fallback: Optional[Incremental[Any]] = None
) -> Tuple[Tuple[Any], Dict[str, Any], Optional[Incremental[Any]]]
Inject the incremental instance into function arguments if the function has an incremental argument without default in its signature and it is not already set in the arguments.
Returns:
Tuple of the new args, kwargs and the incremental instance that was injected (if any)
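The injection rule can be sketched with the standard inspect module. This is an illustration under stated assumptions, not the dlt implementation: Cursor stands in for Incremental, inject_cursor is a hypothetical helper, the fallback parameter is omitted, and the target argument is recognized purely by its type annotation.

```python
import inspect
from typing import Any, Dict, Optional, Tuple


class Cursor:
    """Hypothetical stand-in for an Incremental instance."""

    def __init__(self, path: str) -> None:
        self.path = path


def inject_cursor(
    cursor: Optional[Cursor],
    sig: inspect.Signature,
    func_args: Tuple[Any, ...],
    func_kwargs: Dict[str, Any],
) -> Tuple[Tuple[Any, ...], Dict[str, Any], Optional[Cursor]]:
    """Fill the Cursor-annotated parameter if it has no default and is unset."""
    if cursor is None:
        return func_args, func_kwargs, None
    # bind_partial tolerates missing arguments, unlike bind.
    bound = sig.bind_partial(*func_args, **func_kwargs)
    for name, param in sig.parameters.items():
        wants_cursor = (
            param.annotation is Cursor
            and param.default is inspect.Parameter.empty
        )
        if wants_cursor and name not in bound.arguments:
            bound.arguments[name] = cursor
            return bound.args, bound.kwargs, cursor
    # Nothing to inject: hand back the arguments unchanged.
    return func_args, func_kwargs, None


def resource(updated: Cursor, limit: int = 10) -> Tuple[str, int]:
    return updated.path, limit


sig = inspect.signature(resource)
configured = Cursor("updated_at")
args, kwargs, injected = inject_cursor(configured, sig, (), {"limit": 5})
result = resource(*args, **kwargs)
```

When the caller already supplies a value for the annotated parameter, the helper returns the arguments untouched and reports None as the injected instance.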
wrap
def wrap(sig: inspect.Signature, func: TFun) -> TFun
Wrap the callable to inject an Incremental object configured for the resource.
set_incremental
def set_incremental(incremental: Optional[TIncrementalConfig],
from_hints: bool = False) -> None
Sets the incremental. If the incremental was set from_hints, it can only be changed in the same manner.
allow_external_schedulers
@property
def allow_external_schedulers() -> bool
Allows the Incremental instance to get its initial and end values from external schedulers like Airflow