Version: devel

dlt.common.schema.utils

is_valid_schema_name

def is_valid_schema_name(name: str) -> bool

A schema name must be a valid Python identifier and be at most 64 characters long.
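
A minimal sketch of the rule stated above, using only the Python standard library. The helper name looks_like_valid_schema_name and the constant MAX_SCHEMA_NAME_LENGTH are hypothetical; this is an illustration of the check, not dlt's own implementation.

```python
MAX_SCHEMA_NAME_LENGTH = 64  # limit taken from the description above


def looks_like_valid_schema_name(name: str) -> bool:
    """Hypothetical stand-in: identifier check plus length check."""
    return name.isidentifier() and len(name) <= MAX_SCHEMA_NAME_LENGTH
```

Under this sketch a name such as github_events passes, while a name that starts with a digit, contains a dash, or exceeds 64 characters is rejected.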

is_nested_table

def is_nested_table(table: TTableSchema) -> bool

Checks if the table is a dlt nested table, that is, a table connected to its parent table via a row_key - parent_key reference.

may_be_nested

def may_be_nested(table: TTableSchema) -> bool

Table may be nested if it does not define any primary/merge keys

normalize_schema_name

def normalize_schema_name(name: str) -> str

Normalizes the schema name using the snake case naming convention. The maximum length is 64 characters.

apply_defaults

def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema

Applies default hint values to stored_schema in place

Updates only complete column hints; incomplete columns are preserved intact.

remove_defaults

def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema

Removes default values from stored_schema in place, returns the input for chaining

  • removes column and table names from the value
  • removes the resource name if it is the same as the table name

has_default_column_prop_value

def has_default_column_prop_value(prop: str, value: Any) -> bool

Checks if value is a default for prop.

remove_column_defaults

def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema

Removes default values from column_schema in place, returns the input for chaining

bump_version_if_modified

def bump_version_if_modified(
stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]

Bumps the stored_schema version and version hash if the content was modified. Returns a (new version, new hash, old hash, last 10 hashes) tuple.
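
The idea of bumping a version only when the content hash changes can be sketched as follows. This is an illustration under stated assumptions, not dlt's implementation: the key names version, version_hash and previous_hashes, the use of SHA-256 over sorted JSON, and the exact contents and ordering of the hash list are all assumptions made for the example.

```python
import hashlib
import json
from typing import Any, Dict, List, Tuple

# Assumed bookkeeping keys; they are excluded from the hashed content.
BOOKKEEPING_KEYS = ("version", "version_hash", "previous_hashes")


def content_hash(schema: Dict[str, Any]) -> str:
    """Hash everything except the version bookkeeping fields."""
    content = {k: v for k, v in schema.items() if k not in BOOKKEEPING_KEYS}
    payload = json.dumps(content, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def bump_if_modified(schema: Dict[str, Any]) -> Tuple[int, str, str, List[str]]:
    """Hypothetical sketch: bump the version when the content hash changed."""
    version = schema.setdefault("version", 1)
    previous = schema.setdefault("previous_hashes", [])
    old_hash = schema.get("version_hash", "")
    new_hash = content_hash(schema)
    if old_hash and new_hash != old_hash:
        version += 1
        previous.insert(0, old_hash)  # newest first
        del previous[10:]             # keep at most 10 entries
    schema["version"] = version
    schema["version_hash"] = new_hash
    return version, new_hash, old_hash, previous
```

Calling the sketch twice on an unchanged schema returns the same version and hash; the version only moves when something outside the bookkeeping keys changes.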

normalize_simple_regex_column

def normalize_simple_regex_column(naming: NamingConvention,
regex: TSimpleRegex) -> TSimpleRegex

Assumes that regex applies to column name and normalizes it.

compile_simple_regexes

def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern

Compiles multiple simple regex patterns into a single pattern.
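
One way to combine several patterns into one is to join them with alternation. The sketch below assumes that an entry prefixed with re: is a regular expression and that any other entry is an exact literal match; the prefix, the helper names and the never-matching fallback for empty input are assumptions for illustration, not a statement of how dlt does it.

```python
import re
from typing import Iterable, Pattern

REGEX_PREFIX = "re:"  # assumed marker for "this entry is a regex"


def to_pattern_source(simple: str) -> str:
    """Turn one entry into regex source text."""
    if simple.startswith(REGEX_PREFIX):
        return simple[len(REGEX_PREFIX):]
    # Anything else is treated as an exact, literal match.
    return "^" + re.escape(simple) + "$"


def compile_many(patterns: Iterable[str]) -> Pattern[str]:
    """Hypothetical sketch: one compiled pattern that matches any entry."""
    sources = ["(" + to_pattern_source(p) + ")" for p in patterns]
    if not sources:
        return re.compile(r"(?!)")  # empty input: a pattern that never matches
    return re.compile("|".join(sources))
```

With the entries re:^_dlt_ and id, the combined pattern matches _dlt_load_id and id but not idx.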

is_complete_column

def is_complete_column(col: TColumnSchemaBase) -> bool

Returns true if column contains enough data to be created at the destination. Must contain a name and a data type. Other hints have defaults.
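
The completeness rule above can be sketched with plain dictionaries. The key names name and data_type and the helper name looks_complete are assumptions made for this example.

```python
from typing import Any, Dict


def looks_complete(col: Dict[str, Any]) -> bool:
    """Hypothetical sketch: a column needs both a name and a data type."""
    return bool(col.get("name")) and bool(col.get("data_type"))
```

A column that carries only hints, for example a name plus a primary key flag but no data type, would be treated as incomplete by this sketch.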

is_nullable_column

def is_nullable_column(col: TColumnSchemaBase) -> bool

Returns true if column is nullable

find_incomplete_columns

def find_incomplete_columns(
table: TTableSchema) -> Iterable[Tuple[TColumnSchemaBase, bool]]

Yields (column, nullable) for all incomplete columns in table
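
A generator with this shape might look like the sketch below. It works on plain dictionaries and assumes that columns live under a columns key, that a complete column has both name and data_type, and that a column without an explicit nullable hint counts as nullable; all three are assumptions for illustration.

```python
from typing import Any, Dict, Iterator, Tuple

Column = Dict[str, Any]


def iter_incomplete(table: Dict[str, Any]) -> Iterator[Tuple[Column, bool]]:
    """Hypothetical sketch: yield (column, nullable) for incomplete columns."""
    for col in table.get("columns", {}).values():
        complete = bool(col.get("name")) and bool(col.get("data_type"))
        if not complete:
            # Assumed default: a missing "nullable" hint means nullable.
            yield col, bool(col.get("nullable", True))
```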

compare_complete_columns

def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool

Compares mandatory fields of complete columns

diff_table_references

def diff_table_references(
a: Sequence[TTableReference],
b: Sequence[TTableReference]) -> List[TTableReference]

Return a list of references containing references matched by table:

  • References from b that are not in a
  • References from b that are different from the one in a

merge_column

def merge_column(col_a: TColumnSchema,
col_b: TColumnSchema,
merge_defaults: bool = True) -> TColumnSchema

Merges col_b into col_a. If merge_defaults is True, only hints from col_b that are not default in col_a will be set.

Modifies col_a in place and returns it

merge_columns

def merge_columns(columns_a: TTableSchemaColumns,
columns_b: TTableSchemaColumns,
merge_columns: bool = False,
columns_partial: bool = True) -> TTableSchemaColumns

Merges columns_a with columns_b. columns_a is modified in place.

  • new columns are added
  • if merge_columns is False, updated columns are replaced from columns_b
  • if merge_columns is True, updated columns are merged with merge_column
  • if columns_partial is True, both column sets are considered incomplete. In that case hints like primary_key or merge_key are merged
  • if columns_partial is False, hints like primary_key and merge_key are dropped from columns_a and replaced from columns_b
  • incomplete columns in columns_a that got completed in columns_b are removed to preserve order

diff_table

def diff_table(schema_name: str, tab_a: TTableSchema,
tab_b: TPartialTableSchema) -> TPartialTableSchema

Creates a partial table that contains properties found in tab_b that are not present or different in tab_a. The name is always present in returned partial. It returns new columns (not present in tab_a) and merges columns from tab_b into tab_a (overriding non-default hint values). If any columns are returned they contain full data (not diffs of columns)

Raises SchemaException if the tables cannot be merged:

  • when columns with the same name have different data types
  • when table links to different parent tables

ensure_compatible_tables

def ensure_compatible_tables(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
ensure_columns: bool = True) -> None

Ensures that tab_a and tab_b can be merged without conflicts. Conflicts are detected when:

  • tables have different names
  • nested tables have different parents
  • tables have any column with incompatible types

Note: all the identifiers must be already normalized

merge_table

def merge_table(schema_name: str, table: TTableSchema,
partial_table: TPartialTableSchema) -> TPartialTableSchema

Merges "partial_table" into "table". table is merged in place. Returns the diff partial table. table and partial_table names must be identical. A table diff is generated and applied to table

merge_diff

def merge_diff(table: TTableSchema,
table_diff: TPartialTableSchema) -> TPartialTableSchema

Merges a table diff table_diff into table. table is merged in place. Returns the diff.

  • new columns are added, updated columns are replaced from diff
  • incomplete columns in table that got completed in table_diff are removed to preserve order
  • table hints are added or replaced from diff
  • nothing gets deleted
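
The rules listed above can be sketched on plain dictionaries. The point of interest is the ordering rule: when an incomplete column becomes complete, it is dropped first and re-inserted, so it moves to the end of the column mapping. The helper names and the name and data_type completeness test are assumptions; this is an illustration, not dlt's code.

```python
from typing import Any, Dict

Column = Dict[str, Any]


def is_complete(col: Column) -> bool:
    # Assumed completeness test: both a name and a data type are present.
    return bool(col.get("name")) and bool(col.get("data_type"))


def apply_diff(table: Dict[str, Any], table_diff: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sketch: merge table_diff into table in place, return the diff."""
    columns = table.setdefault("columns", {})
    for name, new_col in table_diff.get("columns", {}).items():
        old_col = columns.get(name)
        if old_col is not None and not is_complete(old_col) and is_complete(new_col):
            # Drop the incomplete entry so the completed column is appended last.
            columns.pop(name)
        columns[name] = new_col  # add new columns, replace updated ones
    for key, value in table_diff.items():
        if key != "columns":
            table[key] = value   # table hints are added or replaced
    return table_diff
```

Because Python dictionaries keep insertion order, popping and re-adding a key is enough to move it behind the columns that were already complete.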

normalize_table_identifiers

def normalize_table_identifiers(table: TTableSchema,
naming: NamingConvention) -> TTableSchema

Normalizes all table and column names in table schema according to current schema naming convention and returns new instance with modified table schema.

Naming convention like snake_case may produce name collisions with the column names. Colliding column schemas are merged where the column that is defined later in the dictionary overrides earlier column.

Note that resource name is not normalized.
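
The collision behaviour described above can be illustrated with a toy naming function. The to_snake helper below is a deliberately simplified stand-in for a real naming convention, and normalize_columns is a hypothetical name; the sketch only shows how two source names can collapse into one column and how the later definition wins.

```python
import re
from typing import Any, Dict

Column = Dict[str, Any]


def to_snake(name: str) -> str:
    # Toy convention: insert "_" before inner capitals, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def normalize_columns(columns: Dict[str, Column]) -> Dict[str, Column]:
    """Hypothetical sketch: return a new mapping keyed by normalized names."""
    normalized: Dict[str, Column] = {}
    for col in columns.values():
        new_name = to_snake(col["name"])
        merged = normalized.setdefault(new_name, {})
        merged.update(col)        # later definitions override earlier ones
        merged["name"] = new_name
    return normalized
```

With columns UserName and user_name in that order, both normalize to user_name; hints set only on the first column survive, and hints set on both take the value from the second.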

has_table_seen_data

def has_table_seen_data(table: TTableSchema) -> bool

Checks if normalizer has seen data coming to the table.

remove_processing_hints

def remove_processing_hints(tables: TSchemaTables) -> TSchemaTables

Removes processing hints like x-normalizer and x-loader from schema tables. Modifies the input tables and returns it for convenience

get_processing_hints

def get_processing_hints(tables: TSchemaTables) -> Dict[str, List[str]]

Finds processing hints in a set of tables and returns table_name: [hints] mapping

get_first_column_name_with_prop

def get_first_column_name_with_prop(
table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> Optional[str]

Returns name of first column in table schema with property column_prop or None if no such column exists.

has_column_with_prop

def has_column_with_prop(table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> bool

Checks if table schema contains column with property column_prop.

get_dedup_sort_tuple

def get_dedup_sort_tuple(
table: TTableSchema,
include_incomplete: bool = False) -> Optional[Tuple[str, TSortOrder]]

Returns tuple with dedup sort information.

First element is the sort column name, second element is the sort order.

Returns None if "dedup_sort" hint was not provided.
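
A lookup of this kind can be sketched on plain dictionaries. The sketch assumes that the hint is stored on the column under a dedup_sort key whose value is the sort order, and that incomplete columns (missing name or data_type) are skipped unless include_incomplete is set; the helper name is hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def dedup_sort_of(
    table: Dict[str, Any], include_incomplete: bool = False
) -> Optional[Tuple[str, str]]:
    """Hypothetical sketch: (column name, sort order) of the first dedup_sort column."""
    for name, col in table.get("columns", {}).items():
        order = col.get("dedup_sort")
        if order is None:
            continue
        complete = bool(col.get("name")) and bool(col.get("data_type"))
        if complete or include_incomplete:
            return name, order
    return None
```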

get_write_disposition

def get_write_disposition(tables: TSchemaTables,
table_name: str) -> TWriteDisposition

Returns the write disposition hint of a table if present. If not, looks it up in the parent table.
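
The parent lookup can be sketched as a walk up the table hierarchy. The sketch assumes that each table is a dictionary, that the hint is stored under write_disposition, and that a nested table names its parent under a parent key; the helper name and the error raised at the root are assumptions for illustration.

```python
from typing import Any, Dict


def inherited_write_disposition(tables: Dict[str, Dict[str, Any]], table_name: str) -> str:
    """Hypothetical sketch: take the hint from the table or its nearest ancestor."""
    table = tables[table_name]
    while True:
        hint = table.get("write_disposition")
        if hint is not None:
            return hint
        parent = table.get("parent")
        if parent is None:
            raise ValueError(f"no write disposition found for {table_name}")
        table = tables[parent]
```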

fill_hints_from_parent_and_clone_table

def fill_hints_from_parent_and_clone_table(
tables: TSchemaTables, table: TTableSchema) -> TTableSchema

Takes write disposition and table format from parent tables if not present

table_schema_has_type

def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool

Checks if table schema contains column with type _typ

table_schema_has_type_with_precision

def table_schema_has_type_with_precision(table: TTableSchema,
_typ: TDataType) -> bool

Checks if table schema contains column with type _typ and precision set

get_root_table

def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema

Finds root (without parent) of a table_name following the nested references (row_key - parent_key).

get_nested_tables

def get_nested_tables(tables: TSchemaTables,
table_name: str) -> List[TTableSchema]

Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents

Note that this function follows only NESTED TABLE reference typically expressed on _dlt_parent_id (PARENT_KEY) to _dlt_id (ROW_KEY).
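
The ancestry ordering can be sketched with a depth-first walk. For simplicity the sketch links tables through a parent key on each table dictionary rather than through the row_key and parent_key columns, and it places the starting table first in the result; both choices are assumptions for illustration.

```python
from typing import Any, Dict, List

Table = Dict[str, Any]


def nested_tables(tables: Dict[str, Table], table_name: str) -> List[Table]:
    """Hypothetical sketch: the table followed by its descendants, parents first."""
    result = [tables[table_name]]
    for candidate in tables.values():
        if candidate.get("parent") == table_name:
            # Recurse so each child is immediately followed by its own children.
            result.extend(nested_tables(tables, candidate["name"]))
    return result
```

Because every table is appended before its children are visited, a nested table can never appear ahead of its parent, regardless of the order in which the tables were declared.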

group_tables_by_resource

def group_tables_by_resource(
tables: TSchemaTables,
pattern: Optional[REPattern] = None) -> Dict[str, List[TTableSchema]]

Creates a dict of resources and their associated tables and descendant tables. If pattern is supplied, the result is filtered to only resource names matching the pattern.

dlt_id_column

def dlt_id_column() -> TColumnSchema

Definition of dlt id column

dlt_load_id_column

def dlt_load_id_column() -> TColumnSchema

Definition of dlt load id column
