Version: devel

dlt.extract.state

source_state

def source_state() -> DictStrAny

Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Avoid source-scoped state where possible; see the resource_state function for resource-scoped state, which is visible only within a particular resource. dlt state is preserved across pipeline runs and may be used to implement incremental loads.

Notes:

The source state is a Python dictionary-like object that is available within the @dlt.source and @dlt.resource decorated functions and may be read and written to. The data within the state is loaded into the destination together with any other extracted data and is automatically made available to the source/resource extractor functions the next time they run. When using the state:

  • The source state is scoped to a particular source and is stored under the source name in the pipeline state.
  • It is possible to share state across many sources if they share a schema with the same name.
  • Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
  • The state available in the source decorated function is read-only and any changes will be discarded.
  • The state available in the resource decorated function is writable and written values will be available on the next pipeline run.
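The scoping and persistence rules above can be pictured with a plain-dict model. This is only a sketch using the standard library, not dlt's implementation: the "sources" key, the get_source_state helper, the source name "chess" and the "last_month" key are all assumptions made for illustration, and JSON stands in for whatever serialization dlt performs between runs.

```python
import json

# Hypothetical in-memory model of a pipeline state; the "sources" key and
# the source name "chess" are assumptions made for this illustration.
pipeline_state = {"sources": {}}

def get_source_state(source_name: str) -> dict:
    """Return the dict scoped to one source, creating it on first access."""
    return pipeline_state["sources"].setdefault(source_name, {})

# First run: a resource writes a value into the source-scoped state.
state = get_source_state("chess")
state["last_month"] = "2023-05"

# Between runs the state is serialized; JSON stands in for that step here.
saved = json.dumps(pipeline_state)

# Next run: the restored state exposes the value written earlier.
pipeline_state = json.loads(saved)
restored = get_source_state("chess")
print(restored["last_month"])
```

In real code the dictionary would come from source_state() rather than from a helper like this one.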

delete_source_state_keys

def delete_source_state_keys(key: TAnyJsonPath,
                             source_state_: Optional[DictStrAny] = None
                             ) -> None

Removes one or more keys from the source state. The key argument can be any number of keys and/or JSON paths to be removed.
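To make the idea of removing keys and nested paths concrete, here is a simplified standard-library sketch. The delete_keys helper is hypothetical and handles only dotted paths into nested dictionaries; it is not how delete_source_state_keys is implemented, and the real function accepts JSON paths, which are more expressive.

```python
from typing import Any, Dict

def delete_keys(state: Dict[str, Any], *paths: str) -> None:
    """Remove each dotted path from a nested dict; missing paths are ignored."""
    for path in paths:
        *parents, leaf = path.split(".")
        node: Any = state
        # Walk down to the dict that holds the final key.
        for part in parents:
            node = node.get(part) if isinstance(node, dict) else None
            if node is None:
                break
        if isinstance(node, dict):
            node.pop(leaf, None)

# Illustrative state; the key names are made up for this example.
state = {"cursor": 10, "archives": {"2023-01": "done", "2023-02": "done"}}
delete_keys(state, "cursor", "archives.2023-01", "missing.key")
print(state)
```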

resource_state

def resource_state(resource_name: str = None,
                   source_state_: Optional[DictStrAny] = None) -> DictStrAny

Returns a dictionary with the resource-scoped state. Resource-scoped state is visible only to the resource requesting access. dlt state is preserved across pipeline runs and may be used to implement incremental loads.

Note that this function accepts the resource name as an optional argument. In rare cases dlt is not able to resolve the resource name because the requesting function runs in a thread other than the main thread. You need to pass the name explicitly when you request resource_state from async functions or from functions decorated with @defer.
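The following standard-library sketch illustrates why an implicit name lookup can fail off the main thread and why passing the name explicitly helps. It is a model only: the thread-local _context, the _states registry and the get_state helper are hypothetical, and nothing here is meant to describe how dlt tracks the current resource internally.

```python
import threading
from typing import Dict, List, Optional

# Hypothetical stand-in for per-thread context that holds the resource name.
_context = threading.local()
_states: Dict[str, dict] = {}

def get_state(resource_name: Optional[str] = None) -> dict:
    """Resolve the name from the thread context unless one is passed explicitly."""
    name = resource_name or getattr(_context, "resource_name", None)
    if name is None:
        raise RuntimeError("resource name not available in this thread")
    return _states.setdefault(name, {})

_context.resource_name = "players_games"  # set on the main thread only
results: List[str] = []

def worker() -> None:
    try:
        get_state()  # the worker thread has no resource name in its context
    except RuntimeError:
        results.append("implicit lookup failed")
    get_state("players_games")["seen"] = True  # the explicit name works
    results.append("explicit lookup succeeded")

thread = threading.Thread(target=worker)
thread.start()
thread.join()
print(results)
```

With dlt itself, the analogous step is to call resource_state with the resource_name argument from the deferred or async function.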

Summary: The resource state is a Python dictionary-like object that is available within the @dlt.resource decorated functions and may be read and written to. The data within the state is loaded into the destination together with any other extracted data and is automatically made available to the source/resource extractor functions the next time they run. When using the state:

  • The resource state is scoped to the particular resource requesting it.
  • Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
  • The state available in the resource decorated function is writable and written values will be available on the next pipeline run.

Example:

The most typical use case for the state is to implement incremental loading.

    import dlt
    import requests

    @dlt.resource(write_disposition="append")
    def players_games(chess_url, players, start_month=None, end_month=None):
        # archive URLs that were already loaded; persisted in the resource state
        checked_archives = dlt.current.resource_state().setdefault("archives", [])
        # players_archives is a helper defined elsewhere that lists archive URLs
        archives = players_archives(chess_url, players)
        for url in archives:
            if url in checked_archives:
                print(f"skipping archive {url}")
                continue
            else:
                print(f"getting archive {url}")
                checked_archives.append(url)
            # get the filtered archive
            r = requests.get(url)
            r.raise_for_status()
            yield r.json().get("games", [])

Here we store all the URLs of game archives in the state and skip loading them on the next run. The archives are immutable. The state will grow with the coming months (and with more players), but it should remain manageable up to a few thousand archives.
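The skip-on-next-run behaviour can be checked without a pipeline by letting a plain dictionary stand in for the resource state. This is a sketch under that assumption: load_new_archives and the placeholder archive identifiers are hypothetical, and in a real resource the dictionary would come from dlt.current.resource_state() and be persisted by dlt between runs.

```python
from typing import Dict, Iterator, List

def load_new_archives(state: Dict[str, list], archive_urls: List[str]) -> Iterator[str]:
    """Yield only URLs not yet recorded in the state, recording each one."""
    checked = state.setdefault("archives", [])
    for url in archive_urls:
        if url in checked:
            continue  # already loaded on an earlier run
        checked.append(url)
        yield url

# A plain dict stands in for the state that survives between runs.
resource_state: Dict[str, list] = {}

# Placeholder archive identifiers, not real URLs.
first_run = list(load_new_archives(resource_state, ["a/2023/01", "a/2023/02"]))
second_run = list(load_new_archives(resource_state, ["a/2023/01", "a/2023/02", "a/2023/03"]))
print(first_run)
print(second_run)
```

The second run yields only the archive that appeared after the first run, which is the property the chess example relies on.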

Arguments:

  • resource_name str, optional - forces the use of the state for the resource with this name. Defaults to None.
  • source_state_ Optional[DictStrAny], optional - Alternative source state. Defaults to None.

Raises:

  • ResourceNameNotAvailable - Raised if used outside of a resource context or from a thread other than the main thread

Returns:

  • DictStrAny - State dictionary

reset_resource_state

def reset_resource_state(resource_name: str,
                         source_state_: Optional[DictStrAny] = None) -> None

Resets the state of the resource named resource_name by removing it from source_state.

Arguments:

  • resource_name str - The resource key to reset
  • source_state_ Optional[DictStrAny] - Optional source state dictionary to operate on. Use when working outside a source context.

get_matching_sources

def get_matching_sources(
        pattern: REPattern,
        pipeline_state: Optional[TPipelineState] = None) -> List[str]

Gets all source names in the state that match the regex pattern.
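As an illustration of matching source names against a compiled pattern, here is a standard-library sketch. The matching_sources helper, the "sources" key and the source names are assumptions for the example, as is the choice of re.Pattern.match (which anchors at the start of the name); the real function may match differently.

```python
import re
from typing import Dict, List

def matching_sources(pattern: re.Pattern, pipeline_state: Dict[str, dict]) -> List[str]:
    """Return the source names whose beginning matches the compiled pattern."""
    sources = pipeline_state.get("sources", {})
    return [name for name in sources if pattern.match(name)]

# Hypothetical pipeline state with three sources.
pipeline_state = {"sources": {"chess": {}, "chess_archive": {}, "github": {}}}
names = matching_sources(re.compile(r"chess"), pipeline_state)
print(names)
```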

pipe_context

@contextlib.contextmanager
def pipe_context(pipe: SupportsPipe) -> Iterator[None]

Synchronous context manager that temporarily sets the current pipe for the duration of the with block.

async_pipe_context

@contextlib.asynccontextmanager
async def async_pipe_context(pipe: SupportsPipe) -> AsyncIterator[None]

Async variant of pipe_context, for use in async with blocks.

get_current_pipe

def get_current_pipe() -> SupportsPipe

When executed from within a dlt.resource decorated function, gets the execution pipe.

get_current_pipe_name

def get_current_pipe_name() -> str

When executed from within a dlt.resource decorated function, gets the pipe name associated with the current resource.

Pipe name is the same as resource name for all currently known cases.
