helpers.airflow_helper
PipelineTasksGroup Objects
class PipelineTasksGroup(TaskGroup)
Represents a dlt Airflow pipeline task group.
__init__
def __init__(pipeline_name: str,
use_data_folder: bool = False,
local_data_folder: str = None,
use_task_logger: bool = True,
log_progress_period: float = 30.0,
buffer_max_items: int = 1000,
retry_policy: Retrying = DEFAULT_RETRY_NO_RETRY,
retry_pipeline_steps: Sequence[TPipelineStep] = ("load", ),
abort_task_if_any_job_failed: bool = True,
wipe_local_data: bool = True,
save_load_info: bool = False,
save_trace_info: bool = False,
**kwargs: Any) -> None
Creates a task group to which you can add pipeline runs.
The run environment is prepared as follows:
- the .dlt folder (the project folder) is searched under dags as configured by Airflow
- the data folder where pipelines are stored is always unique
The data_folder is available in certain Airflow deployments. In the case of Composer, it is a location on the GCS bucket. use_data_folder is disabled by default and should be enabled only when needed: operations on the bucket are non-atomic and much slower than on local storage, so they should be avoided.
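The folder rules above can be sketched as a small selection function. This is a minimal illustration and not dlt's implementation: resolve_data_folder, its deployment_data_folder parameter, and the dlt_ prefix of the unique subfolder are hypothetical names introduced here.

```python
import os
import tempfile
import uuid
from typing import Optional


def resolve_data_folder(
    use_data_folder: bool,
    local_data_folder: Optional[str],
    deployment_data_folder: Optional[str],
) -> str:
    """Pick a base folder, then return a unique subfolder path inside it.

    Hypothetical helper: it only mirrors the documented rules.
    """
    if use_data_folder and deployment_data_folder:
        # A well-defined 'data' folder exists (for example a bucket mount).
        base = deployment_data_folder
    else:
        # Otherwise use the local folder, falling back to the system temp dir.
        base = local_data_folder or tempfile.gettempdir()
    # The folder where pipelines are stored is always unique (assumed naming).
    return os.path.join(base, f"dlt_{uuid.uuid4().hex}")


# Default settings: local temp dir, a different folder on every call.
first = resolve_data_folder(False, None, "/mnt/bucket/data")
second = resolve_data_folder(False, None, "/mnt/bucket/data")
print(first != second)
```

Only the path is computed here; creating and wiping the folder is left out of the sketch.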
abort_task_if_any_job_failed will abort the other dlt loading jobs and fail the Airflow task if any of the jobs failed. See https://dlthub.com/docs/running-in-production/running#handle-exceptions-failed-jobs-and-retry-the-pipeline.
The load info and trace info can be optionally saved to the destination. See https://dlthub.com/docs/running-in-production/running#inspect-and-save-the-load-info-and-trace
Arguments:
- pipeline_name (str): Name of the task group.
- use_data_folder (bool, optional): If a well-defined 'data' folder is present, it will be used. Currently only the data folder on Composer is supported. Defaults to False.
- local_data_folder (str, optional): Path to a local folder on the worker machine where data is stored. Used if use_data_folder is False or there is no well-defined data folder. Defaults to gettempdir.
- use_task_logger (bool, optional): Redirects the dlt logger into the task logger. Defaults to True.
- log_progress_period (float, optional): If progress is not configured for a pipeline, the log progress is used with the given period. Set to 0 to disable. Defaults to 30.0.
- buffer_max_items (int, optional): Maximum number of buffered items. Use 0 to keep the dlt built-in limit. Defaults to 1000.
- retry_policy (Retrying, optional): Tenacity retry policy. Defaults to no retry.
- retry_pipeline_steps (Sequence[TPipelineStep], optional): Which pipeline steps are eligible for retry. Defaults to ("load", ).
- abort_task_if_any_job_failed (bool, optional): Aborts the other dlt loading jobs and fails the Airflow task if any of the jobs failed. Defaults to True.
- wipe_local_data (bool, optional): Wipes all the data created by the pipeline, also in case of an exception. Defaults to True.
- save_load_info (bool, optional): Saves extensive load info to the destination. Defaults to False.
- save_trace_info (bool, optional): Saves trace info to the destination. Defaults to False.
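To show how retry_policy and retry_pipeline_steps interact, here is a simplified model in plain Python. It is an assumption-laden sketch: a plain max_attempts counter stands in for the Tenacity policy, and run_steps, STEPS and flaky_load are hypothetical names that are not part of dlt.

```python
from typing import Callable, Dict, List, Sequence

# Pipeline steps in execution order.
STEPS = ("extract", "normalize", "load")


def run_steps(
    step_funcs: Dict[str, Callable[[], None]],
    max_attempts: int,
    retry_pipeline_steps: Sequence[str] = ("load",),
) -> List[str]:
    """Run the steps in order and return a log of attempts such as 'load#2'."""
    log: List[str] = []
    for step in STEPS:
        # Steps that are not eligible for retry get exactly one attempt.
        attempts = max_attempts if step in retry_pipeline_steps else 1
        for attempt in range(1, attempts + 1):
            log.append(f"{step}#{attempt}")
            try:
                step_funcs[step]()
                break
            except Exception:
                if attempt == attempts:
                    raise  # attempts exhausted: the task fails
    return log


calls = {"load": 0}


def flaky_load() -> None:
    """Fail twice with a transient error, then succeed."""
    calls["load"] += 1
    if calls["load"] < 3:
        raise RuntimeError("transient load error")


log = run_steps(
    {"extract": lambda: None, "normalize": lambda: None, "load": flaky_load},
    max_attempts=3,
)
print(log)  # ['extract#1', 'normalize#1', 'load#1', 'load#2', 'load#3']
```

With the default no-retry policy the equivalent here is max_attempts=1, in which case the first load failure would propagate immediately.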
run
def run(pipeline: Pipeline,
data: Any,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
pipeline_name: str = None,
on_before_run: Callable[[], None] = None,
**kwargs: Any) -> PythonOperator
Create a task to run the given pipeline with the given data in Airflow.
Arguments:
- pipeline (Pipeline): The pipeline to run.
- data (Any): The data to run the pipeline with. If a non-resource callable is given, it is evaluated during the DAG execution, right before the actual pipeline run. NOTE: If on_before_run is provided, on_before_run is evaluated first, and then the callable data.
- table_name (str, optional): The name of the table to which the data should be loaded within the dataset.
- write_disposition (TWriteDispositionConfig, optional): Same as in the run command.
- loader_file_format (TLoaderFileFormat, optional): The file format the loader will use to create the load package.
- schema_contract (TSchemaContract, optional): An override for the schema contract settings; it replaces the schema contract settings for all tables in the schema.
- pipeline_name (str, optional): The name of the derived pipeline.
- on_before_run (Callable, optional): A callable to be executed right before the actual pipeline run.
Returns:
PythonOperator - Airflow task instance.
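The evaluation order described in the NOTE for data can be illustrated with a small stand-in for the task body. This is a sketch under stated assumptions: execute_task, fake_run and the other helpers are hypothetical, and the plain callable() check ignores the distinction dlt makes between resources and other callables.

```python
from typing import Any, Callable, List, Optional

events: List[str] = []


def execute_task(
    data: Any,
    run_pipeline: Callable[[Any], None],
    on_before_run: Optional[Callable[[], None]] = None,
) -> None:
    """Hypothetical stand-in for what happens when the Airflow task executes."""
    if on_before_run is not None:
        on_before_run()  # 1. the hook runs first
    if callable(data):
        data = data()  # 2. callable data is evaluated at execution time
    run_pipeline(data)  # 3. only then does the pipeline run


def refresh_credentials() -> None:
    events.append("on_before_run")


def get_rows() -> List[dict]:
    events.append("data evaluated")
    return [{"id": 1}, {"id": 2}]


def fake_run(rows: Any) -> None:
    events.append(f"pipeline run with {len(rows)} rows")


execute_task(get_rows, fake_run, on_before_run=refresh_credentials)
print(events)  # ['on_before_run', 'data evaluated', 'pipeline run with 2 rows']
```

Because the callable is evaluated inside the task rather than while the DAG file is parsed, expensive setup stays out of DAG parsing.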
add_run
@with_telemetry("helper", "airflow_add_run", False, "decompose")
def add_run(pipeline: Pipeline,
data: Any,
*,
decompose: Literal["none", "serialize", "parallel",
"parallel-isolated"] = "none",
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
loader_file_format: TLoaderFileFormat = None,
schema_contract: TSchemaContract = None,
on_before_run: Callable[[], None] = None,
**kwargs: Any) -> List[PythonOperator]
Creates a task or a group of tasks to run data with pipeline.
Creates an Airflow task that extracts, normalizes and loads data with the passed pipeline instance pipeline. If data is a source and decompose is serialize, it will decompose the source into disjoint connected components (isolated groups of resources) and execute them one after another as separate Airflow tasks. The decomposition makes sure that each resource or transformer is extracted only once. It preserves the order of resources declared in the source when creating the graph of tasks.
The kwargs are passed as arguments to all Airflow task instances created.
Arguments:
- pipeline (Pipeline): An instance of the pipeline used to run the source.
- data (Any): Any data supported by the run method of the pipeline. If a non-resource callable is given, it is called before the load to get the data.
- decompose (Literal["none", "serialize", "parallel", "parallel-isolated"], optional): A source decomposition strategy into Airflow tasks:
  - none: no decomposition, default value.
  - serialize: decompose the source into a sequence of Airflow tasks.
  - parallel: decompose the source into a parallel Airflow task group, except that the first resource must be completed first. All tasks that are run in parallel share the same pipeline state. If two of them modify the state, part of the state may be lost.
  - parallel-isolated: decompose the source into a parallel Airflow task group, with the same exception as above. All tasks have separate pipeline state (via separate pipeline name) but share the same dataset, schemas and tables. The tasks are executed in different pipelines, all derived from the original one, but with the state isolated from each other.
  NOTE: The first component of the source in both parallel modes is done first; after that the rest are executed in parallel to each other.
  NOTE: In case the SequentialExecutor is used by Airflow, the tasks will remain sequential despite 'parallel' or 'parallel-isolated' mode. Use another executor (e.g. CeleryExecutor) to make tasks parallel!
- table_name (str, optional): The name of the table to which the data should be loaded within the dataset.
- write_disposition (TWriteDispositionConfig, optional): Same as in the run command. Defaults to None.
- loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
- schema_contract (TSchemaContract, optional): An override for the schema contract settings; it replaces the schema contract settings for all tables in the schema. Defaults to None.
- on_before_run (Callable, optional): A callable to be executed right before the actual pipeline run.
Returns:
List[PythonOperator] - Airflow tasks created in order of creation.
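The four decompose strategies differ mainly in the dependency graph they produce. The sketch below models that graph in plain Python. It is illustrative only: plan_tasks is a hypothetical function, the component names are made up, and the per-task pipeline naming scheme for parallel-isolated is an assumption rather than dlt's actual scheme.

```python
from typing import Dict, List, Tuple

Edge = Tuple[str, str]  # (upstream task, downstream task)


def plan_tasks(
    components: List[str],
    decompose: str = "none",
    pipeline_name: str = "my_pipeline",
) -> Tuple[Dict[str, str], List[Edge]]:
    """Return ({task: pipeline name}, dependency edges) for a strategy."""
    if not components:
        raise ValueError("at least one component is required")
    if decompose == "none":
        # A single task runs the whole source.
        return {"all": pipeline_name}, []
    tasks = {c: pipeline_name for c in components}
    if decompose == "serialize":
        # Chain: each component waits for the previous one.
        edges = list(zip(components, components[1:]))
    elif decompose in ("parallel", "parallel-isolated"):
        # The first component completes first, the rest fan out from it.
        first, rest = components[0], components[1:]
        edges = [(first, c) for c in rest]
        if decompose == "parallel-isolated":
            # Separate state via separate pipeline names (assumed naming).
            tasks = {c: f"{pipeline_name}_{c}" for c in components}
    else:
        raise ValueError(f"unknown decompose mode: {decompose}")
    return tasks, edges


components = ["customers", "orders", "invoices"]
for mode in ("serialize", "parallel", "parallel-isolated"):
    names, edges = plan_tasks(components, mode)
    print(mode, edges, sorted(set(names.values())))
```

In the parallel mode all tasks map to one pipeline name, which is why concurrent state changes can overwrite each other; parallel-isolated avoids that at the cost of tracking state per task.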