helpers.dbt.runner
DBTPackageRunner Objects
class DBTPackageRunner()
A Python wrapper over a dbt package
The created wrapper minimizes the required effort to run dbt
packages on datasets created with dlt
. It clones the package repo and keeps it up to data,
shares the dlt
destination credentials with dbt
and allows the isolated execution with venv
parameter.
The wrapper creates a dbt
profile from a passed dlt
credentials and executes the transformations in source_dataset_name
schema. Additional configuration is
passed via DBTRunnerConfiguration instance
ensure_newest_package
def ensure_newest_package() -> None
Clones or brings the dbt package at package_location
up to date.
run
def run(cmd_params: Sequence[str] = ("--fail-fast", ),
additional_vars: StrAny = None,
destination_dataset_name: str = None) -> Sequence[DBTNodeResult]
Runs dbt
package
Executes dbt run
on previously cloned package.
Arguments:
run_params
Sequence[str], optional - Additional parameters torun
command ie.full-refresh
. Defaults to ("--fail-fast", ).additional_vars
StrAny, optional - Additional jinja variables to be passed to the package. Defaults to None.destination_dataset_name
str, optional - Overwrites the dbt schema where transformed models will be created. Useful for testing or creating several copies of transformed data . Defaults to None.
Returns:
Sequence[DBTNodeResult]
- A list of processed model with names, statuses, execution messages and execution timesExceptions:
DBTProcessingError
-run
command failed. Contains a list of models with their execution statuses and error messages
test
def test(cmd_params: Sequence[str] = None,
additional_vars: StrAny = None,
destination_dataset_name: str = None) -> Sequence[DBTNodeResult]
Tests dbt
package
Executes dbt test
on previously cloned package.
Arguments:
run_params
Sequence[str], optional - Additional parameters totest
command ie. test selectors`.additional_vars
StrAny, optional - Additional jinja variables to be passed to the package. Defaults to None.destination_dataset_name
str, optional - Overwrites the dbt schema where transformed models will be created. Useful for testing or creating several copies of transformed data . Defaults to None.
Returns:
Sequence[DBTNodeResult]
- A list of executed tests with names, statuses, execution messages and execution timesExceptions:
DBTProcessingError
-test
command failed. Contains a list of models with their execution statuses and error messages
run_all
def run_all(run_params: Sequence[str] = ("--fail-fast", ),
additional_vars: StrAny = None,
source_tests_selector: str = None,
destination_dataset_name: str = None) -> Sequence[DBTNodeResult]
Prepares and runs a dbt package.
This method executes typical dbt
workflow with following steps:
- First it clones the package or brings it up to date with the origin. If package location is a local path, it stays intact
- It installs the dependencies (
dbt deps
) - It runs seed (
dbt seed
) - It runs optional tests on the sources
- It runs the package (
dbt run
) - If the
dbt
fails with "incremental model out of sync", it will retry with full-refresh on (only whenauto_full_refresh_when_out_of_sync
is set). See https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change
Arguments:
run_params
Sequence[str], optional - Additional parameters torun
command ie.full-refresh
. Defaults to ("--fail-fast", ).additional_vars
StrAny, optional - Additional jinja variables to be passed to the package. Defaults to None.source_tests_selector
str, optional - A source tests selector ie. will execute all tests fromsources
model. Defaults to None.destination_dataset_name
str, optional - Overwrites the dbt schema where transformed models will be created. Useful for testing or creating several copies of transformed data . Defaults to None.
Returns:
Sequence[DBTNodeResult]
- A list of processed model with names, statuses, execution messages and execution timesExceptions:
DBTProcessingError
- any of the dbt commands failed. Contains a list of models with their execution statuses and error messagesPrerequisitesException
- the source tests failedIncrementalSchemaOutOfSyncError
-run
failed due to schema being out of sync. the DBTProcessingError with failed model is inargs[0]
create_runner
@with_telemetry("helper", "dbt_create_runner", False, "package_profile_name")
@with_config(spec=DBTRunnerConfiguration,
sections=(known_sections.DBT_PACKAGE_RUNNER, ))
def create_runner(venv: Venv,
credentials: DestinationClientDwhConfiguration,
working_dir: str,
package_location: str = dlt.config.value,
package_repository_branch: Optional[str] = None,
package_repository_ssh_key: Optional[TSecretStrValue] = "",
package_profiles_dir: Optional[str] = None,
package_profile_name: Optional[str] = None,
auto_full_refresh_when_out_of_sync: bool = True,
config: DBTRunnerConfiguration = None) -> DBTPackageRunner
Creates a Python wrapper over dbt
package present at specified location, that allows to control it (ie. run and test) from Python code.
The created wrapper minimizes the required effort to run dbt
packages. It clones the package repo and keeps it up to data,
optionally shares the dlt
destination credentials with dbt
and allows the isolated execution with venv
parameter.
Note that you can pass config and secrets in DBTRunnerConfiguration as configuration in section "dbt_package_runner"
Arguments:
venv
Venv - A virtual environment with required dbt dependencies. Pass None to use current environment.credentials
DestinationClientDwhConfiguration - Any configuration deriving from DestinationClientDwhConfiguration ie. ConnectionStringCredentialsworking_dir
str - A working dir to which the package will be clonedpackage_location
str - A git repository url to be cloned or a local path where dbt package is presentpackage_repository_branch
str, optional - A branch name, tag name or commit-id to check out. Defaults to None.package_repository_ssh_key
TSecretValue, optional - SSH key to be used to clone private repositories. Defaults to TSecretValue("").package_profiles_dir
str, optional - Path to the folder where "profiles.yml" residespackage_profile_name
str, optional - Name of the profile in "profiles.yml"auto_full_refresh_when_out_of_sync
bool, optional - If set to True (default), the wrapper will automatically fall back to full-refresh mode when schema is out of syncSee
- https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change_description_. Defaults to None.config
DBTRunnerConfiguration, optional - Explicit additional configuration for the runner.
Returns:
DBTPackageRunner
- A Pythondbt
wrapper