Version: 1.5.0 (latest)

common.destination.reference

StorageSchemaInfo Objects

class StorageSchemaInfo(NamedTuple)

[view_source]

from_normalized_mapping

@classmethod
def from_normalized_mapping(
cls, normalized_doc: Dict[str, Any],
naming_convention: NamingConvention) -> "StorageSchemaInfo"

[view_source]

Instantiate this class from mapping where keys are normalized according to given naming convention

Arguments:

  • normalized_doc - Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
  • naming_convention - Naming convention that was used to normalize keys

Returns:

  • StorageSchemaInfo - Instance of this class

to_normalized_mapping

def to_normalized_mapping(
naming_convention: NamingConvention) -> Dict[str, Any]

[view_source]

Convert this instance to mapping where keys are normalized according to given naming convention

Arguments:

  • naming_convention - Naming convention that should be used to normalize keys

Returns:

Dict[str, Any]: Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
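The round trip between an instance and a normalized mapping can be sketched as follows. This is a minimal illustration, not the library's implementation: `SchemaInfoSketch`, its two fields, and the toy `pascal_case` convention are hypothetical names chosen only to reproduce the example keys (`Version`, `SchemaName`) mentioned above, and a plain callable stands in for a `NamingConvention` object.

```python
from typing import Any, Callable, Dict, NamedTuple


def pascal_case(identifier: str) -> str:
    """Toy naming convention (an assumption): snake_case -> PascalCase."""
    return "".join(part.capitalize() for part in identifier.split("_"))


class SchemaInfoSketch(NamedTuple):
    # Hypothetical subset of fields, chosen to match the example keys.
    version: int
    schema_name: str

    @classmethod
    def from_normalized_mapping(
        cls, normalized_doc: Dict[str, Any], normalize: Callable[[str], str]
    ) -> "SchemaInfoSketch":
        # Look up each field under its normalized key.
        return cls(**{f: normalized_doc[normalize(f)] for f in cls._fields})

    def to_normalized_mapping(self, normalize: Callable[[str], str]) -> Dict[str, Any]:
        # Emit each field under its normalized key.
        return {normalize(f): v for f, v in self._asdict().items()}


info = SchemaInfoSketch(version=3, schema_name="events")
doc = info.to_normalized_mapping(pascal_case)
restored = SchemaInfoSketch.from_normalized_mapping(doc, pascal_case)
```

Using the same convention in both directions is what makes the round trip lossless: `restored` equals `info` only because the keys are normalized identically on write and on read.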

StateInfo Objects

@dataclasses.dataclass
class StateInfo()

[view_source]

from_normalized_mapping

@classmethod
def from_normalized_mapping(
cls, normalized_doc: Dict[str, Any],
naming_convention: NamingConvention) -> "StateInfo"

[view_source]

Instantiate this class from mapping where keys are normalized according to given naming convention

Arguments:

  • normalized_doc - Mapping with normalized keys (e.g. {Version: ..., PipelineName: ...})
  • naming_convention - Naming convention that was used to normalize keys

Returns:

  • StateInfo - Instance of this class

DestinationClientConfiguration Objects

@configspec
class DestinationClientConfiguration(BaseConfiguration)

[view_source]

destination_type

which destination to load data to

fingerprint

def fingerprint() -> str

[view_source]

Returns a destination fingerprint, which is a hash of selected configuration fields, e.g. the host in the case of a connection string.
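One way to picture a fingerprint of this kind is a hash over the host part of a connection string only. The sketch below is an assumption-laden illustration: `fingerprint_sketch` is a hypothetical helper, and the choice of SHA-256 and of an empty string for a missing host are not taken from the library.

```python
import hashlib
from urllib.parse import urlparse


def fingerprint_sketch(connection_string: str) -> str:
    """Hash only the host, so credentials and database name do not affect the result."""
    host = urlparse(connection_string).hostname or ""
    if not host:
        # Nothing to fingerprint; an empty result is an assumption of this sketch.
        return ""
    return hashlib.sha256(host.encode("utf-8")).hexdigest()


a = fingerprint_sketch("postgresql://loader:secret@db.example.com:5432/warehouse")
b = fingerprint_sketch("postgresql://other:pw@db.example.com:5432/analytics")
c = fingerprint_sketch("postgresql://loader:secret@replica.example.com:5432/warehouse")
```

Here `a` and `b` are identical because both strings point at the same host, while `c` differs. Hashing only selected fields keeps secrets out of the fingerprint while still identifying the physical location.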

__str__

def __str__() -> str

[view_source]

Return displayable destination location

credentials_type

@classmethod
def credentials_type(
cls,
config: "DestinationClientConfiguration" = None
) -> Type[CredentialsConfiguration]

[view_source]

Figure out credentials type, using hint resolvers for dynamic types

For correct type resolution of filesystem, config should have bucket_url populated

DestinationClientDwhConfiguration Objects

@configspec
class DestinationClientDwhConfiguration(DestinationClientConfiguration)

[view_source]

Configuration of a destination that supports datasets/schemas

dataset_name

Dataset name in the destination to load data to. For schemas that are not the default schema, it is used as a dataset prefix.

default_schema_name

name of default schema to be used to name effective dataset to load data to

replace_strategy

How to handle replace disposition for this destination, can be classic or staging

staging_dataset_name_layout

Layout for staging dataset, where %s is replaced with dataset name. placeholder is optional

enable_dataset_name_normalization

Whether to normalize the dataset name. Affects staging dataset as well.

normalize_dataset_name

def normalize_dataset_name(schema: Schema) -> str

[view_source]

Builds the full db dataset (schema) name out of the configured dataset name and the schema name: {dataset_name}_{schema.name}. The resulting name is normalized.

If default schema name is None or equals schema.name, the schema suffix is skipped.
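The naming rule described here can be sketched as a small pure function. This is a hedged illustration rather than the library's code: `dataset_name_sketch` is a hypothetical helper, the `_` separator between dataset name and schema name is an assumption, and `str.lower` merely stands in for whatever normalization the configured naming convention performs.

```python
from typing import Callable, Optional


def dataset_name_sketch(
    dataset_name: str,
    schema_name: str,
    default_schema_name: Optional[str],
    normalize: Callable[[str], str] = str.lower,  # stand-in for real normalization
) -> str:
    # Skip the suffix for the default schema, or when no default is configured.
    if default_schema_name is None or schema_name == default_schema_name:
        return normalize(dataset_name)
    # Non-default schemas use the dataset name as a prefix.
    return normalize(f"{dataset_name}_{schema_name}")


default_name = dataset_name_sketch("Analytics", "events", "events")
other_name = dataset_name_sketch("Analytics", "Users", "events")
```

With these inputs the default schema maps to the bare dataset name, while the non-default schema gets a prefixed, normalized name.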

normalize_staging_dataset_name

def normalize_staging_dataset_name(schema: Schema) -> str

[view_source]

Builds staging dataset name out of dataset_name and staging_dataset_name_layout.
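A minimal sketch of how such a layout could be applied is shown below. `staging_dataset_name_sketch` is a hypothetical helper, the `"%s_staging"` default is only an example value assumed for illustration, and name normalization is left out.

```python
def staging_dataset_name_sketch(dataset_name: str, layout: str = "%s_staging") -> str:
    # Replace the optional %s placeholder with the dataset name.
    # A layout without a placeholder is returned unchanged.
    return layout.replace("%s", dataset_name)


derived = staging_dataset_name_sketch("analytics")
fixed = staging_dataset_name_sketch("analytics", layout="shared_staging")
```

Because the placeholder is optional, a layout such as `shared_staging` yields the same staging dataset regardless of the configured dataset name.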

needs_dataset_name

@classmethod
def needs_dataset_name(cls) -> bool

[view_source]

Checks if the configuration requires a dataset name to be present. Empty datasets are allowed, e.g. for schema-less destinations like weaviate or clickhouse.

DestinationClientStagingConfiguration Objects

@configspec
class DestinationClientStagingConfiguration(DestinationClientDwhConfiguration)

[view_source]

Configuration of a staging destination, able to store files with desired layout at bucket_url.

Also supports datasets and can act as standalone destination.

DestinationClientDwhWithStagingConfiguration Objects

@configspec
class DestinationClientDwhWithStagingConfiguration(
DestinationClientDwhConfiguration)

[view_source]

Configuration of a destination that can take data from staging destination

staging_config

configuration of the staging, if present, injected at runtime

truncate_tables_on_staging_destination_before_load

If dlt should truncate the tables on staging destination before loading data.

LoadJob Objects

class LoadJob(ABC)

[view_source]

A stateful load job, represents one job file

job_id

def job_id() -> str

[view_source]

The job id that is derived from the file name and does not change during the job lifecycle

file_name

def file_name() -> str

[view_source]

A name of the job file

state

@abstractmethod
def state() -> TLoadJobState

[view_source]

Returns current state. Should poll external resource if necessary.

exception

@abstractmethod
def exception() -> str

[view_source]

The exception associated with failed or retry states

metrics

def metrics() -> Optional[LoadJobMetrics]

[view_source]

Returns job execution metrics

RunnableLoadJob Objects

class RunnableLoadJob(LoadJob, ABC)

[view_source]

Represents a runnable job that loads a single file

Each job starts in the "running" state and ends in one of the terminal states: "retry", "failed" or "completed". Each job is uniquely identified by a file name. The file is guaranteed to exist in the "running" state; in a terminal state, the file may not be present. While the job is "running", the loader component periodically gets the state via the state() method. When a terminal state is reached, the load job is discarded and not called again. The exception method is called to get error information in the "failed" and "retry" states.

The __init__ method is responsible for putting the job in the "running" state. It may raise LoadClientTerminalException or LoadClientTransientException to immediately transition the job into the "failed" or "retry" state, respectively.
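The lifecycle described above can be sketched as a small state machine. Everything in this block is illustrative: `RunnableJobSketch`, `TerminalError` and `TransientError` are hypothetical stand-ins, and treating every non-terminal exception as retryable is an assumption of the sketch, not a statement about the library.

```python
from abc import ABC, abstractmethod
from typing import Optional


class TerminalError(Exception):
    """Hypothetical stand-in for a terminal (non-retryable) failure."""


class TransientError(Exception):
    """Hypothetical stand-in for a transient (retryable) failure."""


class RunnableJobSketch(ABC):
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path
        self._state = "running"  # every job starts in "running"
        self._exception: Optional[Exception] = None

    @abstractmethod
    def run(self) -> None:
        """User-implemented work; errors are handled by run_managed."""

    def run_managed(self) -> None:
        # Wrapper that maps the outcome of run() to a terminal state.
        try:
            self.run()
            self._state = "completed"
        except TerminalError as exc:
            self._state, self._exception = "failed", exc
        except Exception as exc:  # assumption: anything else is retryable
            self._state, self._exception = "retry", exc

    def state(self) -> str:
        return self._state

    def exception(self) -> Optional[str]:
        return str(self._exception) if self._exception else None


class FlakyJob(RunnableJobSketch):
    def run(self) -> None:
        raise TransientError("connection reset")


job = FlakyJob("events.1234.0.jsonl")
job.run_managed()
```

After `run_managed()` returns, `job.state()` is `"retry"` and `job.exception()` carries the error message, which mirrors how a loader would inspect a finished job without ever calling `run` directly.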

__init__

def __init__(file_path: str) -> None

[view_source]

File name is also a job id (or job id is deterministically derived) so it must be globally unique

set_run_vars

def set_run_vars(load_id: str, schema: Schema,
load_table: PreparedTableSchema) -> None

[view_source]

called by the loader right before the job is run

run_managed

def run_managed(job_client: "JobClientBase") -> None

[view_source]

wrapper around the user implemented run method

run

@abstractmethod
def run() -> None

[view_source]

Runs the actual job. This will be executed on a thread and should be implemented by the user. Exceptions will be handled outside of this function.

state

def state() -> TLoadJobState

[view_source]

Returns current state. Should poll external resource if necessary.

exception

def exception() -> str

[view_source]

The exception associated with failed or retry states

FollowupJobRequest Objects

class FollowupJobRequest()

[view_source]

Base class for follow up jobs that should be created

new_file_path

@abstractmethod
def new_file_path() -> str

[view_source]

Path to a newly created temporary job file. If empty, no followup job should be created

HasFollowupJobs Objects

class HasFollowupJobs()

[view_source]

Adds a trait that allows to create single or table chain followup jobs

create_followup_jobs

def create_followup_jobs(
final_state: TLoadJobState) -> List[FollowupJobRequest]

[view_source]

Returns a list of job requests for jobs that should be created. final_state is the state to which this job transitions.

SupportsReadableRelation Objects

class SupportsReadableRelation(Protocol)

[view_source]

A readable relation retrieved from a destination that supports it

columns_schema

Known dlt table columns for this relation

df

def df(chunk_size: int = None) -> Optional[DataFrame]

[view_source]

Fetches the results as data frame. For large queries the results may be chunked

The default implementation uses helpers in pandas.io.sql to generate a Pandas data frame. This function will try to use native data frame generation for the particular destination. For BigQuery: QueryJob.to_dataframe is used. For duckdb: DuckDBPyConnection.df

Arguments:

  • chunk_size int, optional - Will chunk the results into several data frames. Defaults to None
  • **kwargs Any - Additional parameters which will be passed to native data frame generation function.

Returns:

  • Optional[DataFrame] - A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results

arrow

def arrow(chunk_size: int = None) -> Optional[ArrowTable]

[view_source]

fetch arrow table of first 'chunk_size' items

iter_df

def iter_df(chunk_size: int) -> Generator[DataFrame, None, None]

[view_source]

iterate over data frames of 'chunk_size' items

iter_arrow

def iter_arrow(chunk_size: int) -> Generator[ArrowTable, None, None]

[view_source]

iterate over arrow tables of 'chunk_size' items

fetchall

def fetchall() -> List[Tuple[Any, ...]]

[view_source]

fetch all items as list of python tuples

fetchmany

def fetchmany(chunk_size: int) -> List[Tuple[Any, ...]]

[view_source]

fetch first 'chunk_size' items as list of python tuples

iter_fetch

def iter_fetch(chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]

[view_source]

iterate in lists of python tuples in 'chunk_size' chunks
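Chunked iteration of this kind is commonly built on the DBAPI `fetchmany` call. The sketch below uses the standard library `sqlite3` module as a stand-in destination; `iter_fetch_sketch` is a hypothetical helper and not the protocol's implementation.

```python
import sqlite3
from typing import Any, Generator, List, Tuple


def iter_fetch_sketch(
    cursor: sqlite3.Cursor, chunk_size: int
) -> Generator[List[Tuple[Any, ...]], None, None]:
    # Keep calling fetchmany until the cursor is exhausted.
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            return
        yield rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER)")
conn.executemany("INSERT INTO items VALUES (?)", [(i,) for i in range(5)])
cursor = conn.execute("SELECT id FROM items ORDER BY id")
chunks = list(iter_fetch_sketch(cursor, 2))
```

With five rows and a chunk size of 2, the generator yields three lists, the last of which holds the single remaining row.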

fetchone

def fetchone() -> Optional[Tuple[Any, ...]]

[view_source]

fetch first item as python tuple

limit

def limit(limit: int, **kwargs: Any) -> "SupportsReadableRelation"

[view_source]

limit the result to 'limit' items

head

def head(limit: int = 5) -> "SupportsReadableRelation"

[view_source]

limit the result to 5 items by default

select

def select(*columns: str) -> "SupportsReadableRelation"

[view_source]

set which columns will be selected

__getitem__

def __getitem__(
columns: Union[str, Sequence[str]]) -> "SupportsReadableRelation"

[view_source]

set which columns will be selected

__getattr__

def __getattr__(attr: str) -> Any

[view_source]

get an attribute of the relation

__copy__

def __copy__() -> "SupportsReadableRelation"

[view_source]

create a copy of the relation object

DBApiCursor Objects

class DBApiCursor(SupportsReadableRelation)

[view_source]

Protocol for DBAPI cursor

native_cursor

Cursor implementation native to current destination

SupportsReadableDataset Objects

class SupportsReadableDataset(Protocol)

[view_source]

A readable dataset retrieved from a destination, has support for creating readable relations for a query or table

JobClientBase Objects

class JobClientBase(ABC)

[view_source]

initialize_storage

@abstractmethod
def initialize_storage(
truncate_tables: Optional[Iterable[str]] = None) -> None

[view_source]

Prepares storage to be used, i.e. creates a database schema or a file system folder. Truncates the requested tables.

is_storage_initialized

@abstractmethod
def is_storage_initialized() -> bool

[view_source]

Returns if storage is ready to be read/written.

drop_storage

@abstractmethod
def drop_storage() -> None

[view_source]

Brings storage back into not initialized state. Typically data in storage is destroyed.

verify_schema

def verify_schema(
only_tables: Iterable[str] = None,
new_jobs: Iterable[ParsedLoadJobFileName] = None
) -> List[PreparedTableSchema]

[view_source]

Verifies schema before loading, returns a list of verified loaded tables.

update_stored_schema

def update_stored_schema(
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None) -> Optional[TSchemaTables]

[view_source]

Updates storage to the current schema.

Implementations should not assume that expected_update is the exact difference between destination state and the self.schema. This is only the case if destination has single writer and no other processes modify the schema.

Arguments:

  • only_tables Sequence[str], optional - Updates only listed tables. Defaults to None.
  • expected_update TSchemaTables, optional - Update that is expected to be applied to the destination

Returns:

  • Optional[TSchemaTables] - Returns an update that was applied at the destination.

prepare_load_table

def prepare_load_table(table_name: str) -> PreparedTableSchema

[view_source]

Prepares a table schema to be loaded by filling missing hints and doing other modifications required by the given destination.

create_load_job

@abstractmethod
def create_load_job(table: PreparedTableSchema,
file_path: str,
load_id: str,
restore: bool = False) -> LoadJob

[view_source]

Creates a load job for a particular table with content in file_path. Table is already prepared to be loaded.

prepare_load_job_execution

def prepare_load_job_execution(job: RunnableLoadJob) -> None

[view_source]

Prepare the connected job client for the execution of a load job (used for query tags in sql clients)

create_table_chain_completed_followup_jobs

def create_table_chain_completed_followup_jobs(
table_chain: Sequence[PreparedTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None
) -> List[FollowupJobRequest]

[view_source]

Creates a list of followup jobs that should be executed after a table chain is completed. Tables are already prepared to be loaded.

complete_load

@abstractmethod
def complete_load(load_id: str) -> None

[view_source]

Marks the load package with load_id as completed in the destination. Before such commit is done, the data with load_id is invalid.

WithStateSync Objects

class WithStateSync(ABC)

[view_source]

get_stored_schema

@abstractmethod
def get_stored_schema(schema_name: str = None) -> Optional[StorageSchemaInfo]

[view_source]

Retrieves the newest schema with the given name from destination storage. If no name is provided, the newest schema found is retrieved.

get_stored_schema_by_hash

@abstractmethod
def get_stored_schema_by_hash(version_hash: str) -> StorageSchemaInfo

[view_source]

Retrieves the stored schema by its version hash

get_stored_state

@abstractmethod
def get_stored_state(pipeline_name: str) -> Optional[StateInfo]

[view_source]

Loads compressed state from destination storage

WithStagingDataset Objects

class WithStagingDataset(ABC)

[view_source]

Adds capability to use staging dataset and request it from the loader

with_staging_dataset

@abstractmethod
def with_staging_dataset() -> ContextManager["JobClientBase"]

[view_source]

Executes job client methods on staging dataset
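A context manager of this shape can be sketched by temporarily pointing the client at another dataset and restoring the original on exit. `JobClientSketch` and the `_staging` suffix are hypothetical and exist only for illustration; a real job client would hold connections and configuration rather than a bare name.

```python
from contextlib import contextmanager
from typing import Iterator


class JobClientSketch:
    def __init__(self, dataset_name: str) -> None:
        self.dataset_name = dataset_name

    @contextmanager
    def with_staging_dataset(self) -> Iterator["JobClientSketch"]:
        original = self.dataset_name
        # Assumed naming: staging dataset is the regular name plus a suffix.
        self.dataset_name = original + "_staging"
        try:
            yield self
        finally:
            # Always restore the regular dataset, even if the body raised.
            self.dataset_name = original


client = JobClientSketch("analytics")
with client.with_staging_dataset() as staging_client:
    inside = staging_client.dataset_name
outside = client.dataset_name
```

The `try`/`finally` ensures that methods called after the `with` block operate on the regular dataset again, regardless of errors raised while working on the staging dataset.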

SupportsStagingDestination Objects

class SupportsStagingDestination(ABC)

[view_source]

Adds capability to support a staging destination for the load

should_load_data_to_staging_dataset_on_staging_destination

def should_load_data_to_staging_dataset_on_staging_destination(
table_name: str) -> bool

[view_source]

If set to True, and staging destination is configured, the data will be loaded to staging dataset on staging destination instead of a regular dataset on staging destination. Currently it is used by Athena Iceberg which uses staging dataset on staging destination to copy data to iceberg tables stored on regular dataset on staging destination. The default is to load data to regular dataset on staging destination from where warehouses like Snowflake (that have their own storage) will copy data.

should_truncate_table_before_load_on_staging_destination

@abstractmethod
def should_truncate_table_before_load_on_staging_destination(
table_name: str) -> bool

[view_source]

If set to True, data in table will be truncated on staging destination (regular dataset). This is the default behavior which can be changed with a config flag. For Athena + Iceberg this setting is always False - Athena uses regular dataset to store Iceberg tables and we avoid touching it. For Athena we truncate those tables only on "replace" write disposition.

Destination Objects

class Destination(ABC, Generic[TDestinationConfig, TDestinationClient])

[view_source]

A destination factory that can be partially pre-configured with credentials and other config params.

config_params

Explicit config params, overriding any injected or default values.

caps_params

Explicit capabilities params, overriding any default values for this destination

spec

@property
@abstractmethod
def spec() -> Type[TDestinationConfig]

[view_source]

A spec of destination configuration that also contains destination credentials

capabilities

def capabilities(
config: Optional[TDestinationConfig] = None,
naming: Optional[NamingConvention] = None
) -> DestinationCapabilitiesContext

[view_source]

Destination capabilities, e.g. supported loader file formats, identifier name lengths, naming conventions, escape function etc. Explicit caps arguments passed to the factory init and stored in caps_params are applied.

If config is provided, it is used to adjust the capabilities; otherwise the explicit config composed just of config_params passed to the factory init is applied. If naming is provided, the case sensitivity and case folding are adjusted.

destination_name

@property
def destination_name() -> str

[view_source]

The destination name will either be explicitly set while creating the destination or will be taken from the type

client_class

@property
@abstractmethod
def client_class() -> Type[TDestinationClient]

[view_source]

A job client class responsible for starting and resuming load jobs

configuration

def configuration(initial_config: TDestinationConfig,
accept_partial: bool = False) -> TDestinationConfig

[view_source]

Get a fully resolved destination config from the initial config

client

def client(schema: Schema,
initial_config: TDestinationConfig = None) -> TDestinationClient

[view_source]

Returns a configured instance of the destination's job client

adjust_capabilities

@classmethod
def adjust_capabilities(
cls, caps: DestinationCapabilitiesContext, config: TDestinationConfig,
naming: Optional[NamingConvention]) -> DestinationCapabilitiesContext

[view_source]

Adjust the capabilities to match the case sensitivity as requested by naming convention.

normalize_type

@staticmethod
def normalize_type(destination_type: str) -> str

[view_source]

Normalizes destination type string into a canonical form. Assumes that type names without dots correspond to built in destinations.
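The rule about dots can be sketched in a few lines. `normalize_type_sketch` is a hypothetical helper; the `dlt.destinations.` prefix is borrowed from the `dlt.destinations.postgres` import path used as an example on this page, and whether the library applies exactly this prefix is an assumption.

```python
def normalize_type_sketch(destination_type: str) -> str:
    # A name without dots is assumed to be a built-in destination.
    if "." not in destination_type:
        return "dlt.destinations." + destination_type
    # Dotted names are treated as full import paths and left unchanged.
    return destination_type


builtin = normalize_type_sketch("postgres")
custom = normalize_type_sketch("my_package.destinations.custom")
```

Applying the function twice gives the same result as applying it once, which is the property one would expect from a canonical form.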

from_reference

@staticmethod
def from_reference(
ref: TDestinationReferenceArg,
credentials: Optional[Any] = None,
destination_name: Optional[str] = None,
environment: Optional[str] = None,
**kwargs: Any
) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]

[view_source]

Instantiate destination from str reference. The ref can be a destination name or import path pointing to a destination class (e.g. dlt.destinations.postgres)
