dlt.common.schema.utils
is_valid_schema_name
def is_valid_schema_name(name: str) -> bool
Schema name must be a valid python identifier and have max len of 64
is_nested_table
def is_nested_table(table: TTableSchema) -> bool
Checks if table is a dlt nested table: connected to parent table via row_key - parent_key reference
may_be_nested
def may_be_nested(table: TTableSchema) -> bool
Table may be nested if it does not define any primary/merge keys
normalize_schema_name
def normalize_schema_name(name: str) -> str
Normalizes schema name by using snake case naming convention. The maximum length is 64 characters
apply_defaults
def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema
Applies default hint values to stored_schema in place
Updates only complete column hints, incomplete columns are preserved intact
remove_defaults
def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema
Removes default values from stored_schema in place, returns the input for chaining
- removes column and table names from the value
- removes resource name if same as table name
has_default_column_prop_value
def has_default_column_prop_value(prop: str, value: Any) -> bool
Checks if value is a default for prop.
is_compound_prop
def is_compound_prop(prop: str) -> bool
Checks if a column property is compound.
remove_compound_props
def remove_compound_props(columns: TTableSchemaColumns,
compound_props: set[str]) -> TTableSchemaColumns
Removes compound properties from all columns in place.
Arguments:
- `columns` - Table columns to modify.
- `compound_props` - Set of property names to remove.
Returns:
The modified columns dict (same object that was passed in).
Note: This is a generic property remover, but the name reflects its intended use. It removes properties even if their value is False.
remove_column_defaults
def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema
Removes default values from column_schema in place, returns the input for chaining
bump_version_if_modified
def bump_version_if_modified(
stored_schema: TStoredSchema) -> Tuple[int, str, str, List[str]]
Bumps the stored_schema version and version hash if content modified, returns (new version, new hash, old hash, 10 last hashes) tuple
normalize_simple_regex_column
def normalize_simple_regex_column(naming: NamingConvention,
regex: TSimpleRegex) -> TSimpleRegex
Assumes that regex applies to column name and normalizes it.
compile_simple_regexes
def compile_simple_regexes(r: Iterable[TSimpleRegex]) -> REPattern
Compile multiple patterns as one
is_complete_column
def is_complete_column(col: TColumnSchemaBase) -> bool
Returns true if column contains enough data to be created at the destination. Must contain a name and a data type. Other hints have defaults.
is_nullable_column
def is_nullable_column(col: TColumnSchemaBase) -> bool
Returns true if column is nullable
find_incomplete_columns
def find_incomplete_columns(
table: TTableSchema) -> Iterable[Tuple[TColumnSchemaBase, bool]]
Yields (column, nullable) for all incomplete columns in table
compare_complete_columns
def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool
Compares mandatory fields of complete columns
diff_table_references
def diff_table_references(
a: Sequence[TTableReference],
b: Sequence[TTableReference]) -> List[TTableReference]
Return a list of references containing references matched by table:
- References from `b` that are not in `a`
- References from `b` that are different from the one in `a`
merge_column
def merge_column(col_a: TColumnSchema,
col_b: TColumnSchema,
merge_defaults: bool = True) -> TColumnSchema
Merges properties from col_b into col_a, modifying col_a in place.
All properties from col_b are copied into col_a, potentially overwriting existing values.
Arguments:
- `col_a` - Target column schema that will be modified
- `col_b` - Source column schema with properties to merge in
- `merge_defaults` - If False, removes properties with default values from `col_b` before merging. This prevents unnecessary default values from being explicitly set in `col_a`.
Returns:
The modified col_a (same object that was passed in)
merge_columns
def merge_columns(columns_a: TTableSchemaColumns,
columns_b: TTableSchemaColumns,
merge_compound_props: bool = True) -> TTableSchemaColumns
Merges columns from columns_b into columns_a, modifying columns_a in place.
For each column in columns_b:
- If column doesn't exist in `columns_a`, it's added
- If column exists in `columns_a`, properties from `columns_b` are merged into it
Arguments:
- `columns_a` - Target columns dict that will be modified
- `columns_b` - Source columns dict with columns/properties to merge in
- `merge_compound_props` - If set to True, compound properties from `columns_b` are merged into `columns_a`. If False, compound properties like primary_key and merge_key are replaced entirely rather than merged, so `columns_b`'s non-default values fully override `columns_a`'s.
Returns:
The modified columns_a (same object that was passed in)
NOTE: Incomplete columns in `columns_a` that become complete in `columns_b` are removed and re-added to preserve order.
diff_table
def diff_table(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
additive_compound_props: bool = True) -> TPartialTableSchema
Computes the difference between tab_a and tab_b, returning what's new or changed in tab_b.
The returned partial table contains:
- New columns from `tab_b` that don't exist in `tab_a`
- Modified columns that exist in both but have different properties in `tab_b`
- Changed table properties (write_disposition, resource, etc.)
Arguments:
- `schema_name` - Name of the schema for error messages
- `tab_a` - Original table schema to compare against
- `tab_b` - New/updated table schema with potential changes
- `additive_compound_props` - Controls how the diff handles compound properties:
  - True: Compound properties from `tab_b` are additions to `tab_a`. Only new compound property assignments are included in the diff.
  - False: Compound properties in `tab_b` represent the complete/authoritative set. All compound property assignments from `tab_b` are included in the diff, even if they already exist in `tab_a` with the same values.
