dlt.helpers.dbt.runner
DBTPackageRunner Objects
class DBTPackageRunner()
A Python wrapper over a dbt package
The created wrapper minimizes the effort required to run dbt packages on datasets created with dlt. It clones the package repo and keeps it up to date,
shares the dlt destination credentials with dbt, and allows isolated execution via the venv parameter.
The wrapper creates a dbt profile from the passed dlt credentials and executes the transformations in the source_dataset_name schema. Additional configuration is
passed via a DBTRunnerConfiguration instance.
ensure_newest_package
def ensure_newest_package() -> None
Clones or brings the dbt package at package_location up to date.
run
def run(cmd_params: Sequence[str] = ("--fail-fast", ),
additional_vars: StrAny = None,
destination_dataset_name: str = None) -> Sequence[DBTNodeResult]
Runs dbt package
Executes dbt run on previously cloned package.
Arguments:
- cmd_params (Sequence[str], optional) - Additional parameters to the run command, e.g. full-refresh. Defaults to ("--fail-fast", ).
- additional_vars (StrAny, optional) - Additional jinja variables to be passed to the package. Defaults to None.
- destination_dataset_name (str, optional) - Overwrites the dbt schema where transformed models will be created. Useful for testing or creating several copies of transformed data. Defaults to None.
Returns:
- Sequence[DBTNodeResult] - A list of processed models with names, statuses, execution messages and execution times.
Exceptions:
- DBTProcessingError - The run command failed. Contains a list of models with their execution statuses and error messages.
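As a rough illustration of calling run from Python, the sketch below forwards an extra dbt flag and redirects the output to a scratch dataset. The helper name run_into_scratch and the is_backfill variable are hypothetical. The runner argument is assumed to be a DBTPackageRunner (or any object with the same run signature), and --full-refresh is the standard dbt flag.

```python
from typing import Any, Sequence


def run_into_scratch(runner: Any, scratch_dataset: str) -> Sequence[Any]:
    """Run the package with a full refresh, writing models to a scratch dataset.

    `runner` is assumed to expose run(cmd_params, additional_vars,
    destination_dataset_name) as documented above.
    """
    return runner.run(
        # Keep the documented default flag and add dbt's full-refresh flag.
        cmd_params=("--fail-fast", "--full-refresh"),
        # Hypothetical jinja variable; use whatever your package defines.
        additional_vars={"is_backfill": True},
        # Write the transformed models to a separate schema.
        destination_dataset_name=scratch_dataset,
    )
```

Passing destination_dataset_name keeps the regular transformed dataset untouched, which is convenient when trying out changes to a package.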
test
def test(cmd_params: Sequence[str] = None,
additional_vars: StrAny = None,
destination_dataset_name: str = None) -> Sequence[DBTNodeResult]
Tests dbt package
Executes dbt test on previously cloned package.
Arguments:
- cmd_params (Sequence[str], optional) - Additional parameters to the test command, e.g. test selectors. Defaults to None.
- additional_vars (StrAny, optional) - Additional jinja variables to be passed to the package. Defaults to None.
- destination_dataset_name (str, optional) - Overwrites the dbt schema where transformed models will be created. Useful for testing or creating several copies of transformed data. Defaults to None.
Returns:
- Sequence[DBTNodeResult] - A list of executed tests with names, statuses, execution messages and execution times.
Exceptions:
- DBTProcessingError - The test command failed. Contains a list of models with their execution statuses and error messages.
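The returned sequence can be inspected after a test call. The sketch below groups results by status. It uses a stand-in NodeResult tuple because the exact fields of DBTNodeResult are not listed on this page: the field names (model_name, message, time, status) are assumptions based on the description above, and the sample values are purely illustrative.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple


class NodeResult(NamedTuple):
    """Hypothetical stand-in for DBTNodeResult; field names are assumed."""
    model_name: str
    message: str
    time: float
    status: str


def group_by_status(results: Iterable[NodeResult]) -> Dict[str, List[str]]:
    """Map each status to the names of the nodes that reported it."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for result in results:
        grouped[result.status].append(result.model_name)
    return dict(grouped)


# Illustrative data only; real values would come from runner.test(...).
sample = [
    NodeResult("not_null_orders_id", "PASS", 0.12, "pass"),
    NodeResult("unique_orders_id", "FAIL 3", 0.20, "fail"),
    NodeResult("not_null_customers_id", "PASS", 0.08, "pass"),
]
summary = group_by_status(sample)
```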
run_all
def run_all(run_params: Sequence[str] = ("--fail-fast", ),
additional_vars: StrAny = None,
source_tests_selector: str = None,
destination_dataset_name: str = None) -> Sequence[DBTNodeResult]
Prepares and runs a dbt package.
This method executes a typical dbt workflow with the following steps:
- First, it clones the package or brings it up to date with the origin. If the package location is a local path, it stays intact.
- It installs the dependencies (dbt deps).
- It runs seed (dbt seed).
- It runs optional tests on the sources.
- It runs the package (dbt run).
- If dbt fails with "incremental model out of sync", it retries with full-refresh on (only when auto_full_refresh_when_out_of_sync is set). See https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change
Arguments:
- run_params (Sequence[str], optional) - Additional parameters to the run command, e.g. full-refresh. Defaults to ("--fail-fast", ).
- additional_vars (StrAny, optional) - Additional jinja variables to be passed to the package. Defaults to None.
- source_tests_selector (str, optional) - A source tests selector, e.g. one that executes all tests from the sources model. Defaults to None.
- destination_dataset_name (str, optional) - Overwrites the dbt schema where transformed models will be created. Useful for testing or creating several copies of transformed data. Defaults to None.
Returns:
- Sequence[DBTNodeResult] - A list of processed models with names, statuses, execution messages and execution times.
Exceptions:
- DBTProcessingError - Any of the dbt commands failed. Contains a list of models with their execution statuses and error messages.
- PrerequisitesException - The source tests failed.
- IncrementalSchemaOutOfSyncError - run failed due to the schema being out of sync. The DBTProcessingError with the failed model is in args[0].
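The order of steps and the retry rule described for run_all can be sketched as plain control flow. This is an illustration of the documented behavior, not the library's implementation: run_all_sketch, the step callable, and OutOfSyncError are hypothetical stand-ins, and passing the selector via --select is an assumption.

```python
from typing import Callable, List, Optional, Sequence


class OutOfSyncError(Exception):
    """Hypothetical stand-in for IncrementalSchemaOutOfSyncError."""


def run_all_sketch(
    step: Callable[[str, Sequence[str]], List[str]],
    run_params: Sequence[str] = ("--fail-fast",),
    source_tests_selector: Optional[str] = None,
    auto_full_refresh_when_out_of_sync: bool = True,
) -> List[str]:
    """Mirror the documented order; `step(command, params)` executes one step."""
    step("ensure_newest_package", ())  # clone or update the package
    step("deps", ())                   # dbt deps
    step("seed", ())                   # dbt seed
    if source_tests_selector:          # optional tests on the sources
        step("test", ("--select", source_tests_selector))
    try:
        return step("run", tuple(run_params))
    except OutOfSyncError:
        if not auto_full_refresh_when_out_of_sync:
            raise
        # Retry once with full refresh switched on.
        return step("run", tuple(run_params) + ("--full-refresh",))


# Usage with a fake step that fails the first run as "out of sync".
calls = []


def fake_step(command: str, params: Sequence[str]) -> List[str]:
    calls.append((command, tuple(params)))
    if command == "run" and "--full-refresh" not in params:
        raise OutOfSyncError("incremental model out of sync")
    return [command]


result = run_all_sketch(fake_step, source_tests_selector="source:*")
```

With auto_full_refresh_when_out_of_sync set to False, the same failure propagates to the caller instead of triggering the retry.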
create_runner
@with_telemetry("helper", "dbt_create_runner", False, "package_profile_name")
@with_config(spec=DBTRunnerConfiguration,
sections=(known_sections.DBT_PACKAGE_RUNNER, ))
def create_runner(venv: Venv,
credentials: DestinationClientDwhConfiguration,
working_dir: str,
package_location: str = dlt.config.value,
package_repository_branch: Optional[str] = None,
package_repository_ssh_key: Optional[TSecretStrValue] = "",
package_profiles_dir: Optional[str] = None,
package_profile_name: Optional[str] = None,
auto_full_refresh_when_out_of_sync: bool = True,
config: DBTRunnerConfiguration = None) -> DBTPackageRunner
Creates a Python wrapper over the dbt package at the specified location, which lets you control it (i.e. run and test it) from Python code.
The created wrapper minimizes the effort required to run dbt packages. It clones the package repo and keeps it up to date,
optionally shares the dlt destination credentials with dbt, and allows isolated execution via the venv parameter.
Note that you can pass config and secrets in DBTRunnerConfiguration as configuration in the "dbt_package_runner" section.
Arguments:
- venv (Venv) - A virtual environment with the required dbt dependencies. Pass None to use the current environment.
- credentials (DestinationClientDwhConfiguration) - Any configuration deriving from DestinationClientDwhConfiguration, e.g. ConnectionStringCredentials.
- working_dir (str) - A working directory into which the package will be cloned.
- package_location (str) - A git repository URL to be cloned or a local path where the dbt package is present.
- package_repository_branch (str, optional) - A branch name, tag name or commit-id to check out. Defaults to None.
- package_repository_ssh_key (TSecretStrValue, optional) - SSH key to be used to clone private repositories. Defaults to "".
- package_profiles_dir (str, optional) - Path to the folder where "profiles.yml" resides.
- package_profile_name (str, optional) - Name of the profile in "profiles.yml".
- auto_full_refresh_when_out_of_sync (bool, optional) - If set to True (default), the wrapper will automatically fall back to full-refresh mode when the schema is out of sync. See https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change
- config (DBTRunnerConfiguration, optional) - Explicit additional configuration for the runner.
Returns:
- DBTPackageRunner - A Python dbt wrapper.
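A minimal sketch of creating a runner in the current Python environment might look as follows. The import path is taken from the module name at the top of this page. The make_runner helper and its factory parameter are hypothetical and exist only so the sketch can be exercised without dlt or dbt installed. What counts as valid credentials depends on your destination.

```python
from typing import Any, Callable, Optional


def make_runner(
    credentials: Any,
    working_dir: str,
    package_location: str,
    factory: Optional[Callable[..., Any]] = None,
) -> Any:
    """Create a runner for the package using the current Python environment."""
    if factory is None:
        # Imported lazily so this sketch loads even where dlt is not installed.
        from dlt.helpers.dbt.runner import create_runner

        factory = create_runner
    return factory(
        None,  # venv: None means "use the current environment"
        credentials,
        working_dir,
        package_location=package_location,
        # Same as the documented default; shown here for visibility.
        auto_full_refresh_when_out_of_sync=True,
    )
```

The returned object can then be driven with run, test or run_all as described above.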