Skip to main content
Version: devel

common.schema.utils

is_valid_schema_name

def is_valid_schema_name(name: str) -> bool

[view_source]

Schema name must be a valid python identifier and have max len of 64

is_nested_table

def is_nested_table(table: TTableSchema) -> bool

[view_source]

Checks if table is a dlt nested table: connected to parent table via row_key - parent_key reference

normalize_schema_name

def normalize_schema_name(name: str) -> str

[view_source]

Normalizes schema name by using snake case naming convention. The maximum length is 64 characters

apply_defaults

def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema

[view_source]

Applies default hint values to stored_schema in place

Updates only complete column hints, incomplete columns are preserved intact

remove_defaults

def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema

[view_source]

Removes default values from stored_schema in place, returns the input for chaining

  • removes column and table names from the value
  • removed resource name if same as table name

has_default_column_prop_value

def has_default_column_prop_value(prop: str, value: Any) -> bool

[view_source]

Checks if value is a default for prop.

remove_column_defaults

def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema

[view_source]

Removes default values from column_schema in place, returns the input for chaining

bump_version_if_modified

def bump_version_if_modified(
stored_schema: TStoredSchema) -> Tuple[int, str, str, Sequence[str]]

[view_source]

Bumps the stored_schema version and version hash if content modified, returns (new version, new hash, old hash, 10 last hashes) tuple

normalize_simple_regex_column

def normalize_simple_regex_column(naming: NamingConvention,
regex: TSimpleRegex) -> TSimpleRegex

[view_source]

Assumes that regex applies to column name and normalizes it.

compile_simple_regexes

def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern

[view_source]

Compile multiple patterns as one

is_complete_column

def is_complete_column(col: TColumnSchemaBase) -> bool

[view_source]

Returns true if column contains enough data to be created at the destination. Must contain a name and a data type. Other hints have defaults.

is_nullable_column

def is_nullable_column(col: TColumnSchemaBase) -> bool

[view_source]

Returns true if column is nullable

find_incomplete_columns

def find_incomplete_columns(
table: TTableSchema) -> Iterable[Tuple[TColumnSchemaBase, bool]]

[view_source]

Yields (column, nullable) for all incomplete columns in table

compare_complete_columns

def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool

[view_source]

Compares mandatory fields of complete columns

diff_table_references

def diff_table_references(
a: Sequence[TTableReference],
b: Sequence[TTableReference]) -> List[TTableReference]

[view_source]

Return a list of references containing references matched by table:

  • References from b that are not in a
  • References from b that are different from the one in a

merge_column

def merge_column(col_a: TColumnSchema,
col_b: TColumnSchema,
merge_defaults: bool = True) -> TColumnSchema

[view_source]

Merges col_b into col_a. if merge_defaults is True, only hints from col_b that are not default in col_a will be set.

Modifies col_a in place and returns it

merge_columns

def merge_columns(columns_a: TTableSchemaColumns,
columns_b: TTableSchemaColumns,
merge_columns: bool = False,
columns_partial: bool = True) -> TTableSchemaColumns

[view_source]

Merges columns_a with columns_b. columns_a is modified in place.

  • new columns are added
  • if merge_columns is False, updated columns are replaced from columns_b
  • if merge_columns is True, updated columns are merged with merge_column
  • if columns_partial is True, both columns sets are considered incomplete. In that case hints like primary_key or merge_key are merged
  • if columns_partial is False, hints like primary_key and merge_key are dropped from columns_a and replaced from columns_b
  • incomplete columns in columns_a that got completed in columns_b are removed to preserve order

diff_table

def diff_table(schema_name: str, tab_a: TTableSchema,
tab_b: TPartialTableSchema) -> TPartialTableSchema

[view_source]

Creates a partial table that contains properties found in tab_b that are not present or different in tab_a. The name is always present in returned partial. It returns new columns (not present in tab_a) and merges columns from tab_b into tab_a (overriding non-default hint values). If any columns are returned they contain full data (not diffs of columns)

Raises SchemaException if tables cannot be merged

  • when columns with the same name have different data types
  • when table links to different parent tables

ensure_compatible_tables

def ensure_compatible_tables(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
ensure_columns: bool = True) -> None

