common.destination.reference
StorageSchemaInfo Objects
class StorageSchemaInfo(NamedTuple)
from_normalized_mapping
@classmethod
def from_normalized_mapping(
cls, normalized_doc: Dict[str, Any],
naming_convention: NamingConvention) -> "StorageSchemaInfo"
Instantiate this class from mapping where keys are normalized according to given naming convention
Arguments:
normalized_doc
- Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})naming_convention
- Naming convention that was used to normalize keys
Returns:
StorageSchemaInfo
- Instance of this class
to_normalized_mapping
def to_normalized_mapping(
naming_convention: NamingConvention) -> Dict[str, Any]
Convert this instance to mapping where keys are normalized according to given naming convention
Arguments:
naming_convention
- Naming convention that should be used to normalize keys
Returns:
Dict[str, Any]: Mapping with normalized keys (e.g. {Version: ..., SchemaName: ...})
StateInfo Objects
@dataclasses.dataclass
class StateInfo()
from_normalized_mapping
@classmethod
def from_normalized_mapping(
cls, normalized_doc: Dict[str, Any],
naming_convention: NamingConvention) -> "StateInfo"
Instantiate this class from mapping where keys are normalized according to given naming convention
Arguments:
normalized_doc
- Mapping with normalized keys (e.g. {Version: ..., PipelineName: ...})naming_convention
- Naming convention that was used to normalize keys
Returns:
StateInfo
- Instance of this class
DestinationClientConfiguration Objects
@configspec
class DestinationClientConfiguration(BaseConfiguration)
destination_type
which destination to load data to
fingerprint
def fingerprint() -> str
Returns a destination fingerprint which is a hash of selected configuration fields. ie. host in case of connection string
__str__
def __str__() -> str
Return displayable destination location
credentials_type
@classmethod
def credentials_type(
cls,
config: "DestinationClientConfiguration" = None
) -> Type[CredentialsConfiguration]
Figure out credentials type, using hint resolvers for dynamic types
For correct type resolution of filesystem, config should have bucket_url populated
DestinationClientDwhConfiguration Objects
@configspec
class DestinationClientDwhConfiguration(DestinationClientConfiguration)
Configuration of a destination that supports datasets/schemas
dataset_name
dataset name in the destination to load data to, for schemas that are not default schema, it is used as dataset prefix
default_schema_name
name of default schema to be used to name effective dataset to load data to
replace_strategy
How to handle replace disposition for this destination, can be classic or staging
staging_dataset_name_layout
Layout for staging dataset, where %s is replaced with dataset name. placeholder is optional
enable_dataset_name_normalization
Whether to normalize the dataset name. Affects staging dataset as well.
normalize_dataset_name
def normalize_dataset_name(schema: Schema) -> str
Builds full db dataset (schema) name out of configured dataset name and schema name: {datasetname}{schema.name}. The resulting name is normalized.
If default schema name is None or equals schema.name, the schema suffix is skipped.
normalize_staging_dataset_name
def normalize_staging_dataset_name(schema: Schema) -> str
Builds staging dataset name out of dataset_name and staging_dataset_name_layout.
needs_dataset_name
@classmethod
def needs_dataset_name(cls) -> bool
Checks if configuration requires dataset name to be present. Empty datasets are allowed ie. for schema-less destinations like weaviate or clickhouse
DestinationClientStagingConfiguration Objects
@configspec
class DestinationClientStagingConfiguration(DestinationClientDwhConfiguration)
Configuration of a staging destination, able to store files with desired layout
at bucket_url
.
Also supports datasets and can act as standalone destination.
DestinationClientDwhWithStagingConfiguration Objects
@configspec
class DestinationClientDwhWithStagingConfiguration(
DestinationClientDwhConfiguration)
Configuration of a destination that can take data from staging destination
staging_config
configuration of the staging, if present, injected at runtime
truncate_tables_on_staging_destination_before_load
If dlt should truncate the tables on staging destination before loading data.
LoadJob Objects
class LoadJob(ABC)
A stateful load job, represents one job file
job_id
def job_id() -> str
The job id that is derived from the file name and does not changes during job lifecycle
file_name
def file_name() -> str
A name of the job file
state
@abstractmethod
def state() -> TLoadJobState
Returns current state. Should poll external resource if necessary.
exception
@abstractmethod
def exception() -> str
The exception associated with failed or retry states
metrics
def metrics() -> Optional[LoadJobMetrics]
Returns job execution metrics
RunnableLoadJob Objects
class RunnableLoadJob(LoadJob, ABC)
Represents a runnable job that loads a single file
Each job starts in "running" state and ends in one of terminal states: "retry", "failed" or "completed".
Each job is uniquely identified by a file name. The file is guaranteed to exist in "running" state. In terminal state, the file may not be present.
In "running" state, the loader component periodically gets the state via status()
method. When terminal state is reached, load job is discarded and not called again.
exception
method is called to get error information in "failed" and "retry" states.
The __init__
method is responsible to put the Job in "running" state. It may raise LoadClientTerminalException
and LoadClientTransientException
to
immediately transition job into "failed" or "retry" state respectively.
__init__
def __init__(file_path: str) -> None
File name is also a job id (or job id is deterministically derived) so it must be globally unique
set_run_vars
def set_run_vars(load_id: str, schema: Schema,
load_table: PreparedTableSchema) -> None
called by the loader right before the job is run
run_managed
def run_managed(job_client: "JobClientBase") -> None
wrapper around the user implemented run method
run
@abstractmethod
def run() -> None
run the actual job, this will be executed on a thread and should be implemented by the user exception will be handled outside of this function
state
def state() -> TLoadJobState
Returns current state. Should poll external resource if necessary.
exception
def exception() -> str
The exception associated with failed or retry states
FollowupJobRequest Objects
class FollowupJobRequest()
Base class for follow up jobs that should be created
new_file_path
@abstractmethod
def new_file_path() -> str
Path to a newly created temporary job file. If empty, no followup job should be created
HasFollowupJobs Objects
class HasFollowupJobs()
Adds a trait that allows to create single or table chain followup jobs
create_followup_jobs
def create_followup_jobs(
final_state: TLoadJobState) -> List[FollowupJobRequest]
Return list of jobs requests for jobs that should be created. final_state
is state to which this job transits
SupportsReadableRelation Objects
class SupportsReadableRelation(Protocol)
A readable relation retrieved from a destination that supports it
columns_schema
Known dlt table columns for this relation
df
def df(chunk_size: int = None) -> Optional[DataFrame]
Fetches the results as data frame. For large queries the results may be chunked
Fetches the results into a data frame. The default implementation uses helpers in pandas.io.sql
to generate Pandas data frame.
This function will try to use native data frame generation for particular destination. For BigQuery
: QueryJob.to_dataframe
is used.
For duckdb
: `DuckDBPyConnection.df'
Arguments:
chunk_size
int, optional - Will chunk the results into several data frames. Defaults to None**kwargs
Any - Additional parameters which will be passed to native data frame generation function.
Returns:
Optional[DataFrame]
- A data frame with query results. If chunk_size > 0, None will be returned if there is no more data in results
arrow
def arrow(chunk_size: int = None) -> Optional[ArrowTable]
fetch arrow table of first 'chunk_size' items
iter_df
def iter_df(chunk_size: int) -> Generator[DataFrame, None, None]
iterate over data frames tables of 'chunk_size' items
iter_arrow
def iter_arrow(chunk_size: int) -> Generator[ArrowTable, None, None]
iterate over arrow tables of 'chunk_size' items
fetchall
def fetchall() -> List[Tuple[Any, ...]]
fetch all items as list of python tuples
fetchmany
def fetchmany(chunk_size: int) -> List[Tuple[Any, ...]]
fetch first 'chunk_size' items as list of python tuples
iter_fetch
def iter_fetch(chunk_size: int) -> Generator[List[Tuple[Any, ...]], Any, Any]
iterate in lists of python tuples in 'chunk_size' chunks
fetchone
def fetchone() -> Optional[Tuple[Any, ...]]
fetch first item as python tuple
limit
def limit(limit: int) -> "SupportsReadableRelation"
limit the result to 'limit' items
head
def head(limit: int = 5) -> "SupportsReadableRelation"
limit the result to 5 items by default
select
def select(*columns: str) -> "SupportsReadableRelation"
set which columns will be selected
__getitem__
def __getitem__(
columns: Union[str, Sequence[str]]) -> "SupportsReadableRelation"
set which columns will be selected
__copy__
def __copy__() -> "SupportsReadableRelation"
create a copy of the relation object
DBApiCursor Objects
class DBApiCursor(SupportsReadableRelation)
Protocol for DBAPI cursor
native_cursor
Cursor implementation native to current destination
SupportsReadableDataset Objects
class SupportsReadableDataset(Protocol)
A readable dataset retrieved from a destination, has support for creating readable relations for a query or table
JobClientBase Objects
class JobClientBase(ABC)
initialize_storage
@abstractmethod
def initialize_storage(
truncate_tables: Optional[Iterable[str]] = None) -> None
Prepares storage to be used ie. creates database schema or file system folder. Truncates requested tables.
is_storage_initialized
@abstractmethod
def is_storage_initialized() -> bool
Returns if storage is ready to be read/written.
drop_storage
@abstractmethod
def drop_storage() -> None
Brings storage back into not initialized state. Typically data in storage is destroyed.
verify_schema
def verify_schema(
only_tables: Iterable[str] = None,
new_jobs: Iterable[ParsedLoadJobFileName] = None
) -> List[PreparedTableSchema]
Verifies schema before loading, returns a list of verified loaded tables.
update_stored_schema
def update_stored_schema(
only_tables: Iterable[str] = None,
expected_update: TSchemaTables = None) -> Optional[TSchemaTables]
Updates storage to the current schema.
Implementations should not assume that expected_update
is the exact difference between destination state and the self.schema. This is only the case if
destination has single writer and no other processes modify the schema.
Arguments:
only_tables
Sequence[str], optional - Updates only listed tables. Defaults to None.expected_update
TSchemaTables, optional - Update that is expected to be applied to the destination
Returns:
Optional[TSchemaTables]
- Returns an update that was applied at the destination.
prepare_load_table
def prepare_load_table(table_name: str) -> PreparedTableSchema
Prepares a table schema to be loaded by filling missing hints and doing other modifications requires by given destination.
create_load_job
@abstractmethod
def create_load_job(table: PreparedTableSchema,
file_path: str,
load_id: str,
restore: bool = False) -> LoadJob
Creates a load job for a particular table
with content in file_path
. Table is already prepared to be loaded.
prepare_load_job_execution
def prepare_load_job_execution(job: RunnableLoadJob) -> None
Prepare the connected job client for the execution of a load job (used for query tags in sql clients)
create_table_chain_completed_followup_jobs
def create_table_chain_completed_followup_jobs(
table_chain: Sequence[PreparedTableSchema],
completed_table_chain_jobs: Optional[Sequence[LoadJobInfo]] = None
) -> List[FollowupJobRequest]
Creates a list of followup jobs that should be executed after a table chain is completed. Tables are already prepared to be loaded.
complete_load
@abstractmethod
def complete_load(load_id: str) -> None
Marks the load package with load_id
as completed in the destination. Before such commit is done, the data with load_id
is invalid.
WithStateSync Objects
class WithStateSync(ABC)
get_stored_schema
@abstractmethod
def get_stored_schema(schema_name: str = None) -> Optional[StorageSchemaInfo]
Retrieves newest schema with given name from destination storage If no name is provided, the newest schema found is retrieved.
get_stored_schema_by_hash
@abstractmethod
def get_stored_schema_by_hash(version_hash: str) -> StorageSchemaInfo
retrieves the stored schema by hash
get_stored_state
@abstractmethod
def get_stored_state(pipeline_name: str) -> Optional[StateInfo]
Loads compressed state from destination storage
WithStagingDataset Objects
class WithStagingDataset(ABC)
Adds capability to use staging dataset and request it from the loader
with_staging_dataset
@abstractmethod
def with_staging_dataset() -> ContextManager["JobClientBase"]
Executes job client methods on staging dataset
SupportsStagingDestination Objects
class SupportsStagingDestination(ABC)
Adds capability to support a staging destination for the load
should_load_data_to_staging_dataset_on_staging_destination
def should_load_data_to_staging_dataset_on_staging_destination(
table_name: str) -> bool
If set to True, and staging destination is configured, the data will be loaded to staging dataset on staging destination instead of a regular dataset on staging destination. Currently it is used by Athena Iceberg which uses staging dataset on staging destination to copy data to iceberg tables stored on regular dataset on staging destination. The default is to load data to regular dataset on staging destination from where warehouses like Snowflake (that have their own storage) will copy data.
should_truncate_table_before_load_on_staging_destination
@abstractmethod
def should_truncate_table_before_load_on_staging_destination(
table_name: str) -> bool
If set to True, data in table
will be truncated on staging destination (regular dataset). This is the default behavior which
can be changed with a config flag.
For Athena + Iceberg this setting is always False - Athena uses regular dataset to store Iceberg tables and we avoid touching it.
For Athena we truncate those tables only on "replace" write disposition.
Destination Objects
class Destination(ABC, Generic[TDestinationConfig, TDestinationClient])
A destination factory that can be partially pre-configured with credentials and other config params.
config_params
Explicit config params, overriding any injected or default values.
caps_params
Explicit capabilities params, overriding any default values for this destination
spec
@property
@abstractmethod
def spec() -> Type[TDestinationConfig]
A spec of destination configuration that also contains destination credentials
capabilities
def capabilities(
config: Optional[TDestinationConfig] = None,
naming: Optional[NamingConvention] = None
) -> DestinationCapabilitiesContext
Destination capabilities ie. supported loader file formats, identifier name lengths, naming conventions, escape function etc.
Explicit caps arguments passed to the factory init and stored in caps_params
are applied.
If config
is provided, it is used to adjust the capabilities, otherwise the explicit config composed just of config_params
passed
to factory init is applied
If naming
is provided, the case sensitivity and case folding are adjusted.
destination_name
@property
def destination_name() -> str
The destination name will either be explicitly set while creating the destination or will be taken from the type
client_class
@property
@abstractmethod
def client_class() -> Type[TDestinationClient]
A job client class responsible for starting and resuming load jobs
configuration
def configuration(initial_config: TDestinationConfig,
accept_partial: bool = False) -> TDestinationConfig
Get a fully resolved destination config from the initial config
client
def client(schema: Schema,
initial_config: TDestinationConfig = None) -> TDestinationClient
Returns a configured instance of the destination's job client
adjust_capabilities
@classmethod
def adjust_capabilities(
cls, caps: DestinationCapabilitiesContext, config: TDestinationConfig,
naming: Optional[NamingConvention]) -> DestinationCapabilitiesContext
Adjust the capabilities to match the case sensitivity as requested by naming convention.
normalize_type
@staticmethod
def normalize_type(destination_type: str) -> str
Normalizes destination type string into a canonical form. Assumes that type names without dots correspond to built in destinations.
from_reference
@staticmethod
def from_reference(
ref: TDestinationReferenceArg,
credentials: Optional[Any] = None,
destination_name: Optional[str] = None,
environment: Optional[str] = None,
**kwargs: Any
) -> Optional["Destination[DestinationClientConfiguration, JobClientBase]"]
Instantiate destination from str reference.
The ref can be a destination name or import path pointing to a destination class (e.g. dlt.destinations.postgres
)