dlt.helpers.airflow_helper
PipelineTasksGroup Objects
class PipelineTasksGroup(TaskGroup)
Represents a dlt Airflow pipeline task group.
__init__
def __init__(pipeline_name: str,
use_data_folder: bool = False,
local_data_folder: str = None,
use_task_logger: bool = True,
log_progress_period: float = 30.0,
buffer_max_items: int = 1000,
retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY,
retry_pipeline_steps: Sequence[TPipelineStep] = ("load", ),
abort_task_if_any_job_failed: bool = True,
wipe_local_data: bool = True,
save_load_info: bool = False,
save_trace_info: bool = False,
**kwargs: Any) -> None
Creates a task group to which you can add pipeline runs.
The run environment is prepared as follows:
- the .dlt folder (the project folder) is searched under dags as configured by Airflow
- the data folder where pipelines are stored is always unique
The data_folder is available in certain Airflow deployments. In the case of Composer, it is a location on the GCS bucket. use_data_folder is disabled by default and should be
enabled only when needed. Operations on the bucket are non-atomic and much slower than on local storage and should be avoided.
abort_task_if_any_job_failed will abort the other dlt load jobs and fail the Airflow task if any of the jobs failed. See https://dlthub.com/docs/running-in-production/running#handle-exceptions-failed-jobs-and-retry-the-pipeline.
The load info and trace info can optionally be saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace
Arguments:
- pipeline_name (str): Name of the task group.
- use_data_folder (bool, optional): If a well defined 'data' folder is present, it will be used. Currently only the data folder on Composer is supported. Defaults to False.
- local_data_folder (str, optional): Path to a local folder on the worker machine where data is stored. Used if use_data_folder is False or there's no well defined data folder. Defaults to gettempdir.
- use_task_logger (bool, optional): Will redirect the dlt logger into the task logger. Defaults to True.
- log_progress_period (float, optional): If progress is not configured for a pipeline, the log progress is used with the given period. Set 0 to disable. Defaults to 30.0.
- buffer_max_items (int, optional): Maximum number of buffered items. Use 0 to keep the dlt built-in limit. Defaults to 1000.
- retry_policy (Retrying, optional): Tenacity retry policy. Defaults to no retry.
- retry_pipeline_steps (Sequence[TPipelineStep], optional): Which pipeline steps are eligible for retry. Defaults to ("load", ).
- wipe_local_data (bool, optional): Will wipe all the data created by the pipeline, also in case of exception. Defaults to True.
- save_load_info (bool, optional): Will save extensive load info to the destination. Defaults to False.
- save_trace_info (bool, optional): Will save trace info to the destination. Defaults to False.
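As a sketch of how these options fit together (the DAG name, pipeline name, schedule and the three-attempt tenacity retry policy are illustrative assumptions, not defaults of the helper), a task group might be created inside a DAG like this:

import pendulum
from airflow.decorators import dag
from tenacity import Retrying, stop_after_attempt

from dlt.helpers.airflow_helper import PipelineTasksGroup

default_task_args = {"owner": "airflow", "retries": 0}

@dag(
    schedule_interval="@daily",
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
    default_args=default_task_args,
)
def load_data_dag():
    # Hypothetical task group: keeps data on local storage, wipes it after the run
    # and retries only the "load" step up to 3 times.
    tasks = PipelineTasksGroup(
        "my_pipeline",
        use_data_folder=False,
        wipe_local_data=True,
        retry_policy=Retrying(stop=stop_after_attempt(3), reraise=True),
        retry_pipeline_steps=("load",),
        save_load_info=False,
        save_trace_info=False,
    )
    # ... attach pipeline runs with tasks.run(...) or tasks.add_run(...)

load_data_dag()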
run
def run(pipeline: Pipeline,
data: Any,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
pipeline_name: str = None,
on_before_run: Callable[[], None] = None,
**kwargs: Any) -> PythonOperator
Create a task to run the given pipeline with the given data in Airflow.
Arguments:
- pipeline (Pipeline): The pipeline to run.
- data (Any): The data to run the pipeline with. If a non-resource callable is given, it's evaluated during the DAG execution, right before the actual pipeline run.
  NOTE: If on_before_run is provided, first on_before_run is evaluated, and then the callable data.
- table_name (str, optional): The name of the table to which the data should be loaded within the dataset.
- write_disposition (TWriteDispositionConfig, optional): Same as in the run command.
- loader_file_format (TLoaderFileFormat, optional): The file format the loader will use to create the load package.
- schema_contract (TSchemaContract, optional): An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema.
- pipeline_name (str, optional): The name of the derived pipeline.
- on_before_run (Callable, optional): A callable to be executed right before the actual pipeline run.
Returns:
PythonOperator: Airflow task instance.
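For instance (a minimal sketch; the events resource, DAG settings and the duckdb destination are assumptions for illustration), run attaches a single task that executes the whole pipeline:

import dlt
import pendulum
from airflow.decorators import dag

from dlt.helpers.airflow_helper import PipelineTasksGroup

@dlt.resource(table_name="events", write_disposition="append")
def events():
    # Hypothetical resource producing a few rows.
    yield [{"id": 1}, {"id": 2}]

@dag(schedule_interval="@daily", start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def events_dag():
    tasks = PipelineTasksGroup("events_pipeline", wipe_local_data=True)

    pipeline = dlt.pipeline(
        pipeline_name="events_pipeline",
        dataset_name="events_data",
        destination="duckdb",
    )
    # One PythonOperator that extracts, normalizes and loads `events`.
    tasks.run(pipeline, events())

events_dag()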
add_run
@with_telemetry("helper", "airflow_add_run", False, "decompose")
def add_run(pipeline: Pipeline,
data: Any,
*,
decompose: Literal["none", "serialize", "parallel",
"parallel-isolated"] = "none",
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
on_before_run: Callable[[], None] = None,
**kwargs: Any) -> List[BaseOperator]
Creates a task or a group of tasks to run data with the pipeline.
Creates an Airflow task that extracts, normalizes and loads data with the passed pipeline instance pipeline. If data is a source
and decompose is serialize, it will decompose the source into disjoint connected components (isolated groups of resources) and execute them
one after another as separate Airflow tasks. The decomposition makes sure that each resource or transformer is extracted only once. It preserves
the order of resources declared in the source when creating the graph of tasks.
The kwargs are passed as arguments to all Airflow task instances created.
Arguments:
- pipeline (Pipeline): An instance of the pipeline used to run the source.
- data (Any): Any data supported by the run method of the pipeline. If a non-resource callable is given, it's called before the load to get the data.
- decompose (Literal["none", "serialize", "parallel", "parallel-isolated"], optional): A source decomposition strategy into Airflow tasks:
  none - no decomposition, default value.
  serialize - decompose the source into a sequence of Airflow tasks.
  parallel - decompose the source into a parallel Airflow task group, except the first resource must be completed first. All tasks that are run in parallel share the same pipeline state. If two of them modify the state, part of the state may be lost.
  parallel-isolated - decompose the source into a parallel Airflow task group, with the same exception as above. All tasks have separate pipeline state (via separate pipeline names) but share the same dataset, schemas and tables.
  NOTE: The first component of the source in both parallel modes is done first; after that the rest are executed in parallel to each other.
  NOTE: In case the SequentialExecutor is used by Airflow, the tasks will remain sequential despite the 'parallel' or 'parallel-isolated' modes. Use another executor (e.g. CeleryExecutor) to make the tasks parallel! Parallel tasks are executed in different pipelines, all derived from the original one, but with the state isolated from each other.
- table_name (str): The name of the table to which the data should be loaded within the dataset.
- write_disposition (TWriteDispositionConfig, optional): Same as in the run command. Defaults to None.
- loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
- schema_contract (TSchemaContract, optional): An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema. Defaults to None.
- on_before_run (Callable, optional): A callable to be executed right before the actual pipeline run.
Returns:
List[BaseOperator]: Airflow tasks created in order of creation.
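As a sketch (the source, DAG and pipeline names are illustrative), decomposing a two-resource source into sequential Airflow tasks could look like this; the extra keyword arguments are forwarded to every created task:

import dlt
import pendulum
from airflow.decorators import dag

from dlt.helpers.airflow_helper import PipelineTasksGroup

@dlt.source
def my_source():
    # Hypothetical source with two independent resources.
    @dlt.resource
    def users():
        yield [{"id": 1}]

    @dlt.resource
    def orders():
        yield [{"id": 10, "user_id": 1}]

    return users, orders

@dag(schedule_interval="@daily", start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def my_source_dag():
    tasks = PipelineTasksGroup("my_pipeline", use_data_folder=False, wipe_local_data=True)

    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline", dataset_name="my_data", destination="duckdb"
    )
    # "serialize" turns each connected component of the source into its own task,
    # executed one after another; trigger_rule and retries are passed to every task.
    tasks.add_run(
        pipeline,
        my_source(),
        decompose="serialize",
        trigger_rule="all_done",
        retries=0,
    )

my_source_dag()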