common.schema.utils
is_valid_schema_name
def is_valid_schema_name(name: str) -> bool
Schema name must be a valid python identifier and have max len of 64
is_nested_table
def is_nested_table(table: TTableSchema) -> bool
Checks if table is a dlt nested table: connected to parent table via row_key - parent_key reference
normalize_schema_name
def normalize_schema_name(name: str) -> str
Normalizes schema name by using snake case naming convention. The maximum length is 64 characters
apply_defaults
def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema
Applies default hint values to `stored_schema` in place.
Updates only complete column hints; incomplete columns are preserved intact.
remove_defaults
def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema
Removes default values from `stored_schema` in place and returns the input for chaining.
- removes column and table names from the value
- removes resource name if same as table name
has_default_column_prop_value
def has_default_column_prop_value(prop: str, value: Any) -> bool
Checks if `value` is a default for `prop`.
remove_column_defaults
def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema
Removes default values from `column_schema` in place and returns the input for chaining.
bump_version_if_modified
def bump_version_if_modified(
stored_schema: TStoredSchema) -> Tuple[int, str, str, Sequence[str]]
Bumps the `stored_schema` version and version hash if the content was modified. Returns a (new version, new hash, old hash, 10 last hashes) tuple.
normalize_simple_regex_column
def normalize_simple_regex_column(naming: NamingConvention,
regex: TSimpleRegex) -> TSimpleRegex
Assumes that regex applies to column name and normalizes it.
compile_simple_regexes
def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern
Compile multiple patterns as one
is_complete_column
def is_complete_column(col: TColumnSchemaBase) -> bool
Returns true if column contains enough data to be created at the destination. Must contain a name and a data type. Other hints have defaults.
is_nullable_column
def is_nullable_column(col: TColumnSchemaBase) -> bool
Returns true if column is nullable
find_incomplete_columns
def find_incomplete_columns(
table: TTableSchema) -> Iterable[Tuple[TColumnSchemaBase, bool]]
Yields (column, nullable) for all incomplete columns in table
compare_complete_columns
def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool
Compares mandatory fields of complete columns
diff_table_references
def diff_table_references(
a: Sequence[TTableReference],
b: Sequence[TTableReference]) -> List[TTableReference]
Returns a list of references, matched by table:
- References from `b` that are not in `a`
- References from `b` that are different from the one in `a`
merge_column
def merge_column(col_a: TColumnSchema,
col_b: TColumnSchema,
merge_defaults: bool = True) -> TColumnSchema
Merges `col_b` into `col_a`. If `merge_defaults` is True, only hints from `col_b` that are not default in `col_a` will be set.
Modifies `col_a` in place and returns it.
merge_columns
def merge_columns(columns_a: TTableSchemaColumns,
columns_b: TTableSchemaColumns,
merge_columns: bool = False,
columns_partial: bool = True) -> TTableSchemaColumns
Merges `columns_a` with `columns_b`. `columns_a` is modified in place.
- new columns are added
- if `merge_columns` is False, updated columns are replaced from `columns_b`
- if `merge_columns` is True, updated columns are merged with `merge_column`
- if `columns_partial` is True, both columns sets are considered incomplete. In that case hints like `primary_key` or `merge_key` are merged
- if `columns_partial` is False, hints like `primary_key` and `merge_key` are dropped from `columns_a` and replaced from `columns_b`
- incomplete columns in `columns_a` that got completed in `columns_b` are removed to preserve order
diff_table
def diff_table(schema_name: str, tab_a: TTableSchema,
tab_b: TPartialTableSchema) -> TPartialTableSchema
Creates a partial table that contains properties found in `tab_b` that are not present or different in `tab_a`.
The name is always present in returned partial.
It returns new columns (not present in tab_a) and merges columns from tab_b into tab_a (overriding non-default hint values).
If any columns are returned they contain full data (not diffs of columns)
Raises SchemaException if tables cannot be merged
- when columns with the same name have different data types
- when table links to different parent tables
ensure_compatible_tables
def ensure_compatible_tables(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
ensure_columns: bool = True) -> None
Ensures that `tab_a` and `tab_b` can be merged without conflicts. Conflicts are detected when
- tables have different names
- nested tables have different parents
- tables have any column with incompatible types
Note: all the identifiers must be already normalized
merge_table
def merge_table(schema_name: str, table: TTableSchema,
partial_table: TPartialTableSchema) -> TPartialTableSchema
Merges "partial_table" into "table". `table` is merged in place. Returns the diff partial table.
`table` and `partial_table` names must be identical. A table diff is generated and applied to `table`.
merge_diff
def merge_diff(table: TTableSchema,
table_diff: TPartialTableSchema) -> TPartialTableSchema
Merges a table diff `table_diff` into `table`. `table` is merged in place. Returns the diff.
- new columns are added, updated columns are replaced from diff
- incomplete columns in `table` that got completed in `partial_table` are removed to preserve order
- table hints are added or replaced from diff
- nothing gets deleted
normalize_table_identifiers
def normalize_table_identifiers(table: TTableSchema,
naming: NamingConvention) -> TTableSchema
Normalizes all table and column names in `table` schema according to current schema naming convention and returns a new instance with modified table schema.
Naming convention like snake_case may produce name collisions with the column names. Colliding column schemas are merged where the column that is defined later in the dictionary overrides earlier column.
Note that resource name is not normalized.
has_table_seen_data
def has_table_seen_data(table: TTableSchema) -> bool
Checks if normalizer has seen data coming to the table.
remove_processing_hints
def remove_processing_hints(tables: TSchemaTables) -> TSchemaTables
Removes processing hints like x-normalizer and x-loader from schema tables. Modifies the input tables and returns it for convenience
get_processing_hints
def get_processing_hints(tables: TSchemaTables) -> Dict[str, List[str]]
Finds processing hints in a set of tables and returns table_name: [hints] mapping
get_first_column_name_with_prop
def get_first_column_name_with_prop(
table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> Optional[str]
Returns name of first column in `table` schema with property `column_prop` or None if no such column exists.
has_column_with_prop
def has_column_with_prop(table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> bool
Checks if `table` schema contains column with property `column_prop`.
get_dedup_sort_tuple
def get_dedup_sort_tuple(
table: TTableSchema,
include_incomplete: bool = False) -> Optional[Tuple[str, TSortOrder]]
Returns tuple with dedup sort information.
First element is the sort column name, second element is the sort order.
Returns None if "dedup_sort" hint was not provided.
get_write_disposition
def get_write_disposition(tables: TSchemaTables,
table_name: str) -> TWriteDisposition
Returns table hint of a table if present. If not, looks up into parent table
fill_hints_from_parent_and_clone_table
def fill_hints_from_parent_and_clone_table(
tables: TSchemaTables, table: TTableSchema) -> TTableSchema
Takes write disposition and table format from parent tables if not present
table_schema_has_type
def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool
Checks if `table` schema contains column with type `_typ`.
table_schema_has_type_with_precision
def table_schema_has_type_with_precision(table: TTableSchema,
_typ: TDataType) -> bool
Checks if `table` schema contains column with type `_typ` and precision set.
get_root_table
def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema
Finds root (without parent) of a `table_name` following the nested references (row_key - parent_key).
get_nested_tables
def get_nested_tables(tables: TSchemaTables,
table_name: str) -> List[TTableSchema]
Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents
Note that this function follows only NESTED TABLE reference typically expressed on _dlt_parent_id (PARENT_KEY) to _dlt_id (ROW_KEY).
group_tables_by_resource
def group_tables_by_resource(
tables: TSchemaTables,
pattern: Optional[REPattern] = None) -> Dict[str, List[TTableSchema]]
Create a dict of resources and their associated tables and descendant tables
If `pattern` is supplied, the result is filtered to only resource names matching the pattern.
dlt_id_column
def dlt_id_column() -> TColumnSchema
Definition of dlt id column
dlt_load_id_column
def dlt_load_id_column() -> TColumnSchema
Definition of dlt load id column