Version: devel

dlt.common.schema.utils

is_valid_schema_name

def is_valid_schema_name(name: str) -> bool

A schema name must be a valid Python identifier and be at most 64 characters long.
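As a rough illustration of the rule above, the check can be approximated with the standard library alone. The helper name looks_like_valid_schema_name is hypothetical; this is a sketch of the stated rule, not the dlt implementation, which may apply additional checks.

```python
MAX_SCHEMA_NAME_LENGTH = 64  # limit stated in the description above


def looks_like_valid_schema_name(name: str) -> bool:
    """Hypothetical approximation: a Python identifier of at most 64 characters."""
    return name.isidentifier() and len(name) <= MAX_SCHEMA_NAME_LENGTH


print(looks_like_valid_schema_name("my_schema"))   # True
print(looks_like_valid_schema_name("1st_schema"))  # False
print(looks_like_valid_schema_name("a" * 65))      # False
```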

is_nested_table

def is_nested_table(table: TTableSchema) -> bool

Checks if table is a dlt nested table, i.e. connected to its parent table via a row_key - parent_key reference.

may_be_nested

def may_be_nested(table: TTableSchema) -> bool

Table may be nested if it does not define any primary/merge keys

normalize_schema_name

def normalize_schema_name(name: str) -> str

Normalizes schema name by using snake case naming convention. The maximum length is 64 characters

apply_defaults

def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema

Applies default hint values to stored_schema in place

Only complete columns are updated; incomplete columns are preserved intact.

remove_defaults

def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema

Removes default values from stored_schema in place, returns the input for chaining

  • removes column and table names from the value
  • removes the resource name if it is the same as the table name

has_default_column_prop_value

def has_default_column_prop_value(prop: str, value: Any) -> bool

Checks if value is a default for prop.

is_compound_prop

def is_compound_prop(prop: str) -> bool

Checks if a column property is compound.

remove_compound_props

def remove_compound_props(columns: TTableSchemaColumns,
compound_props: set[str]) -> TTableSchemaColumns

Removes compound properties from all columns in place.

Arguments:

  • columns - Table columns to modify.
  • compound_props - Set of property names to remove.

Returns:

The modified columns dict (same object that was passed in).

  • Note - This is a generic property remover, but the name reflects its intended use. It removes properties even if their value is False.
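The in-place behavior described above can be sketched with plain dictionaries. The name drop_props and the Columns alias are hypothetical stand-ins used only for illustration; the sketch mirrors the documented behavior (same object returned, properties removed even when False) and is not the library code.

```python
from typing import Any, Dict, Set

Columns = Dict[str, Dict[str, Any]]  # stand-in for TTableSchemaColumns


def drop_props(columns: Columns, props: Set[str]) -> Columns:
    """Hypothetical stand-in: delete the given keys from every column, in place."""
    for column in columns.values():
        for prop in props:
            column.pop(prop, None)  # removed regardless of value, even False
    return columns


columns = {
    "id": {"name": "id", "data_type": "bigint", "primary_key": True},
    "email": {"name": "email", "data_type": "text", "primary_key": False},
}
result = drop_props(columns, {"primary_key", "merge_key"})
print(result is columns)  # True
print(columns["email"])   # {'name': 'email', 'data_type': 'text'}
```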

remove_column_defaults

def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema

Removes default values from column_schema in place, returns the input for chaining

bump_version_if_modified

def bump_version_if_modified(
stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]

Bumps the stored_schema version and version hash if the content was modified. Returns a (new version, new hash, old hash, last 10 hashes) tuple.
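The general idea of hash-based version bumping can be sketched as follows. Everything in this sketch is an assumption made for illustration: the helper name bump_if_modified, the field names version, version_hash and previous_hashes, the use of SHA-256 over sorted JSON, and the ordering of the hash history. The actual hashing scheme used by dlt may differ.

```python
import hashlib
import json
from typing import Any, Dict, List, Optional, Tuple

BOOKKEEPING = ("version", "version_hash", "previous_hashes")  # assumed field names


def bump_if_modified(schema: Dict[str, Any]) -> Tuple[int, str, Optional[str], List[str]]:
    """Hypothetical sketch: bump the version only when the content hash changes."""
    # Hash the content only, not the bookkeeping fields themselves.
    content = {k: v for k, v in schema.items() if k not in BOOKKEEPING}
    new_hash = hashlib.sha256(json.dumps(content, sort_keys=True).encode()).hexdigest()
    old_hash = schema.get("version_hash")
    previous = schema.setdefault("previous_hashes", [])
    if old_hash is None:
        schema["version_hash"] = new_hash  # first hash is recorded without a bump
    elif old_hash != new_hash:
        schema["version"] += 1
        previous.insert(0, old_hash)
        del previous[10:]                  # keep only the 10 most recent hashes
        schema["version_hash"] = new_hash
    return schema["version"], new_hash, old_hash, previous


schema = {"version": 1, "tables": {}}
bump_if_modified(schema)                  # first call only records the hash
schema["tables"]["users"] = {}            # modify the content
version, new_hash, old_hash, hashes = bump_if_modified(schema)
print(version, hashes == [old_hash])      # 2 True
```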

normalize_simple_regex_column

def normalize_simple_regex_column(naming: NamingConvention,
regex: TSimpleRegex) -> TSimpleRegex

Assumes that regex applies to column name and normalizes it.

compile_simple_regexes

def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern

Compile multiple patterns as one
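Combining several patterns into one compiled regex is commonly done by joining them with alternation. The sketch below assumes that a simple regex is either a string prefixed with re: (treated as a regular expression) or any other string (matched literally); that convention, and the helper name compile_as_one, are assumptions for illustration rather than a statement of how dlt implements it.

```python
import re
from typing import Iterable, Pattern


def compile_as_one(patterns: Iterable[str]) -> Pattern[str]:
    """Hypothetical sketch: OR several patterns together into one compiled regex."""
    parts = []
    for pattern in patterns:
        if pattern.startswith("re:"):
            parts.append(f"({pattern[3:]})")           # assumed prefix for real regexes
        else:
            parts.append(f"(^{re.escape(pattern)}$)")  # anything else matches literally
    return re.compile("|".join(parts))


matcher = compile_as_one(["re:^updated_at", "id"])
print(bool(matcher.search("updated_at_utc")))  # True
print(bool(matcher.search("id")))              # True
print(bool(matcher.search("user_id")))         # False
```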

is_complete_column

def is_complete_column(col: TColumnSchemaBase) -> bool

Returns true if column contains enough data to be created at the destination. Must contain a name and a data type. Other hints have defaults.

is_nullable_column

def is_nullable_column(col: TColumnSchemaBase) -> bool

Returns true if column is nullable

find_incomplete_columns

def find_incomplete_columns(
table: TTableSchema) -> Iterable[Tuple[TColumnSchemaBase, bool]]

Yields (column, nullable) for all incomplete columns in table
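A minimal sketch of how completeness and nullability checks could combine, using plain dictionaries in place of the dlt types. The helper names are hypothetical, and the assumption that a missing nullable hint counts as nullable is made for this illustration only.

```python
from typing import Any, Dict, Iterator, Tuple

Column = Dict[str, Any]  # stand-in for TColumnSchemaBase


def is_complete(col: Column) -> bool:
    """A column is complete when it has both a name and a data type."""
    return bool(col.get("name")) and bool(col.get("data_type"))


def incomplete_columns(table: Dict[str, Any]) -> Iterator[Tuple[Column, bool]]:
    """Hypothetical sketch: yield (column, nullable) for every incomplete column."""
    for col in table["columns"].values():
        if not is_complete(col):
            yield col, col.get("nullable", True)  # assumes nullable defaults to True


table = {
    "name": "users",
    "columns": {
        "id": {"name": "id", "data_type": "bigint", "nullable": False},
        "notes": {"name": "notes"},  # no data type yet, so incomplete
    },
}
print(list(incomplete_columns(table)))  # [({'name': 'notes'}, True)]
```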

compare_complete_columns

def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool

Compares mandatory fields of complete columns

diff_table_references

def diff_table_references(
a: Sequence[TTableReference],
b: Sequence[TTableReference]) -> List[TTableReference]

Return a list of references containing references matched by table:

  • References from b that are not in a
  • References from b that are different from the one in a
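The two cases listed above can be sketched with dictionaries keyed by the referenced table. The key names referenced_table, columns and referenced_columns and the helper name diff_references are assumptions for illustration; the sketch shows the matching idea only.

```python
from typing import Any, Dict, List, Sequence

Reference = Dict[str, Any]  # stand-in for TTableReference


def diff_references(a: Sequence[Reference], b: Sequence[Reference]) -> List[Reference]:
    """Hypothetical sketch: references in b that are new or changed relative to a."""
    a_by_table = {ref["referenced_table"]: ref for ref in a}
    # A reference is kept when a has no entry for its table or the entry differs.
    return [ref for ref in b if a_by_table.get(ref["referenced_table"]) != ref]


a = [{"referenced_table": "users", "columns": ["user_id"], "referenced_columns": ["id"]}]
b = [
    {"referenced_table": "users", "columns": ["user_id"], "referenced_columns": ["id"]},    # unchanged
    {"referenced_table": "orders", "columns": ["order_id"], "referenced_columns": ["id"]},  # new
]
print([ref["referenced_table"] for ref in diff_references(a, b)])  # ['orders']
```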

merge_column

def merge_column(col_a: TColumnSchema,
col_b: TColumnSchema,
merge_defaults: bool = True) -> TColumnSchema

Merges properties from col_b into col_a, modifying col_a in place.

All properties from col_b are copied into col_a, potentially overwriting existing values.

Arguments:

  • col_a - Target column schema that will be modified
  • col_b - Source column schema with properties to merge in
  • merge_defaults - If False, removes properties with default values from col_b before merging. This prevents unnecessary default values from being explicitly set in col_a.

Returns:

The modified col_a (same object that was passed in)
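The effect of merge_defaults can be illustrated with a small dictionary-based sketch. The helper merge_col and the ASSUMED_DEFAULTS table are hypothetical; the real default values are determined by the library (see has_default_column_prop_value) and may differ from the ones assumed here.

```python
from typing import Any, Dict

Column = Dict[str, Any]  # stand-in for TColumnSchema

# Assumed defaults, for illustration only.
ASSUMED_DEFAULTS = {"nullable": True, "primary_key": False, "unique": False}


def merge_col(col_a: Column, col_b: Column, merge_defaults: bool = True) -> Column:
    """Hypothetical sketch: copy col_b into col_a in place, optionally skipping defaults."""
    if not merge_defaults:
        col_b = {
            k: v for k, v in col_b.items()
            if k not in ASSUMED_DEFAULTS or ASSUMED_DEFAULTS[k] != v
        }
    col_a.update(col_b)
    return col_a


target = {"name": "id", "data_type": "bigint", "primary_key": True}
source = {"name": "id", "data_type": "text", "primary_key": False, "nullable": True}
merged = merge_col(dict(target), source, merge_defaults=False)
print(merged)  # {'name': 'id', 'data_type': 'text', 'primary_key': True}
```

With merge_defaults=True the same call would also copy primary_key=False and nullable=True, overwriting the existing primary_key value in the target.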

merge_columns

def merge_columns(columns_a: TTableSchemaColumns,
columns_b: TTableSchemaColumns,
merge_compound_props: bool = True) -> TTableSchemaColumns

Merges columns from columns_b into columns_a, modifying columns_a in place.

For each column in columns_b:

  • If column doesn't exist in columns_a, it's added
  • If column exists in columns_a, properties from columns_b are merged into it

Arguments:

  • columns_a - Target columns dict that will be modified
  • columns_b - Source columns dict with columns/properties to merge in
  • merge_compound_props - If True, compound properties from columns_b are merged into columns_a. If False, compound properties like primary_key and merge_key are replaced entirely rather than merged, so columns_b's non-default values fully override columns_a's.

Returns:

The modified columns_a (same object that was passed in)

  • NOTE - Incomplete columns in columns_a that become complete in columns_b are removed and re-added to preserve order.
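The ordering note above relies on the fact that removing a key from a dict and inserting it again moves it to the end. The sketch below shows only that mechanism with plain dictionaries; the helper names are hypothetical and compound property handling is deliberately left out.

```python
from typing import Any, Dict

Columns = Dict[str, Dict[str, Any]]  # stand-in for TTableSchemaColumns


def is_complete(col: Dict[str, Any]) -> bool:
    return bool(col.get("name")) and bool(col.get("data_type"))


def merge_cols(columns_a: Columns, columns_b: Columns) -> Columns:
    """Hypothetical sketch: merge columns_b into columns_a in place."""
    for name, col_b in columns_b.items():
        col_a = columns_a.get(name)
        if col_a is not None and not is_complete(col_a) and is_complete(col_b):
            columns_a.pop(name)  # re-inserted below, so the column moves to the end
        columns_a[name] = {**(col_a or {}), **col_b}
    return columns_a


columns_a = {
    "notes": {"name": "notes"},                  # incomplete: no data type yet
    "id": {"name": "id", "data_type": "bigint"},
}
columns_b = {"notes": {"name": "notes", "data_type": "text"}}
merge_cols(columns_a, columns_b)
print(list(columns_a))  # ['id', 'notes']
```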

diff_table

def diff_table(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
additive_compound_props: bool = True) -> TPartialTableSchema

Computes the difference between tab_a and tab_b, returning what's new or changed in tab_b.

The returned partial table contains:

  • New columns from tab_b that don't exist in tab_a
  • Modified columns that exist in both but have different properties in tab_b
  • Changed table properties (write_disposition, resource, etc.)

Arguments:

  • schema_name - Name of the schema for error messages
  • tab_a - Original table schema to compare against
  • tab_b - New/updated table schema with potential changes
  • additive_compound_props - Controls how the diff handles compound properties:
    • True: Compound properties from tab_b are additions to tab_a. Only new compound property assignments are included in the diff.
    • False: Compound properties in tab_b represent the complete/authoritative set. All compound property assignments from tab_b are included in the diff, even if they already exist in tab_a with the same values.

Returns:

Partial table schema containing only what's new or changed in tab_b. Columns in the result are complete column schemas, not property-level diffs.

Raises:

  • SchemaException - When tables are incompatible (different names or different parents).

  • NOTE - This function does not validate data type compatibility. It computes differences permissively. Validation happens later during normalization via ensure_compatible_tables with ensure_columns=True.

ensure_compatible_tables

def ensure_compatible_tables(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
ensure_columns: bool = True) -> None

Ensures that tab_a and tab_b can be merged without conflicts. Conflicts are detected when

  • tables have different names
  • nested tables have different parents
  • tables have any column with incompatible types

Note: all the identifiers must be already normalized

merge_table

def merge_table(schema_name: str,
table: TTableSchema,
partial_table: TPartialTableSchema,
merge_compound_props: bool = True) -> TPartialTableSchema

Merges partial_table into table in place. Returns the diff partial table.

table and partial_table names must be identical. A table diff is generated and applied to table.

Arguments:

  • merge_compound_props - If False, compound properties (see is_compound_prop()) in partial_table replace rather than merge with those in table.

merge_diff

def merge_diff(table: TTableSchema,
table_diff: TPartialTableSchema,
merge_compound_props: bool = True) -> TPartialTableSchema

Merges a table diff table_diff into table in place. Returns the diff.

  • new columns are added, updated columns are replaced from diff
  • incomplete columns in table that got completed in table_diff are removed to preserve order
  • table hints are added or replaced from diff
  • nothing gets deleted

Arguments:

  • merge_compound_props - If False, compound properties (see is_compound_prop()) in table_diff replace rather than merge with those in table.

normalize_table_identifiers

def normalize_table_identifiers(table: TTableSchema,
naming: NamingConvention) -> TTableSchema

Normalizes all table and column names in table schema according to current schema naming convention and returns new instance with modified table schema.

A naming convention such as snake_case may produce collisions between column names. Colliding column schemas are merged, and a column defined later in the dictionary overrides an earlier one.

Note that resource name is not normalized.
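The collision behavior can be sketched with a deliberately simplified snake case function. Both to_snake and normalize_identifiers are hypothetical helpers written for this illustration; the real naming conventions in dlt handle many more cases.

```python
import re
from typing import Any, Dict


def to_snake(name: str) -> str:
    """Simplified, hypothetical snake case conversion."""
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # split camelCase humps
    return re.sub(r"[^A-Za-z0-9_]+", "_", name).lower()


def normalize_identifiers(table: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical sketch: return a new table with normalized table and column names."""
    new_columns: Dict[str, Dict[str, Any]] = {}
    for col in table["columns"].values():
        norm = to_snake(col["name"])
        # On collision, the later column's properties override the earlier ones.
        new_columns[norm] = {**new_columns.get(norm, {}), **col, "name": norm}
    # The resource name is copied unchanged, as noted above.
    return {**table, "name": to_snake(table["name"]), "columns": new_columns}


table = {
    "name": "UserEvents",
    "resource": "UserEvents",
    "columns": {
        "userId": {"name": "userId", "data_type": "text"},
        "user_id": {"name": "user_id", "data_type": "bigint", "nullable": False},
    },
}
normalized = normalize_identifiers(table)
print(normalized["name"], normalized["resource"])  # user_events UserEvents
print(normalized["columns"])
# {'user_id': {'name': 'user_id', 'data_type': 'bigint', 'nullable': False}}
```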

has_table_seen_data

def has_table_seen_data(table: TTableSchema) -> bool

Checks if normalizer has seen data coming to the table.

remove_processing_hints

def remove_processing_hints(tables: TSchemaTables) -> TSchemaTables

Removes processing hints like x-normalizer and x-loader from schema tables and columns. Modifies the input tables and returns it for convenience.

has_seen_null_first_hint

def has_seen_null_first_hint(column_schema: TColumnSchema) -> bool

Checks if column_schema has the seen-null-first hint set to True in the x-normalizer hints.

remove_seen_null_first_hint

def remove_seen_null_first_hint(column_schema: TColumnSchema) -> TColumnSchema

Removes the seen-null-first hint from the x-normalizer hints in column_schema in place. If the x-normalizer section becomes empty after removing the hint, it is also removed. Returns the modified input.

get_processing_hints

def get_processing_hints(
tables: TSchemaTables
) -> Tuple[Dict[str, List[str]], Dict[str, Dict[str, List[str]]]]

Finds processing hints from schema tables and columns.

Returns:

A tuple containing:

  • A dictionary mapping table names to a list of table-level processing hints (e.g., 'x-normalizer', 'x-loader').
  • A dictionary mapping table names to another dictionary that maps column names to a list of column-level processing hints (e.g., 'x-normalizer').
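The shape of the returned tuple is easier to see on a small example. The sketch below uses a hypothetical helper, collect_processing_hints, over plain dictionaries and assumes that tables or columns without any processing hints are simply left out of the result; that omission is an assumption of this sketch.

```python
from typing import Any, Dict, List, Tuple

PROCESSING_HINTS = ("x-normalizer", "x-loader")  # hint names mentioned above

TableHints = Dict[str, List[str]]
ColumnHints = Dict[str, Dict[str, List[str]]]


def collect_processing_hints(tables: Dict[str, Dict[str, Any]]) -> Tuple[TableHints, ColumnHints]:
    """Hypothetical sketch: list the processing hint keys found on tables and columns."""
    table_hints: TableHints = {}
    column_hints: ColumnHints = {}
    for table_name, table in tables.items():
        found = [hint for hint in PROCESSING_HINTS if hint in table]
        if found:
            table_hints[table_name] = found
        for col_name, col in table.get("columns", {}).items():
            col_found = [hint for hint in PROCESSING_HINTS if hint in col]
            if col_found:
                column_hints.setdefault(table_name, {})[col_name] = col_found
    return table_hints, column_hints


tables = {
    "users": {
        "name": "users",
        "x-normalizer": {"seen-data": True},
        "columns": {
            "id": {"name": "id", "x-normalizer": {"seen-null-first": True}},
            "email": {"name": "email"},
        },
    },
    "orders": {"name": "orders", "columns": {}},
}
print(collect_processing_hints(tables))
# ({'users': ['x-normalizer']}, {'users': {'id': ['x-normalizer']}})
```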

get_first_column_name_with_prop

def get_first_column_name_with_prop(
table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> Optional[str]

Returns name of first column in table schema with property column_prop or None if no such column exists.

has_column_with_prop

def has_column_with_prop(table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> bool

Checks if table schema contains column with property column_prop.

get_dedup_sort_tuple

def get_dedup_sort_tuple(
table: TTableSchema,
include_incomplete: bool = False) -> Optional[Tuple[str, TSortOrder]]

Returns tuple with dedup sort information.

First element is the sort column name, second element is the sort order.

Returns None if "dedup_sort" hint was not provided.

get_write_disposition

def get_write_disposition(tables: TSchemaTables,
table_name: str) -> TWriteDisposition

Returns the write disposition of a table if present. If not, looks it up in the parent table.
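The parent lookup can be sketched as a walk up the parent chain. The helper name resolve_write_disposition, the parent and write_disposition keys, and the choice to raise ValueError when nothing is found are assumptions of this sketch, not confirmed details of the library.

```python
from typing import Any, Dict

Tables = Dict[str, Dict[str, Any]]  # stand-in for TSchemaTables


def resolve_write_disposition(tables: Tables, table_name: str) -> str:
    """Hypothetical sketch: take the hint from the table or its nearest ancestor."""
    table = tables[table_name]
    while "write_disposition" not in table:
        parent = table.get("parent")
        if parent is None:
            raise ValueError(f"no write disposition found for {table_name}")
        table = tables[parent]
    return table["write_disposition"]


tables = {
    "users": {"name": "users", "write_disposition": "merge"},
    "users__orders": {"name": "users__orders", "parent": "users"},
}
print(resolve_write_disposition(tables, "users__orders"))  # merge
```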

fill_hints_from_parent_and_clone_table

def fill_hints_from_parent_and_clone_table(
tables: TSchemaTables, table: TTableSchema) -> TTableSchema

Takes write disposition and table format from parent tables if not present

table_schema_has_type

def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool

Checks if table schema contains column with type _typ

table_schema_has_type_with_precision

def table_schema_has_type_with_precision(table: TTableSchema,
_typ: TDataType) -> bool

Checks if table schema contains column with type _typ and precision set

get_data_and_dlt_tables

def get_data_and_dlt_tables(
tables: TSchemaTables
) -> tuple[list[TTableSchema], list[TTableSchema]]

Separates a list of dlt TTableSchema into two lists: data tables and internal dlt tables.

This should be equivalent to dlt.Schema.data_tables() and dlt.Schema.dlt_tables()

get_dlt_prefix_by_naming_convetion

def get_dlt_prefix_by_naming_convetion(naming: NamingConvention) -> str

Returns the dlt prefix, used for tables and columns, normalized according to the naming convention.

is_dlt_table_or_column

def is_dlt_table_or_column(name: str, normalized_dlt_prefix: str) -> bool

Check if a table or column name is a dlt internal name by checking if it starts with dlt prefix.

Arguments:

  • name - The table or column name to check
  • normalized_dlt_prefix - The dlt prefix to check against, normalized by the naming convention

remove_dlt_columns_from_table

def remove_dlt_columns_from_table(
table_schema: TTableSchema,
normalized_dlt_prefix: str,
exclude_dlt_columns: bool = True) -> TTableSchema

Remove dlt columns from a single table schema.

Arguments:

  • table_schema - The table schema to filter
  • normalized_dlt_prefix - The dlt prefix to filter by, normalized by the naming convention
  • exclude_dlt_columns - If True, remove columns whose name starts with the given prefix

Returns:

A new table schema with dlt columns optionally filtered out

exclude_dlt_entities

def exclude_dlt_entities(
table_schemas: Iterable[TTableSchema],
normalized_dlt_prefix: str,
exclude_dlt_tables: bool = True,
exclude_dlt_columns: bool = True) -> List[TTableSchema]

Filter out dlt tables and/or dlt columns from a collection of table schemas.

Arguments:

  • table_schemas - An iterable of table schemas to filter
  • normalized_dlt_prefix - The normalized name of the prefix used to denote internal dlt columns and tables, according to the used naming convention
  • exclude_dlt_tables - If True, remove tables whose name starts with the given prefix
  • exclude_dlt_columns - If True, remove columns whose name starts with the given prefix

Returns:

List of filtered table schemas.

  • Note - dlt supports changing the default prefix, see schema._dlt_tables_prefix attribute to get the source of truth for your schema
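Prefix-based filtering of tables and columns can be sketched as below. The helper drop_dlt_entities is hypothetical, and the prefix value _dlt used in the example is an assumption for illustration; as the note above says, the prefix of a given schema should be read from the schema itself.

```python
from typing import Any, Dict, Iterable, List

Table = Dict[str, Any]  # stand-in for TTableSchema


def drop_dlt_entities(
    table_schemas: Iterable[Table],
    prefix: str,
    exclude_tables: bool = True,
    exclude_columns: bool = True,
) -> List[Table]:
    """Hypothetical sketch: filter out tables and/or columns whose names start with prefix."""
    result = []
    for table in table_schemas:
        if exclude_tables and table["name"].startswith(prefix):
            continue
        if exclude_columns:
            kept = {n: c for n, c in table["columns"].items() if not n.startswith(prefix)}
            table = {**table, "columns": kept}  # copy, so the input is left untouched
        result.append(table)
    return result


tables = [
    {"name": "_dlt_loads", "columns": {"load_id": {"name": "load_id"}}},
    {"name": "users", "columns": {"id": {"name": "id"}, "_dlt_id": {"name": "_dlt_id"}}},
]
filtered = drop_dlt_entities(tables, "_dlt")
print([(t["name"], list(t["columns"])) for t in filtered])  # [('users', ['id'])]
```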

get_root_table

def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema

Finds root (without parent) of a table_name following the nested references (row_key - parent_key).

get_nested_tables

def get_nested_tables(
tables: TSchemaTables,
table_name: str,
max_nesting: Optional[int] = None,
include_self: Optional[bool] = True) -> List[TTableSchema]

Gets the nested tables for a table name and returns a list of tables ordered by ancestry, so nested tables always come after their parents.

Note that this function follows only NESTED TABLE reference typically expressed on _dlt_parent_id (PARENT_KEY) to _dlt_id (ROW_KEY).

Arguments:

  • tables TSchemaTables - A mapping of table names to their table schema definitions. This is used to look up the root table and to recursively find its nested child tables by following their "parent" references.
  • table_name str - The name of the root table from which to collect nested tables.
  • max_nesting Optional[int] - If specified, limits the depth of nesting. 0 = only the root table, 1 = root + direct children, etc.
  • include_self Optional[bool] - If False, the root table itself is excluded from the returned list.

Returns:

  • List[TTableSchema] - A list of nested tables.
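The ancestry ordering and the max_nesting and include_self arguments can be sketched with a depth-first walk over the parent links. The helper name nested_tables and the use of a plain parent key are assumptions of this sketch; it illustrates the documented argument semantics rather than the library code.

```python
from typing import Any, Dict, List, Optional

Table = Dict[str, Any]  # stand-in for TTableSchema


def nested_tables(
    tables: Dict[str, Table],
    table_name: str,
    max_nesting: Optional[int] = None,
    include_self: bool = True,
) -> List[Table]:
    """Hypothetical sketch: collect descendants so that parents precede their children."""
    result: List[Table] = [tables[table_name]] if include_self else []

    def visit(parent_name: str, depth: int) -> None:
        if max_nesting is not None and depth > max_nesting:
            return
        for table in tables.values():
            if table.get("parent") == parent_name:
                result.append(table)
                visit(table["name"], depth + 1)

    visit(table_name, 1)  # direct children are at depth 1
    return result


tables = {
    "users": {"name": "users"},
    "users__orders": {"name": "users__orders", "parent": "users"},
    "users__orders__items": {"name": "users__orders__items", "parent": "users__orders"},
    "events": {"name": "events"},
}
print([t["name"] for t in nested_tables(tables, "users")])
# ['users', 'users__orders', 'users__orders__items']
print([t["name"] for t in nested_tables(tables, "users", max_nesting=1)])
# ['users', 'users__orders']
```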

group_tables_by_resource

def group_tables_by_resource(
tables: TSchemaTables,
pattern: Optional[REPattern] = None) -> Dict[str, List[TTableSchema]]

Creates a dict of resources and their associated tables and descendant tables. If pattern is supplied, the result is filtered to only resource names matching the pattern.

create_root_child_reference

def create_root_child_reference(tables: TSchemaTables,
table_name: str) -> TTableReference

Create a Reference between {table}.{root_key} and {root}.{row_key}

create_parent_child_reference

def create_parent_child_reference(tables: TSchemaTables,
table_name: str) -> TTableReference

Create a Reference between {table}.{parent_key} and {parent}.{row_key}

create_load_table_reference

def create_load_table_reference(
table: TTableSchema,
*,
naming: NamingConvention = None) -> TTableReference

Create a Reference between {table}._dlt_load_id and _dlt_loads.load_id

dlt_id_column

def dlt_id_column() -> TColumnSchema

Definition of dlt id column

dlt_load_id_column

def dlt_load_id_column() -> TColumnSchema

Definition of dlt load id column