[view_source]

Ensures that tab_a and tab_b can be merged without conflicts. Conflicts are detected when

  • tables have different names
  • nested tables have different parents
  • tables have any column with incompatible types

Note: all the identifiers must be already normalized

merge_table

def merge_table(schema_name: str, table: TTableSchema,
partial_table: TPartialTableSchema) -> TPartialTableSchema

[view_source]

Merges "partial_table" into "table". table is merged in place. Returns the diff partial table. table and partial_table names must be identical. A table diff is generated and applied to table

merge_diff

def merge_diff(table: TTableSchema,
table_diff: TPartialTableSchema) -> TPartialTableSchema

[view_source]

Merges a table diff table_diff into table. table is merged in place. Returns the diff.

  • new columns are added, updated columns are replaced from diff
  • incomplete columns in table that got completed in partial_table are removed to preserve order
  • table hints are added or replaced from diff
  • nothing gets deleted

normalize_table_identifiers

def normalize_table_identifiers(table: TTableSchema,
naming: NamingConvention) -> TTableSchema

[view_source]

Normalizes all table and column names in table schema according to current schema naming convention and returns new instance with modified table schema.

Naming convention like snake_case may produce name collisions with the column names. Colliding column schemas are merged where the column that is defined later in the dictionary overrides earlier column.

Note that resource name is not normalized.

has_table_seen_data

def has_table_seen_data(table: TTableSchema) -> bool

[view_source]

Checks if normalizer has seen data coming to the table.

remove_processing_hints

def remove_processing_hints(tables: TSchemaTables) -> TSchemaTables

[view_source]

Removes processing hints like x-normalizer and x-loader from schema tables. Modifies the input tables and returns it for convenience

get_processing_hints

def get_processing_hints(tables: TSchemaTables) -> Dict[str, List[str]]

[view_source]

Finds processing hints in a set of tables and returns table_name: [hints] mapping

get_first_column_name_with_prop

def get_first_column_name_with_prop(
table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> Optional[str]

[view_source]

Returns name of first column in table schema with property column_prop or None if no such column exists.

has_column_with_prop

def has_column_with_prop(table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> bool

[view_source]

Checks if table schema contains column with property column_prop.

get_dedup_sort_tuple

def get_dedup_sort_tuple(
table: TTableSchema,
include_incomplete: bool = False) -> Optional[Tuple[str, TSortOrder]]

[view_source]

Returns tuple with dedup sort information.

First element is the sort column name, second element is the sort order.

Returns None if "dedup_sort" hint was not provided.

get_write_disposition

def get_write_disposition(tables: TSchemaTables,
table_name: str) -> TWriteDisposition

[view_source]

Returns table hint of a table if present. If not, looks up into parent table

fill_hints_from_parent_and_clone_table

def fill_hints_from_parent_and_clone_table(
tables: TSchemaTables, table: TTableSchema) -> TTableSchema

[view_source]

Takes write disposition and table format from parent tables if not present

table_schema_has_type

def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool

[view_source]

Checks if table schema contains column with type _typ

table_schema_has_type_with_precision

def table_schema_has_type_with_precision(table: TTableSchema,
_typ: TDataType) -> bool

[view_source]

Checks if table schema contains column with type _typ and precision set

get_root_table

def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema

[view_source]

Finds root (without parent) of a table_name following the nested references (row_key - parent_key).

get_nested_tables

def get_nested_tables(tables: TSchemaTables,
table_name: str) -> List[TTableSchema]

[view_source]

Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents

Note that this function follows only NESTED TABLE reference typically expressed on _dlt_parent_id (PARENT_KEY) to _dlt_id (ROW_KEY).

group_tables_by_resource

def group_tables_by_resource(
tables: TSchemaTables,
pattern: Optional[REPattern] = None) -> Dict[str, List[TTableSchema]]

[view_source]

Create a dict of resources and their associated tables and descendant tables If pattern is supplied, the result is filtered to only resource names matching the pattern.

dlt_id_column

def dlt_id_column() -> TColumnSchema

[view_source]

Definition of dlt id column

dlt_load_id_column

def dlt_load_id_column() -> TColumnSchema

[view_source]

Definition of dlt load id column

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.