Version: devel

common.storages.load_package

TJobFileFormat

Loader file formats with internal job types

TPipelineStateDoc Objects

class TPipelineStateDoc(TypedDict)

[view_source]

Corresponds to the StateInfo Tuple

TLoadPackageDropTablesState Objects

class TLoadPackageDropTablesState(TypedDict)

[view_source]

dropped_tables

List of tables that are to be dropped from the schema and destination (i.e. when refresh mode is used)

truncated_tables

List of tables that are to be truncated in the destination (i.e. when refresh='drop_data' mode is used)

TLoadPackageState Objects

class TLoadPackageState(TVersionedState, TLoadPackageDropTablesState)

[view_source]

created_at

Timestamp when the load package was created

pipeline_state

Pipeline state, added at the end of the extraction phase

destination_state

Private space for destinations to store state relevant only to the load package

TLoadPackage Objects

class TLoadPackage(TypedDict)

[view_source]

load_id

Load id

state

State of the load package
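To make the nesting of these typed dictionaries easier to see, here is a minimal sketch that mirrors the documented keys. The class names ending in `Sketch` and all value types are assumptions chosen for illustration, and `TVersionedState` is left out; the real definitions in dlt may differ.

```python
from typing import Any, Dict, List, TypedDict


class DropTablesStateSketch(TypedDict, total=False):
    # Value types are placeholders, not dlt's real types.
    dropped_tables: List[Any]
    truncated_tables: List[Any]


class LoadPackageStateSketch(DropTablesStateSketch, total=False):
    created_at: str  # assumption: timestamp kept as an ISO string here
    pipeline_state: Dict[str, Any]
    destination_state: Dict[str, Any]


class LoadPackageSketch(TypedDict):
    load_id: str
    state: LoadPackageStateSketch


# A package whose state carries only some of the optional keys.
package: LoadPackageSketch = {
    "load_id": "1700000000.5",
    "state": {
        "created_at": "2023-11-14T22:13:20+00:00",
        "destination_state": {},
        "dropped_tables": [],
    },
}
```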

create_load_id

def create_load_id() -> str

[view_source]

Creates a new package load id, which is the current unix timestamp converted to a string. Load ids must have the following properties:

  • They must maintain increasing order over time for a particular dlt schema loaded to a particular destination and dataset. dlt executes packages in order of load ids. dlt considers the state with the highest load id to be the most up to date when restoring state from the destination.
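The ordering property can be illustrated with a small sketch. `make_load_id` and `latest_load_id` are hypothetical helpers written for this example, not dlt functions; the optional `now` argument exists only to make the example deterministic.

```python
import time
from typing import Iterable, Optional


def make_load_id(now: Optional[float] = None) -> str:
    """Hypothetical helper: a load id is a unix timestamp rendered as a string."""
    return str(time.time() if now is None else now)


def latest_load_id(load_ids: Iterable[str]) -> str:
    """Pick the most recent id by comparing the timestamps numerically."""
    return max(load_ids, key=float)


first = make_load_id(1700000000.5)
second = make_load_id(1700000060.25)

# The id created later is treated as the most up to date.
newest = latest_load_id([second, first])
```

Because the id is derived from the clock, ids created later compare higher, which is what lets packages be processed in creation order.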

ParsedLoadJobFileName Objects

class ParsedLoadJobFileName(NamedTuple)

[view_source]

Represents the file name of a job in a load package. The file name contains the name of a table, the number of times the job was retried, the extension, and a 5-byte random string that makes the job file name unique. The job id does not contain the retry count and is immutable while the data is being loaded.

job_id

def job_id() -> str

[view_source]

Unique identifier of the job

file_name

def file_name() -> str

[view_source]

A name of the file with the data to be loaded

with_retry

def with_retry() -> "ParsedLoadJobFileName"

[view_source]

Returns a job with increased retry count
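The relationship between the file name, the job id, and the retry count can be sketched as below. The class name `JobFileNameSketch`, its field names, the dot-separated layout, and the `parse_job_file_name` helper are all assumptions made for illustration; only the behavior described above (retry count in the file name but not in the job id) is taken from the text.

```python
import secrets
from typing import NamedTuple


class JobFileNameSketch(NamedTuple):
    table_name: str
    file_id: str      # random part that keeps the name unique
    retry_count: int
    file_format: str  # file extension

    def job_id(self) -> str:
        # No retry count here, so the id stays stable across retries.
        return f"{self.table_name}.{self.file_id}.{self.file_format}"

    def file_name(self) -> str:
        return f"{self.table_name}.{self.file_id}.{self.retry_count}.{self.file_format}"

    def with_retry(self) -> "JobFileNameSketch":
        # NamedTuples are immutable, so return a modified copy.
        return self._replace(retry_count=self.retry_count + 1)


def parse_job_file_name(file_name: str) -> JobFileNameSketch:
    """Split the assumed four-part, dot-separated layout back into fields."""
    table_name, file_id, retry_count, file_format = file_name.split(".")
    return JobFileNameSketch(table_name, file_id, int(retry_count), file_format)


# secrets.token_hex(5) yields 5 random bytes as 10 hex characters.
job = JobFileNameSketch("events", secrets.token_hex(5), 0, "jsonl")
retried = job.with_retry()
```

Here `retried.file_name()` differs from `job.file_name()` while both share the same `job_id()`, which is the property that allows a job to be tracked across retries.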

PackageStorage Objects

class PackageStorage()

[view_source]

APPLIED_SCHEMA_UPDATES_FILE_NAME

Updates applied to the destination

__init__

def __init__(storage: FileStorage, initial_state: TLoadPackageStatus) -> None

[view_source]

Creates storage that manages load packages with root at storage and initial package state initial_state

get_package_path

def get_package_path(load_id: str) -> str

[view_source]

Gets path of the package relative to storage root

get_job_state_folder_path

def get_job_state_folder_path(load_id: str, state: TPackageJobState) -> str

[view_source]

Gets path to the jobs in state in package load_id, relative to the storage root

get_job_file_path

def get_job_file_path(load_id: str, state: TPackageJobState,
                      file_name: str) -> str

[view_source]

Get path to job with file_name in state in package load_id, relative to the storage root
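The three path helpers compose naturally. The sketch below assumes a layout in which each package lives in a folder named after its load id, with one subfolder per job state; that layout and the function names are assumptions for illustration, not a statement of how dlt arranges files.

```python
import posixpath


def package_path(load_id: str) -> str:
    # Assumed: the package folder is named after the load id.
    return load_id


def job_state_folder_path(load_id: str, state: str) -> str:
    # Assumed: one subfolder per job state inside the package folder.
    return posixpath.join(package_path(load_id), state)


def job_file_path(load_id: str, state: str, file_name: str) -> str:
    return posixpath.join(job_state_folder_path(load_id, state), file_name)


path = job_file_path("1700000000.5", "new_jobs", "events.a1b2c3d4e5.0.jsonl")
```

All returned paths are relative, so they stay valid wherever the storage root is mounted.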

list_packages

def list_packages() -> Sequence[str]

[view_source]

Lists all load ids in storage, earliest first

NOTE: Load ids are sorted alphabetically. This class does not store package creation time separately.
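The note about alphabetical sorting matters only when ids differ in the number of digits before the decimal point. The following sketch uses made-up ids to show where alphabetical and numeric order diverge; for present-day unix timestamps, which all have ten integer digits, the two orders agree.

```python
load_ids = ["999999999.0", "1700000000.5", "1700000060.25"]

# Alphabetical order compares character by character, so "1..." sorts before "9...".
alphabetical = sorted(load_ids)

# Numeric order compares the timestamps themselves.
numeric = sorted(load_ids, key=float)
```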

list_failed_jobs_infos

def list_failed_jobs_infos(load_id: str) -> Sequence[LoadJobInfo]

[view_source]

List all failed jobs and associated error messages for a load package with load_id

import_job

def import_job(load_id: str,
               job_file_path: str,
               job_state: TPackageJobState = "new_jobs") -> None

[view_source]

Adds a new job by moving job_file_path into the job_state folder (new_jobs by default) of package load_id

complete_loading_package

def complete_loading_package(load_id: str,
                             load_state: TLoadPackageStatus) -> str

[view_source]

Completes loading the package by writing a marker file with the package state (load_state). Returns the path to the completed package.

remove_completed_jobs

def remove_completed_jobs(load_id: str) -> None

[view_source]

Deletes completed jobs. If package has failed jobs, nothing gets deleted.
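The "nothing gets deleted if there are failed jobs" rule can be sketched with plain folders. The function name and the `completed_jobs` and `failed_jobs` folder names are assumptions made for this example and are not taken from the text above.

```python
import os
import tempfile


def remove_completed_jobs_sketch(package_dir: str) -> int:
    """Delete files in completed_jobs unless failed_jobs is non-empty.

    Returns the number of files removed.
    """
    failed_dir = os.path.join(package_dir, "failed_jobs")
    completed_dir = os.path.join(package_dir, "completed_jobs")
    if os.path.isdir(failed_dir) and os.listdir(failed_dir):
        return 0  # keep everything so failures can be inspected
    removed = 0
    if os.path.isdir(completed_dir):
        for name in os.listdir(completed_dir):
            os.remove(os.path.join(completed_dir, name))
            removed += 1
    return removed


def _touch(path: str) -> None:
    with open(path, "w", encoding="utf-8"):
        pass


with tempfile.TemporaryDirectory() as package_dir:
    completed = os.path.join(package_dir, "completed_jobs")
    failed = os.path.join(package_dir, "failed_jobs")
    os.makedirs(completed)
    os.makedirs(failed)
    _touch(os.path.join(completed, "events.a1.0.jsonl"))
    failed_file = os.path.join(failed, "users.b2.0.jsonl")
    _touch(failed_file)

    # A failed job is present, so nothing is removed.
    removed_with_failure = remove_completed_jobs_sketch(package_dir)

    # Once the failed job is gone, completed jobs are cleaned up.
    os.remove(failed_file)
    removed_without_failure = remove_completed_jobs_sketch(package_dir)
```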

schema_name

def schema_name(load_id: str) -> str

[view_source]

Gets schema name associated with the package

get_load_package_jobs

def get_load_package_jobs(
        load_id: str) -> Dict[TPackageJobState, List[ParsedLoadJobFileName]]

[view_source]

Gets all jobs in a package and returns them as lists assigned to a particular state.

get_load_package_info

def get_load_package_info(load_id: str) -> LoadPackageInfo

[view_source]

Gets information on normalized/completed package with given load_id, all jobs and their statuses.

Reaches into the file system to get additional stats and mtime, and also collects exceptions for failed jobs. NOTE: Do not call this function often. It should be used only to generate metrics.

get_job_failed_message

def get_job_failed_message(load_id: str, job: ParsedLoadJobFileName) -> str

[view_source]

Get exception message of a failed job.

job_to_job_info

def job_to_job_info(load_id: str, state: TPackageJobState,
                    job: ParsedLoadJobFileName) -> LoadJobInfo

[view_source]

Creates partial job info by converting a job object. Size, mtime, and failed message will not be populated.

is_package_partially_loaded

@staticmethod
def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool

[view_source]

Checks if package is partially loaded - has jobs that are completed and jobs that are not.
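A minimal sketch of this check, assuming jobs are given as a mapping from state name to job names and that `completed_jobs` is the state name for finished jobs. Both the data shape and the state names are assumptions; the actual method works on a `LoadPackageInfo` and may treat some states differently.

```python
from typing import Dict, List


def is_partially_loaded_sketch(jobs: Dict[str, List[str]]) -> bool:
    """True when some jobs are completed and some are not."""
    completed = len(jobs.get("completed_jobs", []))
    not_completed = sum(
        len(names) for state, names in jobs.items() if state != "completed_jobs"
    )
    return completed > 0 and not_completed > 0


partial = is_partially_loaded_sketch(
    {"completed_jobs": ["events.a1.jsonl"], "new_jobs": ["users.b2.jsonl"]}
)
untouched = is_partially_loaded_sketch({"new_jobs": ["users.b2.jsonl"]})
finished = is_partially_loaded_sketch({"completed_jobs": ["events.a1.jsonl"]})
```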

load_package

def load_package() -> TLoadPackage

[view_source]

Get the full load package state present in the current context. Across all threads this will be the same in-memory dict.

commit_load_package_state

def commit_load_package_state() -> None

[view_source]

Commit load package state present in current context. This is thread safe.

destination_state

def destination_state() -> DictStrAny

[view_source]

Get segment of load package state that is specific to the current destination.

clear_destination_state

def clear_destination_state(commit: bool = True) -> None

[view_source]

Clear segment of load package state that is specific to the current destination. Optionally commit to load package.
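The interplay between the shared state dict, the destination-specific segment, and committing can be sketched as follows. `PackageStateContextSketch`, the JSON file used for persistence, and the lock-based commit are all assumptions for illustration; the module-level functions documented above obtain the state from the current context rather than from an explicit object.

```python
import json
import os
import tempfile
import threading
from typing import Any, Dict


class PackageStateContextSketch:
    def __init__(self, state_path: str) -> None:
        self._state_path = state_path
        self._lock = threading.Lock()
        # One in-memory dict shared by every caller holding this object.
        self.state: Dict[str, Any] = {}

    def destination_state(self) -> Dict[str, Any]:
        # Return the destination segment, creating it on first access.
        return self.state.setdefault("destination_state", {})

    def commit(self) -> None:
        # The lock serializes writers so concurrent commits do not interleave.
        with self._lock:
            with open(self._state_path, "w", encoding="utf-8") as f:
                json.dump(self.state, f)

    def clear_destination_state(self, commit: bool = True) -> None:
        self.state.pop("destination_state", None)
        if commit:
            self.commit()


def _read(path: str) -> Dict[str, Any]:
    with open(path, encoding="utf-8") as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp:
    ctx = PackageStateContextSketch(os.path.join(tmp, "state.json"))
    ctx.destination_state()["staged_files"] = 3  # hypothetical key
    ctx.commit()
    after_commit = _read(os.path.join(tmp, "state.json"))

    ctx.clear_destination_state(commit=True)
    after_clear = _read(os.path.join(tmp, "state.json"))
```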
