common.storages.load_package
TJobFileFormat
Loader file formats with internal job types
TPipelineStateDoc Objects
class TPipelineStateDoc(TypedDict)
Corresponds to the StateInfo Tuple
TLoadPackageDropTablesState Objects
class TLoadPackageDropTablesState(TypedDict)
dropped_tables
List of tables that are to be dropped from the schema and destination (i.e. when refresh mode is used)
truncated_tables
List of tables that are to be truncated in the destination (i.e. when refresh='drop_data' mode is used)
TLoadPackageState Objects
class TLoadPackageState(TVersionedState, TLoadPackageDropTablesState)
created_at
Timestamp when the load package was created
pipeline_state
Pipeline state, added at the end of the extraction phase
destination_state
Private space for destinations to store state relevant only to the load package
TLoadPackage Objects
class TLoadPackage(TypedDict)
load_id
Load id
state
State of the load package
create_load_id
def create_load_id() -> str
Creates a new package load id, which is the current unix timestamp converted to a string. Load ids must have the following properties:
- They must maintain increasing order over time for a particular dlt schema loaded to a particular destination and dataset
- dlt executes packages in order of load ids
- dlt considers a state with the highest load id to be the most up to date when restoring state from the destination
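A minimal sketch of how those properties surface in practice, assuming only what the docstring above states (a load id is the current unix timestamp serialized as a string):

```python
# Sketch: later load ids correspond to later timestamps, so comparing the
# underlying floats reflects creation order.
import time

from dlt.common.storages.load_package import create_load_id

first = create_load_id()
time.sleep(0.01)  # make sure some time passes between the two ids
second = create_load_id()

assert float(second) >= float(first)
```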
ParsedLoadJobFileName Objects
class ParsedLoadJobFileName(NamedTuple)
Represents a file name of a job in a load package. The file name contains the name of a table, the number of times the job was retried, the extension, and a 5-byte random string that makes the job file name unique. The job id does not contain the retry count and is immutable during loading of the data.
job_id
def job_id() -> str
Unique identifier of the job
file_name
def file_name() -> str
Name of the file with the data to be loaded
with_retry
def with_retry() -> "ParsedLoadJobFileName"
Returns a job with increased retry count
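To make the naming scheme concrete, here is a small sketch. The field names and order used to construct the tuple (table_name, file_id, retry_count, file_format) are assumptions of this sketch; only job_id(), file_name() and with_retry() come from the reference above.

```python
from dlt.common.storages.load_package import ParsedLoadJobFileName

# Assumed field order: table name, 5-character file id, retry count, file format
job = ParsedLoadJobFileName("events", "ab1cd", 0, "jsonl")

print(job.job_id())     # stable job id, does not include the retry count
print(job.file_name())  # on-disk file name, includes the retry count

retried = job.with_retry()
# The job id is immutable across retries, while the file name changes
assert retried.job_id() == job.job_id()
assert retried.file_name() != job.file_name()
```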
PackageStorage Objects
class PackageStorage()
APPLIED_SCHEMA_UPDATES_FILE_NAME
Updates applied to the destination
__init__
def __init__(storage: FileStorage, initial_state: TLoadPackageStatus) -> None
Creates storage that manages load packages with root at storage and initial package state initial_state
get_package_path
def get_package_path(load_id: str) -> str
Gets path of the package relative to storage root
get_job_state_folder_path
def get_job_state_folder_path(load_id: str, state: TPackageJobState) -> str
Gets path to the jobs in state in package load_id, relative to the storage root
get_job_file_path
def get_job_file_path(load_id: str, state: TPackageJobState,
file_name: str) -> str
Gets path to the job with file_name in state in package load_id, relative to the storage root
list_packages
def list_packages() -> Sequence[str]
Lists all load ids in storage, earliest first
NOTE: Load ids are sorted alphabetically. This class does not store package creation time separately.
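The sketch below shows how the constructor, the path helpers and list_packages compose. The FileStorage arguments and the "normalized" initial state literal are assumptions of this sketch; only the methods documented above are relied on.

```python
from dlt.common.storages.file_storage import FileStorage
from dlt.common.storages.load_package import PackageStorage, create_load_id

# Assumed FileStorage arguments; "normalized" is an assumed TLoadPackageStatus value
storage = PackageStorage(FileStorage("/tmp/dlt_loads", makedirs=True), "normalized")

load_id = create_load_id()
print(storage.get_package_path(load_id))                       # package folder, relative to root
print(storage.get_job_state_folder_path(load_id, "new_jobs"))  # folder of jobs in the "new_jobs" state

# Earliest packages first; ordering relies on load ids sorting alphabetically
for existing_load_id in storage.list_packages():
    print(existing_load_id)
```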
list_failed_jobs_infos
def list_failed_jobs_infos(load_id: str) -> Sequence[LoadJobInfo]
List all failed jobs and associated error messages for a load package with load_id
import_job
def import_job(load_id: str,
job_file_path: str,
job_state: TPackageJobState = "new_jobs") -> None
Adds a new job by moving job_file_path into new_jobs of package load_id
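Continuing the sketch above, and assuming the package for load_id already exists in the storage, importing an externally prepared job file could look like this (the file path is a placeholder):

```python
# Moves the file into the new_jobs folder of the package; "new_jobs" is the documented default
storage.import_job(
    load_id,
    "/tmp/prepared/events.ab1cd.0.jsonl",  # hypothetical job file produced elsewhere
    job_state="new_jobs",
)
```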
complete_loading_package
def complete_loading_package(load_id: str,
load_state: TLoadPackageStatus) -> str
Completes loading the package by writing a marker file with the package state. Returns the path to the completed package.
remove_completed_jobs
def remove_completed_jobs(load_id: str) -> None
Deletes completed jobs. If the package has failed jobs, nothing gets deleted.
schema_name
def schema_name(load_id: str) -> str
Gets schema name associated with the package
get_load_package_jobs
def get_load_package_jobs(
load_id: str) -> Dict[TPackageJobState, List[ParsedLoadJobFileName]]
Gets all jobs in a package and returns them as lists assigned to a particular state.
get_load_package_info
def get_load_package_info(load_id: str) -> LoadPackageInfo
Gets information on normalized/completed package with given load_id, all jobs and their statuses.
Reaches out to the file system to get additional stats and mtime, and also collects exceptions for failed jobs. NOTE: do not call this function often; it should be used only to generate metrics.
get_job_failed_message
def get_job_failed_message(load_id: str, job: ParsedLoadJobFileName) -> str
Get exception message of a failed job.
job_to_job_info
def job_to_job_info(load_id: str, state: TPackageJobState,
job: ParsedLoadJobFileName) -> LoadJobInfo
Creates partial job info by converting the job object. Size, mtime and the failed message will not be populated.
is_package_partially_loaded
@staticmethod
def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool
Checks if the package is partially loaded, i.e. it has jobs that are completed and jobs that are not.
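A sketch tying the info helpers together; it reuses the storage and load_id placeholders from the earlier sketch, assumes the package exists on disk, and treats the state attribute of LoadPackageInfo as an assumption:

```python
# Reads the file system, so call sparingly (see the note on get_load_package_info)
info = storage.get_load_package_info(load_id)
print(info.state)  # assumed LoadPackageInfo attribute

if PackageStorage.is_package_partially_loaded(info):
    # Some jobs completed and some did not; inspect the failures
    for failed_job in storage.list_failed_jobs_infos(load_id):
        print(failed_job)
```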
load_package
def load_package() -> TLoadPackage
Gets the full load package state present in the current context. Across all threads this will be the same in-memory dict.
commit_load_package_state
def commit_load_package_state() -> None
Commits the load package state present in the current context. This is thread-safe.
destination_state
def destination_state() -> DictStrAny
Gets the segment of the load package state that is specific to the current destination.
clear_destination_state
def clear_destination_state(commit: bool = True) -> None
Clears the segment of the load package state that is specific to the current destination. Optionally commits to the load package.
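A sketch of the context-scoped helpers above. It assumes the code runs where a load package is injected into the current context (for example during extraction or inside a custom destination); outside such a context these calls are expected to fail. The last_processed key is purely illustrative:

```python
from dlt.common.storages.load_package import (
    clear_destination_state,
    commit_load_package_state,
    destination_state,
    load_package,
)

package = load_package()                        # same in-memory dict across all threads
print(package["load_id"], package["state"]["created_at"])

dest_state = destination_state()                # destination-private segment of the package state
dest_state["last_processed"] = package["load_id"]  # hypothetical key, for illustration only
commit_load_package_state()                     # thread-safe commit of the current context's state

clear_destination_state()                       # drops the destination segment; commits by default
```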