pipeline.pipeline
Pipeline Objects
class Pipeline(SupportsPipeline)
pipeline_name
Name of the pipeline
first_run
Indicates a first run of the pipeline, where run ends with successful loading of the data
pipelines_dir
A directory where the pipelines' working directories are created
working_dir
A working directory of the pipeline
dataset_name
Name of the dataset to which the pipeline will load data
is_active
Tells if instance is currently active and available via dlt.pipeline()
__init__
def __init__(pipeline_name: str,
pipelines_dir: str,
pipeline_salt: TSecretStrValue,
destination: AnyDestination,
staging: AnyDestination,
dataset_name: str,
import_schema_path: str,
export_schema_path: str,
dev_mode: bool,
progress: _Collector,
must_attach_to_local_pipeline: bool,
config: PipelineConfiguration,
runtime: RuntimeConfiguration,
refresh: Optional[TRefreshMode] = None) -> None
Initializes the Pipeline class, which implements the dlt pipeline. Please use the pipeline function in the dlt module to create a new Pipeline instance.
drop
def drop(pipeline_name: str = None) -> "Pipeline"
Deletes local pipeline state, schemas and any working files. Re-initializes all internal fields via __init__. If a pipeline_name is specified that is different from the current name, a new pipeline instance is created, activated and returned. Note that the original pipeline is still dropped.
Arguments:
pipeline_name
str - Optional. New pipeline name. Creates and activates new instance
extract
@with_runtime_trace()
@with_schemas_sync
@with_state_sync(may_extract_state=True)
@with_config_section((known_sections.EXTRACT, ))
def extract(data: Any,
*,
table_name: str = None,
parent_table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
max_parallel_items: int = ConfigValue,
workers: int = ConfigValue,
table_format: TTableFormat = None,
schema_contract: TSchemaContract = None,
refresh: Optional[TRefreshMode] = None) -> ExtractInfo
Extracts the data and prepares it for normalization. Does not require destination or credentials to be configured. See the run method for a description of the arguments.
normalize
@with_runtime_trace()
@with_schemas_sync
@with_config_section((known_sections.NORMALIZE, ))
def normalize(workers: int = 1,
loader_file_format: TLoaderFileFormat = None) -> NormalizeInfo
Normalizes the data prepared with the extract method, infers the schema and creates load packages for the load method. Requires destination to be known.
load
@with_runtime_trace(send_state=True)
@with_state_sync()
@with_config_section((known_sections.LOAD, ))
def load(destination: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
*,
workers: int = 20,
raise_on_failed_jobs: bool = ConfigValue) -> LoadInfo
Loads the packages prepared by the normalize method into the dataset_name at destination, optionally using provided credentials.
run
@with_runtime_trace()
@with_config_section(("run", ))
def run(data: Any = None,
*,
destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
table_format: TTableFormat = None,
schema_contract: TSchemaContract = None,
refresh: Optional[TRefreshMode] = None) -> LoadInfo
Loads the data from the data argument into the destination specified in destination and the dataset specified in dataset_name.
Notes:
This method will extract the data from the data argument, infer the schema, normalize the data into a load package (e.g. jsonl or PARQUET files representing tables) and then load such packages into the destination.
The data may be supplied in several forms:
- a list or Iterable of any JSON-serializable objects, e.g. dlt.run([1, 2, 3], table_name="numbers")
- any Iterator or a function that yields (Generator), e.g. dlt.run(range(1, 10), table_name="range")
- a function or a list of functions decorated with @dlt.resource, e.g. dlt.run([chess_players(title="GM"), chess_games()])
- a function or a list of functions decorated with @dlt.source.
Please note that dlt deals with bytes, datetime, decimal and uuid objects, so you are free to load documents containing e.g. binary data or dates.
Execution: The run method will first use the sync_destination method to synchronize pipeline state and schemas with the destination. You can disable this behavior with the restore_from_destination configuration option. Next, it will make sure that data from the previous run is fully processed. If not, the run method normalizes and loads the pending data items and exits. If there was no pending data, new data from the data argument is extracted, normalized and loaded.
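The execution order described above can be modelled with a small stand-alone sketch. It is not dlt's implementation: run_sketch and its parameters are hypothetical names, and the function only returns the names of the steps that would be taken, under the assumption that pending data short-circuits the run exactly as the note says.

```python
from typing import Any, List


def run_sketch(
    data: Any,
    has_pending_data: bool,
    restore_from_destination: bool = True,
) -> List[str]:
    """Return the ordered step names of one run (illustrative model only)."""
    steps: List[str] = []
    if restore_from_destination:
        # State and schemas are synchronized with the destination first.
        steps.append("sync_destination")
    if has_pending_data:
        # Data left over from a previous run is finished, then the run exits
        # without touching the new `data` argument.
        steps += ["normalize", "load"]
        return steps
    # No pending data: process the new data end to end.
    steps += ["extract", "normalize", "load"]
    return steps


print(run_sketch([1, 2, 3], has_pending_data=False))
print(run_sketch([1, 2, 3], has_pending_data=True))
print(run_sketch([1, 2, 3], has_pending_data=False, restore_from_destination=False))
```

In this model a caller whose previous run left pending packages has to call run again to load the new data, which is the behaviour the note implies.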
Arguments:
data
Any - Data to be loaded to destination.
destination
str | DestinationReference, optional - A name of the destination to which dlt will load the data, or a destination module imported from dlt.destination. If not provided, the value passed to dlt.pipeline will be used.
dataset_name
str, optional - A name of the dataset to which the data will be loaded. A dataset is a logical group of tables, i.e. a schema in relational databases or a folder grouping many files. If not provided, the value passed to dlt.pipeline will be used. If not provided at all, it defaults to the pipeline_name.
credentials
Any, optional - Credentials for the destination, e.g. a database connection string or a dictionary with Google Cloud credentials. In most cases it should be set to None, which lets dlt use secrets.toml or environment variables to infer the right credentials values.
table_name
str, optional - The name of the table to which the data should be loaded within the dataset. This argument is required for data that is a list/Iterable or Iterator without a __name__ attribute. The behavior of this argument depends on the type of the data:
- generator functions - the function name is used as table name, table_name overrides this default
- @dlt.resource - resource contains the full table schema and that includes the table name. table_name will override this property. Use with care!
- @dlt.source - source contains several resources each with a table schema. table_name will override all table names within the source and load the data into a single table.
write_disposition
TWriteDispositionConfig, optional - Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary. Allowed shorthand string literals: "append" will always add new data at the end of the table. "replace" will replace existing data with new data. "skip" will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append". Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide write_disposition={"disposition": "merge", "strategy": "scd2"}. Please note that in case of dlt.resource the table schema value will be overwritten and in case of dlt.source, the values in all resources will be overwritten.
columns
Sequence[TColumnSchema], optional - A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
primary_key
str | Sequence[str] - A column name or a list of column names that comprise a primary key. Typically used with "merge" write disposition to deduplicate loaded data.
schema
Schema, optional - An explicit Schema object in which all table schemas will be grouped. By default dlt takes the schema from the source (if passed in the data argument) or creates a default one itself.
loader_file_format
Literal["jsonl", "insert_values", "parquet"], optional - The file format the loader will use to create the load package. Not all file formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
table_format
Literal["delta", "iceberg"], optional - The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations.
schema_contract
TSchemaContract, optional - An override for the schema contract settings. This will replace the schema contract settings for all tables in the schema. Defaults to None.
refresh
str | TRefreshMode - Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
- drop_sources - Drop tables and source and resource state for all sources currently being processed in run or extract methods of the pipeline. (Note: schema history is erased)
- drop_resources - Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
- drop_data - Wipe all data and resource state for all resources being processed. Schema is not modified.
Raises:
PipelineStepFailed
- when a problem happened during extract, normalize or load steps.
Returns:
LoadInfo
- Information on loaded data including the list of package ids and failed job statuses. Please note that dlt will not raise if a single job terminally fails. Such information is provided via LoadInfo.
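To make the write_disposition shorthands concrete, here is a toy in-memory model of their effect on a table. It is a simplified sketch and not how dlt or any destination implements them: apply_write_disposition is a hypothetical helper, rows are plain dictionaries, and "merge" is reduced to an upsert on a single primary key column.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_write_disposition(
    table: List[Row],
    new_rows: List[Row],
    disposition: str = "append",
    primary_key: str = "id",
) -> List[Row]:
    """Return the table contents after writing new_rows (toy model)."""
    if disposition == "append":
        # New data is always added at the end of the table.
        return table + new_rows
    if disposition == "replace":
        # Existing data is replaced with the new data.
        return list(new_rows)
    if disposition == "skip":
        # Nothing is loaded.
        return list(table)
    if disposition == "merge":
        # Simplification: rows sharing a primary key are overwritten,
        # unseen keys are added.
        merged = {row[primary_key]: row for row in table}
        for row in new_rows:
            merged[row[primary_key]] = row
        return list(merged.values())
    raise ValueError(f"unknown write disposition: {disposition}")


existing = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
incoming = [{"id": 2, "name": "B"}, {"id": 3, "name": "c"}]

for mode in ("append", "replace", "skip", "merge"):
    print(mode, apply_write_disposition(existing, incoming, mode))
```

The dictionary form of the argument, such as the SCD2 configuration mentioned above, selects strategies that this sketch does not attempt to model.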
sync_destination
@with_config_section(sections=(),
merge_func=ConfigSectionContext.prefer_existing)
def sync_destination(destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None) -> None
Synchronizes pipeline state with the destination's state kept in dataset_name
Notes:
Attempts to restore pipeline state and schemas from the destination. Requires the state that is present at the destination to have a higher version number than the state kept locally in the working directory. In such a situation the local state, schemas and intermediate files with the data will be deleted and replaced with the state and schema present in the destination.
A special case where the pipeline state exists locally but the dataset does not exist at the destination will wipe out the local state.
Note: this method is executed by the run method before any operation on data. Use the restore_from_destination configuration option to disable that behavior.
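The rules in these notes amount to a small decision table, sketched below. This is an illustration under stated assumptions, not dlt's code: sync_decision and its return labels are hypothetical, versions are plain integers, and the case of no local state at all is assumed to restore from the destination.

```python
from typing import Optional


def sync_decision(
    local_version: Optional[int],
    destination_version: Optional[int],
    dataset_exists: bool,
) -> str:
    """Decide what a state sync would do (illustrative model only)."""
    if local_version is not None and not dataset_exists:
        # Special case: local state exists but the dataset is gone.
        return "wipe_local"
    if destination_version is None:
        # Nothing to restore from.
        return "keep_local"
    # Assumption: a missing local state counts as older than any stored state.
    if local_version is None or destination_version > local_version:
        return "restore_from_destination"
    return "keep_local"


print(sync_decision(3, 5, dataset_exists=True))
print(sync_decision(5, 3, dataset_exists=True))
print(sync_decision(2, None, dataset_exists=False))
```

Equal version numbers leave the local state in place, since the notes require the destination version to be strictly higher.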
activate
def activate() -> None
Activates the pipeline
The active pipeline is used as a context for several dlt components. It provides state to sources and resources evaluated outside of the pipeline.run and pipeline.extract methods. For example, if the source you use is accessing state in a dlt.source decorated function, the state is provided by the active pipeline.
The name of the active pipeline is used when resolving secrets and config values as the optional outermost section during value lookup. For example, if a pipeline with the name chess_pipeline is active and dlt looks for BigQuery configuration, it will look in chess_pipeline.destination.bigquery.credentials first and then in destination.bigquery.credentials.
The active pipeline also provides the current DestinationCapabilitiesContext to other components, e.g. Schema instances. Among others, it sets the naming convention and maximum identifier length.
Only one pipeline is active at a given time.
A pipeline created or attached with dlt.pipeline/dlt.attach is automatically activated. The run, load and extract methods also activate the pipeline.
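The lookup order for secrets and config values described above can be sketched as follows. The sketch covers only the two lookups named in the text and is not dlt's resolver: lookup_keys, resolve and the flat dictionary standing in for the configuration are all hypothetical.

```python
from typing import Dict, List, Optional


def lookup_keys(key: str, active_pipeline_name: Optional[str]) -> List[str]:
    """Candidate keys in the order they would be tried (illustrative)."""
    keys: List[str] = []
    if active_pipeline_name:
        # The active pipeline name acts as an optional outermost section.
        keys.append(f"{active_pipeline_name}.{key}")
    keys.append(key)
    return keys


def resolve(
    key: str,
    config: Dict[str, str],
    active_pipeline_name: Optional[str] = None,
) -> Optional[str]:
    """Return the first configured value among the candidate keys."""
    for candidate in lookup_keys(key, active_pipeline_name):
        if candidate in config:
            return config[candidate]
    return None


config = {
    "destination.bigquery.credentials": "shared-credentials",
    "chess_pipeline.destination.bigquery.credentials": "chess-credentials",
}

print(lookup_keys("destination.bigquery.credentials", "chess_pipeline"))
print(resolve("destination.bigquery.credentials", config, "chess_pipeline"))
print(resolve("destination.bigquery.credentials", config, "other_pipeline"))
```

With this ordering, a pipeline-specific value overrides the shared one only while that pipeline is active, and other pipelines fall back to the shared section.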
deactivate
def deactivate() -> None
Deactivates the pipeline
Pipeline must be active in order to use this method. Please refer to the activate() method for an explanation of the active pipeline concept.
has_data
@property
def has_data() -> bool
Tells if the pipeline contains any data: schemas, extracted files, load packages or loaded packages in the destination
has_pending_data
@property
def has_pending_data() -> bool
Tells if the pipeline contains any extracted files or pending load packages
state
@property
def state() -> TPipelineState
Returns a dictionary with the pipeline state
naming
@property
def naming() -> NamingConvention
Returns naming convention of the default schema
last_trace
@property
def last_trace() -> PipelineTrace
Returns or loads last trace generated by pipeline. The trace is loaded from standard location.
list_extracted_resources
@deprecated(
"Please use list_extracted_load_packages instead. Flat extracted storage format got dropped"
" in dlt 0.4.0",
category=Dlt04DeprecationWarning,
)
def list_extracted_resources() -> Sequence[str]
Returns a list of all the files with extracted resources that will be normalized.
list_extracted_load_packages
def list_extracted_load_packages() -> Sequence[str]
Returns a list of all load package ids that are or will be normalized.
list_normalized_load_packages
def list_normalized_load_packages() -> Sequence[str]
Returns a list of all load package ids that are or will be loaded.
list_completed_load_packages
def list_completed_load_packages() -> Sequence[str]
Returns a list of all load package ids that are completely loaded
get_load_package_info
def get_load_package_info(load_id: str) -> LoadPackageInfo
Returns information on extracted/normalized/completed package with given load_id, all jobs and their statuses.
get_load_package_state
def get_load_package_state(load_id: str) -> TLoadPackageState
Returns the state of the extracted/normalized/completed package with the given load_id.
list_failed_jobs_in_package
def list_failed_jobs_in_package(load_id: str) -> Sequence[LoadJobInfo]
List all failed jobs and associated error messages for a specified load_id
drop_pending_packages
def drop_pending_packages(with_partial_loads: bool = True) -> None
Deletes all extracted and normalized packages. By default (with_partial_loads=True), this includes packages that are partially loaded.
sync_schema
@with_schemas_sync
def sync_schema(schema_name: str = None) -> TSchemaTables
Synchronizes the schema schema_name with the destination. If no name is provided, the default schema will be synchronized.
set_local_state_val
def set_local_state_val(key: str, value: Any) -> None
Sets value in local state. Local state is not synchronized with destination.
get_local_state_val
def get_local_state_val(key: str) -> Any
Gets value from local state. Local state is not synchronized with destination.
sql_client
@with_config_section(sections=(),
merge_func=ConfigSectionContext.prefer_existing)
def sql_client(schema_name: str = None) -> SqlClientBase[Any]
Returns a sql client configured to query/change the destination and dataset that were used to load the data.
Use the client in a with statement to manage opening and closing the connection to the destination:
    with pipeline.sql_client() as client:
        with client.execute_query(
            "SELECT id, name, email FROM customers WHERE id = %s", 10
        ) as cursor:
            print(cursor.fetchall())
The client is authenticated and defaults all queries to the dataset_name used by the pipeline. You can provide an alternative schema_name, which will be used to normalize the dataset name.
destination_client
@with_config_section(sections=(),
merge_func=ConfigSectionContext.prefer_existing)
def destination_client(schema_name: str = None) -> JobClientBase
Get the destination job client for the configured destination
Use the client in a with statement to manage opening and closing the connection to the destination:
    with pipeline.destination_client() as client:
        client.drop_storage()  # removes storage which typically wipes all data in it
The client is authenticated. You can provide an alternative schema_name, which will be used to normalize the dataset name.
If no schema name is provided and no default schema is present in the pipeline, an ad hoc schema will be created and discarded after use.
managed_state
@contextmanager
def managed_state(*, extract_state: bool = False) -> Iterator[TPipelineState]
Puts pipeline state in managed mode, where yielded state changes will be persisted or fully rolled back on exception.
Makes the state available via StateInjectableContext.
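The persist-or-roll-back behaviour can be illustrated with a generic context manager. This is a minimal sketch of the pattern only, not dlt's managed_state: StateStore is a hypothetical class, state is a plain dictionary, and the StateInjectableContext part is left out.

```python
import copy
from contextlib import contextmanager
from typing import Any, Dict, Iterator


class StateStore:
    """Hypothetical holder of persisted state (stands in for a pipeline)."""

    def __init__(self) -> None:
        self.persisted: Dict[str, Any] = {}

    @contextmanager
    def managed_state(self) -> Iterator[Dict[str, Any]]:
        # Hand out a working copy so partial changes never leak.
        working = copy.deepcopy(self.persisted)
        try:
            yield working
        except Exception:
            # Roll back: the working copy is discarded, the error propagates.
            raise
        else:
            # Commit: changes are persisted only if the block succeeded.
            self.persisted = working


store = StateStore()

with store.managed_state() as state:
    state["cursor"] = 10

try:
    with store.managed_state() as state:
        state["cursor"] = 99
        raise RuntimeError("step failed")
except RuntimeError:
    pass

print(store.persisted)  # {'cursor': 10}
```

Working on a deep copy keeps the rollback trivial, at the cost of copying the state on every entry.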