dlt.destinations.impl.filesystem.filesystem
FilesystemLoadJob Objects
class FilesystemLoadJob(RunnableLoadJob)
make_remote_file_path
def make_remote_file_path(file_name: str) -> str
Returns path on remote filesystem to move file to, without scheme and dataset.
make_remote_path
def make_remote_path() -> str
Returns path on remote filesystem to move file(s) to, without scheme, but with dataset.
For the local filesystem, a native path is used.
make_remote_url
def make_remote_url() -> str
Returns the path on the remote filesystem as a full URL, including the scheme.
HfFilesystemUploadJob Objects
class HfFilesystemUploadJob(HasFollowupJobs, FilesystemLoadJob)
Pre-uploads a single file to HF LFS storage without creating a commit.
File content is uploaded to HF's content-addressed storage via preupload_lfs_files. The actual git commit is created later by HfFilesystemCommitJob once all uploads in the table chain complete.
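A minimal sketch of this two-phase pattern using the standard huggingface_hub flow; the repo id and file paths are illustrative, not what the job actually derives from the load package:

```python
from huggingface_hub import HfApi, CommitOperationAdd

api = HfApi()

# Phase 1: upload file content to HF's content-addressed LFS storage.
# No commit is created yet; the repo's file tree is unchanged.
addition = CommitOperationAdd(
    path_in_repo="my_dataset/orders/part-0.parquet",  # illustrative path
    path_or_fileobj="local/part-0.parquet",
)
api.preupload_lfs_files(
    repo_id="user/my-dataset",  # illustrative repo id
    additions=[addition],
    repo_type="dataset",
)

# Phase 2 (done later by HfFilesystemCommitJob): create_commit detects that
# the operation is already uploaded and only writes the git commit.
api.create_commit(
    repo_id="user/my-dataset",
    repo_type="dataset",
    operations=[addition],
    commit_message="dlt load",
)
```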
HfFilesystemCommitJob Objects
class HfFilesystemCommitJob(ReferenceFollowupJob)
Commits pre-uploaded files to an HF dataset repo.
Files are already in HF's content-addressed storage (uploaded by HfFilesystemUploadJob); create_commit internally detects pre-uploaded blobs and only creates the git commit. Retries on 409/412 commit conflicts with exponential backoff via tenacity; other errors propagate to the dlt load engine for dlt-level retry.
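A minimal sketch of that retry behavior, assuming tenacity and huggingface_hub; the attempt cap, backoff parameters, and helper names are illustrative:

```python
from huggingface_hub.utils import HfHubHTTPError
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

def _is_commit_conflict(exc: BaseException) -> bool:
    # 409/412 mean a concurrent commit moved the branch head
    return (
        isinstance(exc, HfHubHTTPError)
        and exc.response is not None
        and exc.response.status_code in (409, 412)
    )

@retry(
    retry=retry_if_exception(_is_commit_conflict),
    wait=wait_exponential(multiplier=1, max=30),  # illustrative backoff
    stop=stop_after_attempt(5),                   # illustrative attempt cap
    reraise=True,  # other errors (or exhausted retries) propagate to dlt
)
def commit_with_retry(api, repo_id, operations):
    return api.create_commit(
        repo_id=repo_id,
        repo_type="dataset",
        operations=operations,
        commit_message="dlt load",
    )
```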
make_path_in_repo
def make_path_in_repo(file_path: str) -> str
Returns path relative to repo root, without namespace and repo name.
FilesystemClient Objects
class FilesystemClient(FSClientBase, WithSqlClient, JobClientBase,
WithStagingDataset, WithStateSync, SupportsOpenTables)
batch_table_chain
If True, a single reference followup job is used for the entire table chain. If False, a separate reference followup job is created for each table in the chain.
storage_versions
@property
def storage_versions() -> Tuple[int, int]
Returns the cached storage versions, loading them once from the filesystem if not already cached.
init_file_path
@property
def init_file_path() -> str
Returns the path to the init file for the current dataset.
create_dataset
def create_dataset() -> None
Creates empty dataset directory.
drop_dataset
def drop_dataset() -> None
Removes dataset directory with all its content.
dataset_path
@property
def dataset_path() -> str
A path within a bucket to the tables in a dataset. NOTE: dataset_name changes if with_staging_dataset is active.
migrate_storage
def migrate_storage(from_version: int, to_version: int) -> None
Migrates storage from one version to another.
get_storage_versions
def get_storage_versions() -> Tuple[int, int]
Returns initial and current storage versions.
- If the init file is empty, we assume legacy version 1, where the .gz extension was not added to compressed files.
- For any other non-empty content, we parse it as JSON and expect the version key to have a supported value.
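A minimal sketch of that parsing rule; the helper name and the set of supported versions are illustrative assumptions, not the actual implementation:

```python
import json

def parse_init_file(content: str) -> int:
    # Empty init file: legacy version 1, written before the .gz extension
    # was added to compressed files.
    if not content.strip():
        return 1
    # Anything else must be JSON with a supported "version" value.
    version = json.loads(content)["version"]
    if version not in (1, 2):  # supported set is an assumption
        raise ValueError(f"unsupported storage version: {version}")
    return version
```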
get_storage_tables
def get_storage_tables(
table_names: Iterable[str]
) -> Iterable[Tuple[str, TTableSchemaColumns]]
Yields tables that have files in storage; the columns are taken from the current schema.
truncate_tables
def truncate_tables(table_names: List[str]) -> None
Truncates a set of regular tables with the given table_names.
get_table_dir
def get_table_dir(table_name: str, remote: bool = False) -> str
Returns a directory containing table files, ending with a separator. Note that many tables can share the same table dir.
get_table_prefix
def get_table_prefix(table_name: str) -> str
For table prefixes that are folders, the trailing separator is preserved.
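An illustrative pairing of get_table_dir and get_table_prefix, assuming the default flat layout {schema_name}.{table_name}.{load_id}.{file_id}.{ext} and an already configured client:

```python
# With a flat layout, all tables live directly in the dataset directory,
# so they share one table dir and differ only by their file prefix.
table_dir = client.get_table_dir("orders")        # e.g. "<bucket>/<dataset>/"
table_prefix = client.get_table_prefix("orders")  # e.g. "<bucket>/<dataset>/<schema>.orders"
```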
get_table_dirs
def get_table_dirs(table_names: Iterable[str],
remote: bool = False) -> List[str]
Gets directories where table data is stored.
list_table_files
def list_table_files(table_name: str) -> List[str]
Gets the list of files associated with one table.
list_files_with_prefixes
def list_files_with_prefixes(table_dir: str, prefixes: List[str]) -> List[str]
Returns all files in a directory that match the given prefixes.
make_remote_url
def make_remote_url(remote_path: str) -> str
Returns a URI on the remote filesystem to which the file will be copied.
get_stored_schema
def get_stored_schema(schema_name: str = None) -> Optional[StorageSchemaInfo]
Retrieves the newest schema from destination storage.
load_open_table
def load_open_table(table_format: TTableFormat, table_name: str,
**kwargs: Any) -> Any
Locates, loads, and returns a native table client for table table_name in delta or iceberg format.
get_open_table_catalog
def get_open_table_catalog(table_format: TTableFormat,
catalog_name: Optional[str] = None) -> Any
Gets a native catalog for a table with format table_format.
Returns: currently a pyiceberg Catalog is supported.
get_open_table_location
def get_open_table_location(table_format: TTableFormat,
table_name: str) -> str
All tables have a location, including those in "native" table format. In the case of the filesystem destination, the native format is a set of parquet/csv/jsonl files where a table may be placed in a separate folder or share a common prefix defined in the layout. Locations of native tables are normalized to include a trailing separator if the path is a "folder" (this includes buckets). Note: the location is a fully formed URL.
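A hedged usage sketch tying the three open-table methods together; the pipeline, dataset, and table names are illustrative:

```python
import dlt

pipeline = dlt.pipeline("my_pipeline", destination="filesystem", dataset_name="lake")

with pipeline.destination_client() as client:
    # client is a FilesystemClient when the destination is filesystem
    location = client.get_open_table_location("iceberg", "my_table")  # fully formed URL
    table = client.load_open_table("iceberg", "my_table")             # native table object
    catalog = client.get_open_table_catalog("iceberg")                # pyiceberg Catalog
```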
HfFilesystemClient Objects
class HfFilesystemClient(FilesystemClient)
truncate_tables
def truncate_tables(table_names: List[str]) -> None
Truncates tables in a single HF commit. Note: the tables won't be present in storage, but dlt schemas are not reset.
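A minimal sketch of the single-commit delete pattern this relies on, assuming huggingface_hub; the repo id and paths are illustrative, not the client's internals:

```python
from huggingface_hub import HfApi, CommitOperationDelete

api = HfApi()
api.create_commit(
    repo_id="user/my-dataset",  # illustrative repo id
    repo_type="dataset",
    operations=[
        CommitOperationDelete(path_in_repo=p)
        for p in ("orders/part-0.parquet", "orders/part-1.parquet")
    ],
    commit_message="truncate orders",
)
```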
drop_tables
def drop_tables(*tables: str, delete_schema: bool = True) -> None
Drop tables and optionally schema files in a single HF commit.
to_path_in_repo
def to_path_in_repo(path: str) -> str
Strips dataset path prefix to return path relative to repo root.
list_table_files_in_repo
def list_table_files_in_repo(table_name: str) -> List[str]
Lists files for table_name with paths relative to repo root.