Version: devel

common.pipeline

_StepInfo Objects

class _StepInfo(NamedTuple)

[view_source]

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

StepInfo Objects

class StepInfo(SupportsHumanize, Generic[TStepMetricsCo])

[view_source]

metrics

Metrics per load id. If many sources with the same name were extracted, there will be more than 1 element in the list

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

started_at

@property
def started_at() -> datetime.datetime

[view_source]

Returns the earliest start date of all collected metrics

finished_at

@property
def finished_at() -> datetime.datetime

[view_source]

Returns the latest end date of all collected metrics
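As an illustration of how these two properties aggregate over `metrics`, here is a minimal sketch. It assumes that `metrics` maps each load id to a list of entries carrying `started_at` and `finished_at` timestamps. The helper names `earliest_start` and `latest_finish` are hypothetical and not part of dlt.

```python
import datetime
from typing import Dict, List, Mapping

# Assumed shape: load id -> list of metric entries with start/end timestamps.
Metrics = Dict[str, List[Mapping[str, datetime.datetime]]]


def earliest_start(metrics: Metrics) -> datetime.datetime:
    """Earliest `started_at` across all entries of all load ids."""
    return min(m["started_at"] for entries in metrics.values() for m in entries)


def latest_finish(metrics: Metrics) -> datetime.datetime:
    """Latest `finished_at` across all entries of all load ids."""
    return max(m["finished_at"] for entries in metrics.values() for m in entries)


t = datetime.datetime(2024, 1, 1, 12, 0, 0)
sample: Metrics = {
    "load_1": [{"started_at": t, "finished_at": t + datetime.timedelta(seconds=5)}],
    "load_2": [
        {
            "started_at": t + datetime.timedelta(seconds=2),
            "finished_at": t + datetime.timedelta(seconds=9),
        }
    ],
}
```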

_ExtractInfo Objects

class _ExtractInfo(NamedTuple)

[view_source]

NamedTuple cannot be part of the derivation chain so we must re-declare all fields to use it as mixin later

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages
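The mixin pattern described in the note above can be sketched as follows. All class names here (`_DemoInfo`, `DemoStepInfo`, `DemoInfo`) are hypothetical stand-ins chosen for illustration; the sketch only shows the general technique of combining a `NamedTuple` field holder with a generic base class, not dlt's actual definitions.

```python
from typing import Generic, List, NamedTuple, TypeVar

TMetrics = TypeVar("TMetrics")


class _DemoInfo(NamedTuple):
    # The tuple fields are declared here, in the NamedTuple itself.
    loads_ids: List[str]
    note: str


class DemoStepInfo(Generic[TMetrics]):
    # Re-declared as a bare annotation so the shared helper below type-checks.
    loads_ids: List[str]

    def load_count(self) -> int:
        return len(self.loads_ids)


class DemoInfo(DemoStepInfo[int], _DemoInfo):
    """A tuple (from _DemoInfo) that also has the shared helpers (from DemoStepInfo)."""


info = DemoInfo(loads_ids=["1700000000.1", "1700000000.2"], note="demo")
```

The resulting object is still a plain tuple, so it can be unpacked or indexed, while `load_count()` comes from the generic base.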

ExtractInfo Objects

class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo)

[view_source]

A tuple holding information on extracted data items. Returned by pipeline extract method.

asdict

def asdict() -> DictStrAny

[view_source]

A dictionary representation of ExtractInfo that can be loaded with dlt

_NormalizeInfo Objects

class _NormalizeInfo(NamedTuple)

[view_source]

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

NormalizeInfo Objects

class NormalizeInfo(StepInfo[NormalizeMetrics], _NormalizeInfo)

[view_source]

A tuple holding information on normalized data items. Returned by pipeline normalize method.

asdict

def asdict() -> DictStrAny

[view_source]

A dictionary representation of NormalizeInfo that can be loaded with dlt

_LoadInfo Objects

class _LoadInfo(NamedTuple)

[view_source]

loads_ids

ids of the loaded packages

load_packages

Information on loaded packages

LoadInfo Objects

class LoadInfo(StepInfo[LoadMetrics], _LoadInfo)

[view_source]

A tuple holding the information on recently loaded packages. Returned by pipeline run and load methods

asdict

def asdict() -> DictStrAny

[view_source]

A dictionary representation of LoadInfo that can be loaded with dlt

has_failed_jobs

@property
def has_failed_jobs() -> bool

[view_source]

Returns True if any of the load packages has a failed job.

raise_on_failed_jobs

def raise_on_failed_jobs() -> None

[view_source]

Raises DestinationHasFailedJobs exception if any of the load packages has a failed job.
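The relationship between `has_failed_jobs` and `raise_on_failed_jobs` can be sketched with stand-in classes. Everything below is hypothetical: the `Demo*` names, the `jobs` layout (job state mapped to job names, with a `failed_jobs` key) and the exception are assumptions made for illustration and do not reproduce dlt's implementation.

```python
from typing import Dict, List, NamedTuple


class DemoPackage(NamedTuple):
    load_id: str
    jobs: Dict[str, List[str]]  # assumed layout: job state -> job names


class DemoFailedJobs(Exception):
    """Hypothetical stand-in for DestinationHasFailedJobs."""


class DemoLoadInfo(NamedTuple):
    load_packages: List[DemoPackage]

    @property
    def has_failed_jobs(self) -> bool:
        # True as soon as one package lists at least one failed job.
        return any(p.jobs.get("failed_jobs") for p in self.load_packages)

    def raise_on_failed_jobs(self) -> None:
        # Raise for the first package that has failed jobs, otherwise do nothing.
        for p in self.load_packages:
            failed = p.jobs.get("failed_jobs", [])
            if failed:
                raise DemoFailedJobs(f"package {p.load_id} has failed jobs: {failed}")


ok = DemoLoadInfo([DemoPackage("1", {"failed_jobs": [], "completed_jobs": ["a"]})])
bad = DemoLoadInfo([DemoPackage("2", {"failed_jobs": ["b"]})])
```

Calling `raise_on_failed_jobs()` right after a load is a way to turn silently failed jobs into a hard error.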

WithStepInfo Objects

class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo])

[view_source]

Implemented by classes that generate StepInfo with metrics and package infos

current_load_id

@property
def current_load_id() -> str

[view_source]

Returns currently processing load id

get_step_info

@abstractmethod
def get_step_info(pipeline: "SupportsPipeline") -> TStepInfo

[view_source]

Returns an instance of StepInfo with metrics and package infos

TPipelineLocalState Objects

class TPipelineLocalState(TypedDict)

[view_source]

first_run

Indicates a first run of the pipeline, where run ends with successful loading of data

initial_cwd

Current working directory when the pipeline was instantiated for the first time

TPipelineState Objects

class TPipelineState(TVersionedState)

[view_source]

Schema for a pipeline state that is stored within the pipeline working directory

default_schema_name

Name of the first schema added to the pipeline to which all the resources without schemas will be added

schema_names

All the schemas present within the pipeline working directory

TSourceState Objects

class TSourceState(TPipelineState)

[view_source]

sources


SupportsPipeline Objects

class SupportsPipeline(Protocol)

[view_source]

A protocol with core pipeline operations that lets high-level abstractions, e.g. sources, access pipeline methods and properties

pipeline_name

Name of the pipeline

default_schema_name

Name of the default schema

destination

The destination reference which is ModuleType. destination.__name__ returns the name string

dataset_name

Name of the dataset to which the pipeline will load data

runtime_config

A configuration of runtime options like logging level and format and various tracing options

working_dir

A working directory of the pipeline

pipeline_salt

A configurable pipeline secret to be used as a salt or as a seed for an encryption key

first_run

Indicates a first run of the pipeline, where run ends with successful loading of the data

state

@property
def state() -> TPipelineState

[view_source]

Returns dictionary with pipeline state

schemas

@property
def schemas() -> Mapping[str, Schema]

[view_source]

Mapping of all pipeline schemas

set_local_state_val

def set_local_state_val(key: str, value: Any) -> None

[view_source]

Sets value in local state. Local state is not synchronized with destination.

get_local_state_val

def get_local_state_val(key: str) -> Any

[view_source]

Gets value from local state. Local state is not synchronized with destination.

PipelineContext Objects

@configspec
class PipelineContext(ContainerInjectableContext)

[view_source]

pipeline

def pipeline() -> SupportsPipeline

[view_source]

Creates or returns an existing pipeline

cls__init__

@classmethod
def cls__init__(
deferred_pipeline: Callable[..., SupportsPipeline] = None) -> None

[view_source]

Initialize the context with a function returning the Pipeline object to allow creation on first use
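"Creation on first use" can be sketched as below. `DemoContext`, `DemoPipeline` and `make_pipeline` are hypothetical names used only to illustrate the deferred-factory idea; this is not the actual `PipelineContext` code.

```python
from typing import Callable, List, Optional


class DemoPipeline:
    def __init__(self, name: str) -> None:
        self.pipeline_name = name


class DemoContext:
    """Hypothetical context that builds its pipeline lazily from a factory."""

    def __init__(self, deferred_pipeline: Callable[[], DemoPipeline]) -> None:
        self._deferred_pipeline = deferred_pipeline
        self._pipeline: Optional[DemoPipeline] = None

    def pipeline(self) -> DemoPipeline:
        # Call the factory only on first use, then reuse the same instance.
        if self._pipeline is None:
            self._pipeline = self._deferred_pipeline()
        return self._pipeline


calls: List[str] = []


def make_pipeline() -> DemoPipeline:
    calls.append("created")
    return DemoPipeline("demo")


ctx = DemoContext(make_pipeline)
```

Nothing is created when the context is constructed; the factory runs once, on the first `pipeline()` call.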

current_pipeline

def current_pipeline() -> SupportsPipeline

[view_source]

Gets active pipeline context or None if not found

pipeline_state

def pipeline_state(
container: Container,
initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]

[view_source]

Gets value of the state from context or active pipeline, if none found returns initial_default

Injected state is called "writable": it is injected by the Pipeline class and all the changes will be persisted. The state coming from pipeline context or initial_default is called "read only" and all the changes to it will be discarded

Returns tuple (state, writable)
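The precedence described above can be sketched as a small function. `resolve_state` and its parameters are hypothetical; the real function takes a `Container` and looks the state up there, which this sketch replaces with explicit arguments.

```python
from typing import Any, Dict, Optional, Tuple

State = Dict[str, Any]


def resolve_state(
    injected: Optional[State],
    active_pipeline_state: Optional[State],
    initial_default: Optional[State] = None,
) -> Tuple[Optional[State], bool]:
    """Return (state, writable) following the precedence described above."""
    if injected is not None:
        return injected, True  # injected state: changes are persisted
    if active_pipeline_state is not None:
        return active_pipeline_state, False  # read only: changes are discarded
    return initial_default, False  # fallback, also read only
```

A caller would typically check the `writable` flag before relying on any mutation of the returned state.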

source_state

def source_state() -> DictStrAny

[view_source]

Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Please avoid using source-scoped state. Check the resource_state function for resource-scoped state that is visible only within a particular resource. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.

Notes:

The source state is a python dictionary-like object that is available within the @dlt.source and @dlt.resource decorated functions and may be read and written to. The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time. When using the state:

  • The source state is scoped to a particular source and will be stored under the source name in the pipeline state
  • It is possible to share state across many sources if they share a schema with the same name
  • Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
  • The state available in the source decorated function is read only and any changes will be discarded.
  • The state available in the resource decorated function is writable and written values will be available on the next pipeline run

resource_state

def resource_state(resource_name: str = None,
source_state_: Optional[DictStrAny] = None) -> DictStrAny

[view_source]

Returns a dictionary with the resource-scoped state. Resource-scoped state is visible only to the resource requesting access. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.

Note that this function accepts the resource name as an optional argument. In rare cases dlt cannot resolve the resource name because the requesting function runs in a thread other than the main one. You'll need to pass the name explicitly when you request resource_state from async functions or functions decorated with @defer.

Summary: The resource state is a python dictionary-like object that is available within the @dlt.resource decorated functions and may be read and written to. The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time. When using the state:

  • The resource state is scoped to a particular resource requesting it.
  • Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
  • The state available in the resource decorated function is writable and written values will be available on the next pipeline run

Example:

The most typical use case for the state is to implement incremental load.

    import dlt
    import requests

    # players_archives is a helper that is not shown here; it yields archive urls
    @dlt.resource(write_disposition="append")
    def players_games(chess_url, players, start_month=None, end_month=None):
        # list of already processed archive urls, kept in the resource state
        checked_archives = dlt.current.resource_state().setdefault("archives", [])
        archives = players_archives(chess_url, players)
        for url in archives:
            if url in checked_archives:
                print(f"skipping archive {url}")
                continue
            else:
                print(f"getting archive {url}")
                checked_archives.append(url)
            # get the filtered archive
            r = requests.get(url)
            r.raise_for_status()
            yield r.json().get("games", [])

Here we store all the urls of game archives in the state and skip loading them on the next run. The archives are immutable. The state will grow with the coming months (and with more players), but this should be fine for up to a few thousand archives.

Arguments:

  • resource_name str, optional - forces the use of the state of the resource with this name. Defaults to None.
  • source_state_ Optional[DictStrAny], optional - Alternative source state. Defaults to None.

Raises:

  • ResourceNameNotAvailable - Raised if used outside of a resource context or from a thread other than the main one

Returns:

  • DictStrAny - State dictionary

reset_resource_state

def reset_resource_state(resource_name: str,
source_state_: Optional[DictStrAny] = None) -> None

[view_source]

Resets the resource state with name resource_name by removing it from source_state

Arguments:

  • resource_name - The resource key to reset
  • source_state_ - Optional source state dictionary to operate on. Use when working outside source context.
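To make the relationship between resource state and source state concrete, here is a minimal sketch with plain dictionaries. It assumes that each resource's state is stored under a `resources` key of the source state, keyed by resource name; that layout and the `demo_*` helper names are assumptions for illustration, not a statement about dlt's internals.

```python
from typing import Any, Dict

State = Dict[str, Any]


def demo_resource_state(source_state: State, resource_name: str) -> State:
    # Assumed layout: source_state["resources"][resource_name]
    return source_state.setdefault("resources", {}).setdefault(resource_name, {})


def demo_reset_resource_state(source_state: State, resource_name: str) -> None:
    # Remove the resource's slice from the source state; missing keys are ignored.
    source_state.get("resources", {}).pop(resource_name, None)


source_state: State = {}
archives = demo_resource_state(source_state, "players_games").setdefault("archives", [])
archives.append("url_1")
```

After a reset, the next request for the same resource name starts again from an empty dictionary.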

get_dlt_pipelines_dir

def get_dlt_pipelines_dir() -> str

[view_source]

Gets default directory where pipelines' data will be stored

  1. in user home directory ~/.dlt/pipelines/
  2. if current user is root in /var/dlt/pipelines
  3. if current user does not have a home directory in /tmp/dlt/pipelines
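The three rules above can be sketched as a pure function. `demo_pipelines_dir` is a hypothetical helper: it takes the user's status as arguments instead of querying the operating system, and the order in which the conditions are checked is an assumption.

```python
import posixpath
from typing import Optional


def demo_pipelines_dir(is_root: bool, home: Optional[str]) -> str:
    """Pick the pipelines directory from the rules listed above."""
    if is_root:
        return "/var/dlt/pipelines"
    if home is None:
        # No home directory available: fall back to the temp location.
        return "/tmp/dlt/pipelines"
    return posixpath.join(home, ".dlt", "pipelines")
```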

get_dlt_repos_dir

def get_dlt_repos_dir() -> str

[view_source]

Gets default directory where command repositories will be stored

set_current_pipe_name

def set_current_pipe_name(name: str) -> None

[view_source]

Set pipe name in current thread

unset_current_pipe_name

def unset_current_pipe_name() -> None

[view_source]

Unset pipe name in current thread

get_current_pipe_name

def get_current_pipe_name() -> str

[view_source]

When executed from within a dlt.resource decorated function, gets the pipe name associated with the current thread.

The pipe name is the same as the resource name in all currently known cases. In some multithreading cases, the pipe name may not be available.
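Why a per-thread name can be missing in multithreaded code is easiest to see with a small stand-in. The sketch below uses `threading.local`, which is an assumption about one possible mechanism rather than a description of dlt's implementation; the `demo_*` functions are hypothetical and return None where the real function may behave differently.

```python
import threading
from typing import List, Optional

_local = threading.local()


def demo_set_pipe_name(name: str) -> None:
    _local.pipe_name = name


def demo_unset_pipe_name() -> None:
    _local.pipe_name = None


def demo_get_pipe_name() -> Optional[str]:
    # Each thread sees only the value that it set itself.
    return getattr(_local, "pipe_name", None)


demo_set_pipe_name("players_games")

seen_in_worker: List[Optional[str]] = []
worker = threading.Thread(target=lambda: seen_in_worker.append(demo_get_pipe_name()))
worker.start()
worker.join()
```

The worker thread never set a name, so it cannot see the one set in the main thread. This mirrors the case in which a resource name has to be passed explicitly.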
