dlt.extract.state
source_state
def source_state() -> DictStrAny
Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Please avoid using source-scoped state. See the resource_state function for resource-scoped state, which is visible only within a particular resource. dlt state is preserved across pipeline runs and may be used to implement incremental loads.
Notes:
The source state is a Python dictionary-like object that is available within the @dlt.source and @dlt.resource decorated functions and may be read and written to.
The data within the state is loaded into the destination together with any other extracted data and is automatically made available to the source/resource extractor functions the next time they run.
When using the state:
- The source state is scoped to a particular source and is stored under the source name in the pipeline state.
- State can be shared across several sources if they share a schema with the same name.
- Any JSON-serializable values can be written to and then read from the state. dlt also dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
- The state available in the source-decorated function is read-only and any changes will be discarded.
- The state available in the resource-decorated function is writable and written values will be available on the next pipeline run.
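The scoping rule in the list above can be pictured with plain dictionaries. The following is a minimal sketch, not dlt code: `pipeline_state` and `get_source_state` are hypothetical names used only for illustration, and dlt's actual storage layout may differ.

```python
from typing import Any, Dict

# Hypothetical stand-in for the pipeline state: one section per source name.
# This is an illustration only; dlt's real storage layout may differ.
pipeline_state: Dict[str, Dict[str, Any]] = {}


def get_source_state(source_name: str) -> Dict[str, Any]:
    """Return the state section for a source, creating it on first access."""
    return pipeline_state.setdefault(source_name, {})


# A resource writes a value into its source's section.
first = get_source_state("chess")
first["last_checked_month"] = "2024-01"

# A later lookup under the same name sees the same dictionary.
second = get_source_state("chess")

# A differently named source gets its own, separate section.
other = get_source_state("weather")
```

Because both lookups for "chess" return the same object, anything sharing that name also shares the state, which is the sharing behaviour the list describes for sources with the same schema name.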
delete_source_state_keys
def delete_source_state_keys(key: TAnyJsonPath,
source_state_: Optional[DictStrAny] = None
) -> None
Remove one or more keys from the source state.
The key argument can be any number of keys and/or JSON paths to be removed.
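To illustrate what removing "one or more keys" means for a nested state dictionary, here is a rough sketch on a plain dict. The helper `remove_keys` is hypothetical and is not the dlt implementation; it treats a dotted string as a path, which is a simplification and not full JSON path syntax.

```python
from typing import Any, Dict, Iterable, Union


def remove_keys(state: Dict[str, Any], keys: Union[str, Iterable[str]]) -> None:
    """Remove one or more dotted-path keys from a nested dict, ignoring missing ones."""
    if isinstance(keys, str):
        keys = [keys]
    for path in keys:
        # Split "a.b.c" into the parent path ["a", "b"] and the leaf "c".
        *parents, leaf = path.split(".")
        node: Any = state
        for part in parents:
            node = node.get(part) if isinstance(node, dict) else None
            if node is None:
                break
        # Only delete when the parent path resolved to a dictionary.
        if isinstance(node, dict):
            node.pop(leaf, None)


state = {"cursor": 10, "archives": {"2023": ["a"], "2024": ["b"]}}
remove_keys(state, ["cursor", "archives.2023"])
remove_keys(state, "does.not.exist")  # missing paths are silently skipped
```

After these calls only the "2024" entry under "archives" remains in `state`.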
resource_state
def resource_state(resource_name: str = None,
source_state_: Optional[DictStrAny] = None) -> DictStrAny
Returns a dictionary with the resource-scoped state. Resource-scoped state is visible only to the resource requesting access. dlt state is preserved across pipeline runs and may be used to implement incremental loads.
Note that this function accepts the resource name as an optional argument. In rare cases dlt is not able to resolve the resource name because the requesting function runs in a different thread than the main one. You'll need to pass the name explicitly when you request resource_state from async functions or functions decorated with @defer.
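The general reason a name lookup can fail in a worker thread, and why passing the name explicitly helps, can be sketched with the standard library. This is an analogy only: it assumes a thread-local context, which is not necessarily how dlt tracks the current resource, and `get_state`, `_context`, and `_states` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Optional

# Hypothetical per-thread context and state store (not dlt internals).
_context = threading.local()
_states: Dict[str, Dict[str, Any]] = {}


def get_state(resource_name: Optional[str] = None) -> Dict[str, Any]:
    """Return state for the given name, falling back to the thread's context."""
    name = resource_name or getattr(_context, "resource_name", None)
    if name is None:
        raise RuntimeError("resource name not available in this thread")
    return _states.setdefault(name, {})


# The context is set on the main thread only.
_context.resource_name = "players_games"
get_state()["seen"] = 1


def worker_implicit() -> bool:
    """Try to resolve the name from context inside a worker thread."""
    try:
        get_state()
        return True
    except RuntimeError:
        return False


def worker_explicit() -> int:
    """Pass the name explicitly, so no context lookup is needed."""
    return get_state("players_games")["seen"]


with ThreadPoolExecutor(max_workers=1) as pool:
    implicit_ok = pool.submit(worker_implicit).result()
    explicit_value = pool.submit(worker_explicit).result()
```

In this sketch the implicit lookup fails in the worker thread, while the explicit call reaches the same state that the main thread wrote.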
Summary:
The resource state is a Python dictionary-like object that is available within the @dlt.resource decorated functions and may be read and written to.
The data within the state is loaded into the destination together with any other extracted data and is automatically made available to the source/resource extractor functions the next time they run.
When using the state:
- The resource state is scoped to the particular resource requesting it.
- Any JSON-serializable values can be written to and then read from the state. dlt also dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
- The state available in the resource-decorated function is writable and written values will be available on the next pipeline run.
Example:
The most typical use case for the state is to implement incremental loading.
import dlt
import requests

@dlt.resource(write_disposition="append")
def players_games(chess_url, players, start_month=None, end_month=None):
    # archive URLs already loaded in previous runs, kept in the resource state
    checked_archives = dlt.current.resource_state().setdefault("archives", [])
    # players_archives is a helper defined elsewhere in the source
    archives = players_archives(chess_url, players)
    for url in archives:
        if url in checked_archives:
            print(f"skipping archive {url}")
            continue
        else:
            print(f"getting archive {url}")
            checked_archives.append(url)
            # get the filtered archive
            r = requests.get(url)
            r.raise_for_status()
            yield r.json().get("games", [])
Here we store the URLs of all game archives in the state and skip loading them on the next run. The archives are immutable. The state will grow with the coming months (and with more players), but it should remain manageable up to a few thousand archives.
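The skip-what-was-already-loaded pattern can be tried without a pipeline by passing a plain dictionary in place of the resource state. This is a simplified sketch: `load_new_archives` and the archive identifiers are made up for illustration, and the dictionary only stands in for state that dlt would persist between runs.

```python
from typing import Any, Dict, Iterator, List


def load_new_archives(state: Dict[str, Any], archives: List[str]) -> Iterator[str]:
    """Yield only archives not recorded in the state, recording each one as seen."""
    checked = state.setdefault("archives", [])
    for url in archives:
        if url in checked:
            continue
        checked.append(url)
        yield url


# The same dictionary is reused across "runs", standing in for persisted state.
state: Dict[str, Any] = {}
first_run = list(load_new_archives(state, ["player-a/2024/01", "player-a/2024/02"]))
second_run = list(
    load_new_archives(state, ["player-a/2024/01", "player-a/2024/02", "player-a/2024/03"])
)
```

The first run yields both archives, and the second run yields only the new one, since the earlier two are already listed in the state.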
Arguments:
resource_name (str, optional) - Forces the use of the state for the resource with this name. Defaults to None.
source_state_ (Optional[DictStrAny], optional) - Alternative source state. Defaults to None.
Raises:
ResourceNameNotAvailable - Raised if used outside of a resource context or from a thread other than the main thread.
Returns:
DictStrAny - State dictionary.