Returns:
Partial table schema containing only what's new or changed in tab_b.
Columns in the result are complete column schemas, not property-level diffs.
Raises:
- `SchemaException` - When tables are incompatible (different names, different parents, or columns with incompatible data types).
NOTE: This function does not validate data type compatibility. It computes differences permissively. Validation happens later during normalization via `ensure_compatible_tables` with ensure_columns=True.
ensure_compatible_tables
def ensure_compatible_tables(schema_name: str,
tab_a: TTableSchema,
tab_b: TPartialTableSchema,
ensure_columns: bool = True) -> None
Ensures that tab_a and tab_b can be merged without conflicts. Conflicts are detected when
- tables have different names
- nested tables have different parents
- tables have any column with incompatible types
Note: all the identifiers must be already normalized
merge_table
def merge_table(schema_name: str,
table: TTableSchema,
partial_table: TPartialTableSchema,
merge_compound_props: bool = True) -> TPartialTableSchema
Merges partial_table into table in place. Returns the diff partial table.
table and partial_table names must be identical. A table diff is generated and applied to table.
Arguments:
- `merge_compound_props` - If False, compound properties (see `is_compound_prop()`) in partial_table replace rather than merge with those in table.
merge_diff
def merge_diff(table: TTableSchema,
table_diff: TPartialTableSchema,
merge_compound_props: bool = True) -> TPartialTableSchema
Merges a table diff table_diff into table in place. Returns the diff.
- new columns are added, updated columns are replaced from diff
- incomplete columns in `table` that got completed in `table_diff` are removed to preserve order
- table hints are added or replaced from diff
- nothing gets deleted
Arguments:
- `merge_compound_props` - If False, compound properties (see `is_compound_prop()`) in table_diff replace rather than merge with those in table.
normalize_table_identifiers
def normalize_table_identifiers(table: TTableSchema,
naming: NamingConvention) -> TTableSchema
Normalizes all table and column names in table schema according to current schema naming convention and returns
new instance with modified table schema.
Naming convention like snake_case may produce name collisions with the column names. Colliding column schemas are merged where the column that is defined later in the dictionary overrides earlier column.
Note that resource name is not normalized.
has_table_seen_data
def has_table_seen_data(table: TTableSchema) -> bool
Checks if normalizer has seen data coming to the table.
remove_processing_hints
def remove_processing_hints(tables: TSchemaTables) -> TSchemaTables
Removes processing hints like x-normalizer and x-loader from schema tables and columns. Modifies the input tables and returns it for convenience.
has_seen_null_first_hint
def has_seen_null_first_hint(column_schema: TColumnSchema) -> bool
Checks if column_schema has the seen-null-first hint set to True in the x-normalizer hints.
remove_seen_null_first_hint
def remove_seen_null_first_hint(column_schema: TColumnSchema) -> TColumnSchema
Removes the seen-null-first hint from the x-normalizer hints in column_schema in place.
If the x-normalizer section becomes empty after removing the hint, it is also removed. Returns the modified input.
get_processing_hints
def get_processing_hints(
tables: TSchemaTables
) -> Tuple[Dict[str, List[str]], Dict[str, Dict[str, List[str]]]]
Finds processing hints from schema tables and columns.
Returns:
A tuple containing:
- A dictionary mapping table names to a list of table-level processing hints (e.g., 'x-normalizer', 'x-loader').
- A dictionary mapping table names to another dictionary that maps column names to a list of column-level processing hints (e.g., 'x-normalizer').
get_first_column_name_with_prop
def get_first_column_name_with_prop(
table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> Optional[str]
Returns name of first column in table schema with property column_prop or None if no such column exists.
has_column_with_prop
def has_column_with_prop(table: TTableSchema,
column_prop: Union[TColumnProp, str],
include_incomplete: bool = False) -> bool
Checks if table schema contains column with property column_prop.
get_dedup_sort_tuple
def get_dedup_sort_tuple(
table: TTableSchema,
include_incomplete: bool = False) -> Optional[Tuple[str, TSortOrder]]
Returns tuple with dedup sort information.
First element is the sort column name, second element is the sort order.
Returns None if "dedup_sort" hint was not provided.
get_write_disposition
def get_write_disposition(tables: TSchemaTables,
table_name: str) -> TWriteDisposition
Returns the write disposition hint of a table if present. If not, looks it up in the parent table
fill_hints_from_parent_and_clone_table
def fill_hints_from_parent_and_clone_table(
tables: TSchemaTables, table: TTableSchema) -> TTableSchema
Takes write disposition and table format from parent tables if not present
table_schema_has_type
def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool
Checks if table schema contains column with type _typ
table_schema_has_type_with_precision
def table_schema_has_type_with_precision(table: TTableSchema,
_typ: TDataType) -> bool
Checks if table schema contains column with type _typ and precision set
get_data_and_dlt_tables
def get_data_and_dlt_tables(
tables: TSchemaTables
) -> tuple[list[TTableSchema], list[TTableSchema]]
Separate a list of dlt TTableSchema into two lists: data tables and internal dlt tables.
This should be equivalent to dlt.Schema.data_tables() and dlt.Schema.dlt_tables()
get_dlt_prefix_by_naming_convetion
def get_dlt_prefix_by_naming_convetion(naming: NamingConvention) -> str
The dlt prefix, used for tables and columns, normalized according to the naming convention
is_dlt_table_or_column
def is_dlt_table_or_column(name: str, normalized_dlt_prefix: str) -> bool
Check if a table or column name is a dlt internal name by checking if it starts with dlt prefix.
Arguments:
- `name` - The table or column name to check
- `normalized_dlt_prefix` - The dlt prefix to check against, normalized by the naming convention
remove_dlt_columns_from_table
def remove_dlt_columns_from_table(
table_schema: TTableSchema,
normalized_dlt_prefix: str,
exclude_dlt_columns: bool = True) -> TTableSchema
Remove dlt columns from a single table schema.
Arguments:
- `table_schema` - The table schema to filter
- `normalized_dlt_prefix` - The dlt prefix to filter by, normalized by the naming convention
- `exclude_dlt_columns` - If True, remove columns whose name starts with the given prefix
Returns:
A new table schema with dlt columns optionally filtered out
exclude_dlt_entities
def exclude_dlt_entities(
table_schemas: Iterable[TTableSchema],
normalized_dlt_prefix: str,
exclude_dlt_tables: bool = True,
exclude_dlt_columns: bool = True) -> List[TTableSchema]
Filter out dlt tables and/or dlt columns from a collection of table schemas.
Arguments:
- `table_schemas` - An iterable of table schemas to filter
- `normalized_dlt_prefix` - The normalized name of the prefix used to denote internal dlt columns and tables, according to the used naming convention
- `exclude_dlt_tables` - If True, remove tables whose name starts with the given prefix
- `exclude_dlt_columns` - If True, remove columns whose name starts with the given prefix
Returns:
List of filtered table schemas.
Note: dlt supports changing the default prefix; see the schema._dlt_tables_prefix attribute to get the source of truth for your schema.
get_root_table
def get_root_table(tables: TSchemaTables, table_name: str) -> TTableSchema
Finds root (without parent) of a table_name following the nested references (row_key - parent_key).
get_nested_tables
def get_nested_tables(
tables: TSchemaTables,
table_name: str,
max_nesting: Optional[int] = None,
include_self: Optional[bool] = True) -> List[TTableSchema]
Get nested tables for table name and return a list of tables ordered by ancestry so the nested tables are always after their parents
Note that this function follows only NESTED TABLE reference typically expressed on _dlt_parent_id (PARENT_KEY) to _dlt_id (ROW_KEY).
Arguments:
- `tables` (TSchemaTables) - A mapping of table names to their table schema definitions. This is used to look up the root table and to recursively find its nested child tables by following their "parent" references.
- `table_name` (str) - The name of the root table from which to collect nested tables.
- `max_nesting` (Optional[int]) - If specified, limits the depth of nesting. 0 = only the root table, 1 = root + direct children, etc.
- `include_self` (Optional[bool]) - If False, the root table itself is excluded from the returned list.
Returns:
- `List[TTableSchema]` - A list of nested tables.
group_tables_by_resource
def group_tables_by_resource(
tables: TSchemaTables,
pattern: Optional[REPattern] = None) -> Dict[str, List[TTableSchema]]
Create a dict of resources and their associated tables and descendant tables
If pattern is supplied, the result is filtered to only resource names matching the pattern.
create_root_child_reference
def create_root_child_reference(tables: TSchemaTables,
table_name: str) -> TTableReference
Create a Reference between {table}.{root_key} and {root}.{row_key}
create_parent_child_reference
def create_parent_child_reference(tables: TSchemaTables,
table_name: str) -> TTableReference
Create a Reference between {table}.{parent_key} and {parent}.{row_key}
create_load_table_reference
def create_load_table_reference(
table: TTableSchema,
*,
naming: NamingConvention = None) -> TTableReference
Create a Reference between {table}._dlt_load_id and _dlt_loads.load_id
dlt_id_column
def dlt_id_column() -> TColumnSchema
Definition of dlt id column
dlt_load_id_column
def dlt_load_id_column() -> TColumnSchema
Definition of dlt load id column