common.pipeline
_StepInfo Objects
class _StepInfo(NamedTuple)
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
StepInfo Objects
class StepInfo(SupportsHumanize, Generic[TStepMetricsCo])
metrics
Metrics per load id. If many sources with the same name were extracted, there will be more than 1 element in the list
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
started_at
@property
def started_at() -> datetime.datetime
Returns the earliest start date of all collected metrics
finished_at
@property
def finished_at() -> datetime.datetime
Returns the latest end date of all collected metrics
_ExtractInfo Objects
class _ExtractInfo(NamedTuple)
NamedTuple cannot be part of the derivation chain so we must re-declare all fields to use it as mixin later
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
ExtractInfo Objects
class ExtractInfo(StepInfo[ExtractMetrics], _ExtractInfo)
A tuple holding information on extracted data items. Returned by the pipeline extract method.
asdict
def asdict() -> DictStrAny
A dictionary representation of ExtractInfo that can be loaded with dlt
_NormalizeInfo Objects
class _NormalizeInfo(NamedTuple)
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
NormalizeInfo Objects
class NormalizeInfo(StepInfo[NormalizeMetrics], _NormalizeInfo)
A tuple holding information on normalized data items. Returned by the pipeline normalize method.
asdict
def asdict() -> DictStrAny
A dictionary representation of NormalizeInfo that can be loaded with dlt
_LoadInfo Objects
class _LoadInfo(NamedTuple)
loads_ids
ids of the loaded packages
load_packages
Information on loaded packages
LoadInfo Objects
class LoadInfo(StepInfo[LoadMetrics], _LoadInfo)
A tuple holding information on recently loaded packages. Returned by the pipeline run and load methods.
asdict
def asdict() -> DictStrAny
A dictionary representation of LoadInfo that can be loaded with dlt
has_failed_jobs
@property
def has_failed_jobs() -> bool
Returns True if any of the load packages has a failed job.
raise_on_failed_jobs
def raise_on_failed_jobs() -> None
Raises a DestinationHasFailedJobs exception if any of the load packages has a failed job.
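The relationship between `has_failed_jobs` and `raise_on_failed_jobs` can be sketched with simplified stand-in classes. Everything below (`PackageSketch`, `LoadInfoSketch`, `FailedJobsError`, the `failed_jobs` field) is hypothetical and only mirrors the documented behavior; the real `LoadInfo` and `DestinationHasFailedJobs` have richer structure.

```python
from dataclasses import dataclass, field
from typing import List


class FailedJobsError(Exception):
    """Hypothetical stand-in for DestinationHasFailedJobs."""


@dataclass
class PackageSketch:
    # Hypothetical, simplified load package: an id plus names of failed jobs.
    load_id: str
    failed_jobs: List[str] = field(default_factory=list)


@dataclass
class LoadInfoSketch:
    load_packages: List[PackageSketch]

    @property
    def has_failed_jobs(self) -> bool:
        # True as soon as one package reports at least one failed job.
        return any(package.failed_jobs for package in self.load_packages)

    def raise_on_failed_jobs(self) -> None:
        # Raise for the first package that contains failed jobs.
        for package in self.load_packages:
            if package.failed_jobs:
                raise FailedJobsError(
                    f"load package {package.load_id} has failed jobs: {package.failed_jobs}"
                )


clean = LoadInfoSketch([PackageSketch("1700000000.1")])
broken = LoadInfoSketch([PackageSketch("1700000000.2", ["players_games.jsonl"])])
```

With these stand-ins, `clean.raise_on_failed_jobs()` returns silently, while `broken.raise_on_failed_jobs()` raises.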
WithStepInfo Objects
class WithStepInfo(ABC, Generic[TStepMetrics, TStepInfo])
Implemented by classes that generate StepInfo with metrics and package infos
current_load_id
@property
def current_load_id() -> str
Returns currently processing load id
get_step_info
@abstractmethod
def get_step_info(pipeline: "SupportsPipeline") -> TStepInfo
Returns an instance of StepInfo with metrics and package infos
TPipelineLocalState Objects
class TPipelineLocalState(TypedDict)
first_run
Indicates a first run of the pipeline, where run ends with successful loading of data
TPipelineState Objects
class TPipelineState(TVersionedState)
Schema for a pipeline state that is stored within the pipeline working directory
default_schema_name
Name of the first schema added to the pipeline to which all the resources without schemas will be added
schema_names
All the schemas present within the pipeline working directory
TSourceState Objects
class TSourceState(TPipelineState)
sources
SupportsPipeline Objects
class SupportsPipeline(Protocol)
A protocol with core pipeline operations that lets high-level abstractions, e.g. sources, access pipeline methods and properties
pipeline_name
Name of the pipeline
default_schema_name
Name of the default schema
destination
The destination reference, which is a ModuleType. destination.__name__ returns the name string
dataset_name
Name of the dataset into which the pipeline will load data
runtime_config
A configuration of runtime options like logging level and format and various tracing options
working_dir
A working directory of the pipeline
pipeline_salt
A configurable pipeline secret to be used as a salt or a seed for encryption key
first_run
Indicates a first run of the pipeline, where run ends with successful loading of the data
state
@property
def state() -> TPipelineState
Returns dictionary with pipeline state
schemas
@property
def schemas() -> Mapping[str, Schema]
Mapping of all pipeline schemas
set_local_state_val
def set_local_state_val(key: str, value: Any) -> None
Sets value in local state. Local state is not synchronized with destination.
get_local_state_val
def get_local_state_val(key: str) -> Any
Gets value from local state. Local state is not synchronized with destination.
PipelineContext Objects
@configspec
class PipelineContext(ContainerInjectableContext)
pipeline
def pipeline() -> SupportsPipeline
Creates or returns the existing pipeline
cls__init__
@classmethod
def cls__init__(
deferred_pipeline: Callable[..., SupportsPipeline] = None) -> None
Initialize the context with a function returning the Pipeline object to allow creation on first use
current_pipeline
def current_pipeline() -> SupportsPipeline
Gets active pipeline context or None if not found
pipeline_state
def pipeline_state(
container: Container,
initial_default: TPipelineState = None) -> Tuple[TPipelineState, bool]
Gets the value of the state from the context or the active pipeline; if none is found, returns initial_default.
Injected state is called "writable": it is injected by the Pipeline class and all changes to it will be persisted.
The state coming from the pipeline context or from initial_default is called "read only" and all changes to it will be discarded.
Returns a tuple (state, writable).
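The lookup order and the meaning of the `writable` flag described above can be sketched as follows. This is a simplified model under stated assumptions: `pipeline_state_sketch` and its explicit `injected` and `active_pipeline_state` arguments are hypothetical, whereas the real function resolves both from the `Container`.

```python
from typing import Any, Dict, Optional, Tuple

State = Dict[str, Any]


def pipeline_state_sketch(
    injected: Optional[State],
    active_pipeline_state: Optional[State],
    initial_default: Optional[State] = None,
) -> Tuple[Optional[State], bool]:
    # 1. State injected by the Pipeline class: writable, changes are persisted.
    if injected is not None:
        return injected, True
    # 2. State taken from the active pipeline context: read only.
    if active_pipeline_state is not None:
        return active_pipeline_state, False
    # 3. Nothing found: fall back to the caller's default, also read only.
    return initial_default, False
```

A caller would typically check the second tuple element before relying on any change being persisted.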
source_state
def source_state() -> DictStrAny
Returns a dictionary with the source-scoped state. Source-scoped state may be shared across the resources of a particular source. Please avoid using source-scoped state. Check the resource_state function for resource-scoped state that is visible only within a particular resource. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.
Notes:
The source state is a Python dictionary-like object that is available within the @dlt.source and @dlt.resource decorated functions and may be read and written to.
The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
When using the state:
- The source state is scoped to a particular source and will be stored under the source name in the pipeline state
- It is possible to share state across many sources if they share a schema with the same name
- Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
- The state available in the source decorated function is read only and any changes will be discarded.
- The state available in the resource decorated function is writable and written values will be available on the next pipeline run
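The scoping rule in the first bullet can be pictured with plain dictionaries. This sketch assumes the pipeline state keeps source states under a top-level `sources` key (matching the `sources` field of `TSourceState` listed earlier); `source_state_sketch` is a hypothetical helper, not dlt's implementation.

```python
from typing import Any, Dict


def source_state_sketch(pipeline_state: Dict[str, Any], source_name: str) -> Dict[str, Any]:
    # Assumed layout: pipeline_state["sources"][<source name>] holds that source's state.
    return pipeline_state.setdefault("sources", {}).setdefault(source_name, {})


pipeline_state: Dict[str, Any] = {}

# Two lookups for the same source name return the same dictionary object,
# so every resource of that source sees the same values.
chess_state = source_state_sketch(pipeline_state, "chess")
chess_state["last_checked_month"] = "2024-01"
same_state = source_state_sketch(pipeline_state, "chess")

# A differently named source gets its own, separate state.
other_state = source_state_sketch(pipeline_state, "github")
```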
resource_state
def resource_state(resource_name: str = None,
source_state_: Optional[DictStrAny] = None) -> DictStrAny
Returns a dictionary with the resource-scoped state. Resource-scoped state is visible only to resource requesting the access. Dlt state is preserved across pipeline runs and may be used to implement incremental loads.
Note that this function accepts the resource name as an optional argument. There are rare cases when dlt is not able to resolve the resource name because the requesting function runs in a thread other than the main one. You'll need to pass the name explicitly when you request resource_state from async functions or functions decorated with @defer.
Summary:
The resource state is a Python dictionary-like object that is available within the @dlt.resource decorated functions and may be read and written to.
The data within the state is loaded into destination together with any other extracted data and made automatically available to the source/resource extractor functions when they are run next time.
When using the state:
- The resource state is scoped to a particular resource requesting it.
- Any JSON-serializable values can be written to and then read from the state. dlt dumps and restores instances of Python bytes, DateTime, Date and Decimal types.
- The state available in the resource decorated function is writable and written values will be available on the next pipeline run
Example:
The most typical use case for the state is to implement incremental load.
@dlt.resource(write_disposition="append")
def players_games(chess_url, players, start_month=None, end_month=None):
    checked_archives = dlt.current.resource_state().setdefault("archives", [])
    archives = players_archives(chess_url, players)
    for url in archives:
        if url in checked_archives:
            print(f"skipping archive {url}")
            continue
        else:
            print(f"getting archive {url}")
            checked_archives.append(url)
        # get the filtered archive
        r = requests.get(url)
        r.raise_for_status()
        yield r.json().get("games", [])
Here we store all the URLs of the game archives in the state and skip loading them on the next run. The archives are immutable. The state will grow with the coming months (and with more players), but up to a few thousand archives this should be fine.
Arguments:
resource_name
str, optional - Forces the use of state for a resource with this name. Defaults to None.
source_state_
Optional[DictStrAny], optional - Alternative source state. Defaults to None.
Raises:
ResourceNameNotAvailable
- Raised if used outside of a resource context or from a thread other than the main one
Returns:
DictStrAny
- State dictionary
reset_resource_state
def reset_resource_state(resource_name: str,
source_state_: Optional[DictStrAny] = None) -> None
Resets the state of the resource named resource_name by removing it from source_state
Arguments:
resource_name
- The resource key to reset
source_state_
- Optional source state dictionary to operate on. Use when working outside source context.
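The effect of a reset can be sketched on a plain dictionary. The sketch assumes resource states live under a `resources` key inside the source state; that layout and the helper name `reset_resource_state_sketch` are assumptions made for illustration, not a confirmed description of dlt internals.

```python
from typing import Any, Dict


def reset_resource_state_sketch(resource_name: str, source_state: Dict[str, Any]) -> None:
    # Assumed layout: source_state["resources"][<resource name>] holds the resource state.
    # Removing the key means the next run starts from an empty resource state.
    source_state.get("resources", {}).pop(resource_name, None)


source_state = {
    "resources": {
        "players_games": {"archives": ["https://example.com/archive/2024/01"]},
        "players_profiles": {"seen": 3},
    }
}

reset_resource_state_sketch("players_games", source_state)
# Resetting a name that is not present is a no-op in this sketch.
reset_resource_state_sketch("unknown_resource", source_state)
```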
get_dlt_pipelines_dir
def get_dlt_pipelines_dir() -> str
Gets default directory where pipelines' data will be stored
- in user home directory ~/.dlt/pipelines/
- if current user is root in /var/dlt/pipelines
- if current user does not have a home directory in /tmp/dlt/pipelines
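The three cases above can be written as a small decision function. This is a sketch only: `pipelines_dir_sketch` takes the home directory and the root flag as explicit arguments so it can be exercised anywhere, and the order in which the root and home checks are applied is an assumption, since the list above does not state it.

```python
import posixpath
from typing import Optional


def pipelines_dir_sketch(home: Optional[str], is_root: bool) -> str:
    # Assumed precedence: the root check wins over the home directory check.
    if is_root:
        return "/var/dlt/pipelines"
    # No home directory available: fall back to a temporary location.
    if not home:
        return "/tmp/dlt/pipelines"
    # Regular user: a .dlt/pipelines folder inside the home directory.
    return posixpath.join(home, ".dlt", "pipelines")
```

`posixpath` is used instead of `os.path` only to keep the sample output identical on every platform.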
get_dlt_repos_dir
def get_dlt_repos_dir() -> str
Gets default directory where command repositories will be stored
set_current_pipe_name
def set_current_pipe_name(name: str) -> None
Set pipe name in current thread
unset_current_pipe_name
def unset_current_pipe_name() -> None
Unset pipe name in current thread
get_current_pipe_name
def get_current_pipe_name() -> str
When executed from within a dlt.resource decorated function, gets the pipe name associated with the current thread.
The pipe name is the same as the resource name in all currently known cases. In some multithreading cases, the pipe name may not be available.
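Why a per-thread pipe name can be missing in another thread is easiest to see with Python's `threading.local`. The functions below are hypothetical stand-ins for the three helpers above, and returning `None` when no name is set is a simplification chosen for this sketch.

```python
import threading
from typing import List, Optional

# Each thread sees its own independent attributes on this object.
_local = threading.local()


def set_pipe_name_sketch(name: str) -> None:
    _local.pipe_name = name


def unset_pipe_name_sketch() -> None:
    _local.pipe_name = None


def get_pipe_name_sketch() -> Optional[str]:
    # Threads that never called set_pipe_name_sketch have no attribute at all.
    return getattr(_local, "pipe_name", None)


# The name is set in the current thread only.
set_pipe_name_sketch("players_games")

# A worker thread does not inherit the value.
seen_in_worker: List[Optional[str]] = []
worker = threading.Thread(target=lambda: seen_in_worker.append(get_pipe_name_sketch()))
worker.start()
worker.join()
```

This mirrors the situation in which resource_state needs an explicit resource name: code running in a different thread cannot see the name set by the thread that started the resource.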