Version: devel

dlt.common.schema.schema

Schema Objects

class Schema()

naming

Naming convention used by the schema to normalize identifiers

data_item_normalizer

Data item normalizer used by the schema to create tables

version_table_name

Normalized name of the version table

loads_table_name

Normalized name of the loads table

state_table_name

Normalized name of the dlt state table

replace_schema_content

def replace_schema_content(schema: "Schema",
link_to_replaced_schema: bool = False) -> None

Replaces the content of the current schema with the content of schema. Does not compute a new schema hash and does not increase the numeric version. Optionally links the replaced schema to the incoming schema by keeping its hash in the previous hashes and setting the stored hash to the replaced schema's hash.

apply_schema_contract

def apply_schema_contract(
schema_contract: TSchemaContractDict,
partial_table: TPartialTableSchema,
data_item: TDataItem = None,
raise_on_freeze: bool = True
) -> Tuple[TPartialTableSchema, List[Tuple[TSchemaContractEntities, str,
TSchemaEvolutionMode]]]

Checks whether schema_contract allows partial_table to update the schema. Applies the contract by dropping the affected columns or the whole partial_table. Generates and returns a set of filters to apply to incoming data so that it conforms to the contract. data_item is provided only as evidence in case DataValidationError is raised.

Example schema_contract: { "tables": "freeze", "columns": "evolve", "data_type": "discard_row" }

The tables setting affects new tables, the columns setting affects new columns, and the data_type setting affects new variant columns. Each setting can be set to one of:

  • evolve: allow all changes
  • freeze: allow no change and fail the load
  • discard_row: allow no schema change and filter out the row
  • discard_value: allow no schema change and filter out the value but load the rest of the row

Returns a tuple where the first element is the modified partial table and the second is a list of filters. The modified partial table may be None if the whole table is not allowed. Each filter is a tuple of (tables | columns, entity name, freeze | discard_row | discard_value). Note: by default, freeze immediately raises DataValidationError, which is convenient in most use cases.
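
The four modes can be illustrated with a minimal sketch that does not depend on dlt. It models only the columns entity for a single row. `filter_row`, `known_columns` and `ContractViolation` are hypothetical names used for illustration, and this is not how dlt implements contracts.

```python
from typing import Any, Dict, Optional, Set


class ContractViolation(Exception):
    """Stand-in for DataValidationError in this sketch (hypothetical)."""


def filter_row(
    row: Dict[str, Any], known_columns: Set[str], mode: str
) -> Optional[Dict[str, Any]]:
    """Apply a column contract mode to one row.

    Returns the row to load, or None if the whole row must be discarded.
    """
    new_columns = [name for name in row if name not in known_columns]
    if not new_columns or mode == "evolve":
        # nothing new in the row, or every change is allowed
        return row
    if mode == "freeze":
        # no change allowed: fail instead of loading
        raise ContractViolation(f"new columns not allowed: {new_columns}")
    if mode == "discard_row":
        # keep the schema unchanged and drop the whole row
        return None
    if mode == "discard_value":
        # keep the schema unchanged and drop only the offending values
        return {k: v for k, v in row.items() if k in known_columns}
    raise ValueError(f"unknown mode: {mode}")
```

With `known_columns={"id"}` and a row that also carries an `extra` key, `evolve` returns the row unchanged, `discard_value` returns only the `id` value, `discard_row` returns None, and `freeze` raises.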

expand_schema_contract_settings

@staticmethod
def expand_schema_contract_settings(
settings: TSchemaContract,
default: TSchemaContractDict = None) -> TSchemaContractDict

Expands partial or shorthand settings into a full settings dictionary, using default for unset entities.
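
A minimal sketch of what such an expansion could look like, written without dlt. `expand_settings` is a hypothetical helper, and the fallback of "evolve" for unset entities when no default is passed is an assumption of this sketch, not something stated above.

```python
from typing import Dict, Optional, Union

ENTITIES = ("tables", "columns", "data_type")
# Assumption of this sketch: entities left unset fall back to "evolve"
FALLBACK: Dict[str, str] = {entity: "evolve" for entity in ENTITIES}


def expand_settings(
    settings: Union[str, Dict[str, str]],
    default: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Expand a shorthand string or a partial dict into a full dict."""
    if isinstance(settings, str):
        # shorthand: a single mode applies to every entity
        return {entity: settings for entity in ENTITIES}
    # partial dict: start from the defaults and override what is set
    return {**(default or FALLBACK), **settings}
```

For example, `expand_settings("freeze")` sets all three entities to "freeze", while `expand_settings({"columns": "discard_row"})` overrides only the columns entry.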

resolve_contract_settings_for_table

def resolve_contract_settings_for_table(
table_name: str,
new_table_schema: TTableSchema = None) -> TSchemaContractDict

Resolve the exact applicable schema contract settings for the table table_name. new_table_schema is added to the tree during the resolution.

update_table

def update_table(partial_table: TPartialTableSchema,
normalize_identifiers: bool = True,
from_diff: bool = False) -> TPartialTableSchema

Adds or merges partial_table into the schema. Identifiers are normalized by default. If from_diff is True, partial_table is assumed to be a diff (it contains only the differences) against the table in the schema. In that case no diff is computed and partial_table is applied directly.
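
As a rough illustration of adding versus merging, the sketch below merges a partial table into a plain dictionary of tables. It is not dlt's implementation: `merge_partial_table` is a hypothetical helper, identifier normalization and diff computation are left out, and the table layout (a `name` plus a `columns` mapping) is assumed.

```python
from typing import Any, Dict

Table = Dict[str, Any]


def merge_partial_table(tables: Dict[str, Table], partial: Table) -> Table:
    """Add `partial` as a new table or merge its columns into an existing one."""
    name = partial["name"]
    # create an empty table on first sight, otherwise reuse the existing one
    table = tables.setdefault(name, {"name": name, "columns": {}})
    for column_name, column in partial.get("columns", {}).items():
        # new columns are added, hints of existing columns are updated
        table["columns"].setdefault(column_name, {}).update(column)
    return table
```

Calling the helper twice for the same table name first creates the table and then merges the second partial table's columns into it.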

update_schema

def update_schema(schema: "Schema") -> None

Updates this schema from an incoming schema. Normalizes identifiers after updating normalizers.

drop_tables

def drop_tables(table_names: Sequence[str]) -> List[TTableSchema]

Drops tables from the schema and returns the dropped tables. The list of table names must include all nested tables of the tables being dropped.
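
One way to build such a complete list is to walk the parent links before calling the method. The sketch below does not use dlt. `with_nested_tables` is a hypothetical helper, and the input format (a mapping of table name to parent table name) is an assumption made for illustration.

```python
from typing import Dict, List, Optional


def with_nested_tables(parents: Dict[str, Optional[str]], root: str) -> List[str]:
    """Return `root` followed by all tables nested under it.

    `parents` maps each table name to its parent name (None for root tables).
    """
    result = [root]
    # breadth-first walk: `result` grows while the loop iterates over it
    for name in result:
        result.extend(child for child, parent in parents.items() if parent == name)
    return result
```

For a hierarchy `users` > `users__orders` > `users__orders__items`, the helper returns all three names when asked for `users`, and unrelated root tables are left out.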

merge_hints

def merge_hints(new_hints: Mapping[TColumnDefaultHint, Sequence[TSimpleRegex]],
replace: bool = False,
normalize_identifiers: bool = True) -> None

Merges or replaces existing default hints with new_hints. Normalizes names in column regexes if possible. Compiles settings at the end.

NOTE: you can manipulate default hints collection directly via Schema.settings as long as you call Schema._compile_settings() at the end.

update_preferred_types

def update_preferred_types(new_preferred_types: Mapping[TSimpleRegex,
TDataType],
normalize_identifiers: bool = True) -> None

Updates the preferred types dictionary with new_preferred_types. Normalizes names in column regexes if possible. Compiles settings at the end.

NOTE: you can manipulate the preferred types collection directly via Schema.settings as long as you call Schema._compile_settings() at the end.

add_type_detection

def add_type_detection(detection: TTypeDetections) -> None

Add type auto detection to the schema.

remove_type_detection

def remove_type_detection(detection: TTypeDetections) -> None

Removes type auto detection from the schema.

get_new_table_columns

def get_new_table_columns(
table_name: str,
existing_columns: TTableSchemaColumns,
case_sensitive: bool,
include_incomplete: bool = False) -> List[TColumnSchema]

Gets the new columns that must be added to existing_columns to bring them up to date with the table_name schema. Column names are compared case-sensitively when case_sensitive is True. Names in existing_columns are expected to be normalized; typically they come from the destination schema. Columns that are in existing_columns but not in the table_name columns are ignored.

Optionally includes incomplete columns (without data type)
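
The comparison described above can be sketched without dlt as follows. `new_columns` is a hypothetical helper, columns are modeled as plain dictionaries, and treating a column without a `data_type` key as incomplete follows the description above. This is an illustration, not the library's code.

```python
from typing import Any, Dict, List

Columns = Dict[str, Dict[str, Any]]


def new_columns(
    schema_columns: Columns,
    existing_columns: Columns,
    case_sensitive: bool,
    include_incomplete: bool = False,
) -> List[Dict[str, Any]]:
    """Return schema columns that are missing from `existing_columns`."""

    def key(name: str) -> str:
        # fold case only when the comparison is case-insensitive
        return name if case_sensitive else name.casefold()

    existing = {key(name) for name in existing_columns}
    result = []
    for name, column in schema_columns.items():
        if key(name) in existing:
            continue  # already present, nothing to add
        if not include_incomplete and "data_type" not in column:
            continue  # incomplete column: skipped unless requested
        result.append(column)
    return result
```

Columns that exist only in `existing_columns` never appear in the result, because the loop iterates over the schema columns alone.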

get_table_columns

def get_table_columns(table_name: str,
include_incomplete: bool = False) -> TTableSchemaColumns

Gets columns of table_name. Optionally includes incomplete columns

data_tables

def data_tables(seen_data_only: bool = False,
include_incomplete: bool = False) -> List[TTableSchema]

Gets a list of all tables that hold loaded data. Excludes dlt tables. Excludes incomplete tables (i.e. tables without columns) unless include_incomplete is True.

data_table_names

def data_table_names(seen_data_only: bool = False,
include_incomplete: bool = False) -> List[str]

Returns list of table names. Excludes dlt table names.

dlt_tables

def dlt_tables() -> List[TTableSchema]

Gets dlt tables

dlt_table_names

def dlt_table_names() -> List[str]

Returns list of dlt table names.

is_new_table

def is_new_table(table_name: str) -> bool

Returns True if this table does not exist or is incomplete (has only incomplete columns) and is therefore new.

version

@property
def version() -> int

Version of the schema content that takes into account changes from the time of schema loading/creation. The stored version is increased by one if content was modified

Returns:

  • int - Current schema version

stored_version

@property
def stored_version() -> int

Version of the schema content from the time of schema loading/creation.

Returns:

  • int - Stored schema version

version_hash

@property
def version_hash() -> str

Current version hash of the schema, recomputed from the actual content

previous_hashes

@property
def previous_hashes() -> Sequence[str]

Version hashes of the previous versions of the schema, as stored with the schema

stored_version_hash

@property
def stored_version_hash() -> str

Version hash of the schema content from the time of schema loading/creation.

is_modified

@property
def is_modified() -> bool

Checks if schema was modified from the time it was saved or if this is a new schema

A current version hash is computed and compared with stored version hash
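
The relationship between the stored and current values can be sketched in a few lines that do not depend on dlt. `content_hash` and `current_version` are hypothetical helpers, and hashing the sorted JSON with SHA-256 is an assumption of this sketch; the hash function and serialization that dlt uses are not described here.

```python
import hashlib
import json
from typing import Any, Dict


def content_hash(content: Dict[str, Any]) -> str:
    """Hash the schema content deterministically (assumed: sorted JSON, SHA-256)."""
    serialized = json.dumps(content, sort_keys=True).encode("utf-8")
    return hashlib.sha256(serialized).hexdigest()


def current_version(content: Dict[str, Any], stored_version: int, stored_hash: str) -> int:
    """Return the stored version, increased by one if the content was modified."""
    is_modified = content_hash(content) != stored_hash
    return stored_version + 1 if is_modified else stored_version
```

As long as the content hashes to the stored hash, the version stays at the stored version; once the content changes, the hashes differ and the version is one higher.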

is_new

@property
def is_new() -> bool

Checks if schema was ever saved

tables

@property
def tables() -> TSchemaTables

Dictionary of schema tables

references

@property
def references() -> list[TTableReferenceStandalone]

References between tables

_repr_html_

def _repr_html_(**kwargs: Any) -> str

Renders the Schema as a Graphviz graph and displays it using HTML.

This method is called automatically by notebook renderers (IPython, marimo, etc.). Reference: https://ipython.readthedocs.io/en/stable/config/integrating.html

dlt.helpers.graphviz.render_with_html() has no external Python or system dependencies.

to_dot

def to_dot(remove_processing_hints: bool = False,
include_dlt_tables: bool = True,
include_internal_dlt_ref: bool = True,
include_parent_child_ref: bool = True,
include_root_child_ref: bool = True,
group_by_resource: bool = False) -> str

Convert schema to a Graphviz DOT string.

Arguments:

  • remove_processing_hints - If True, remove hints used for data processing and redundant information. This reduces the size of the schema and improves readability.
  • include_dlt_tables - If True, include data tables and internal dlt tables. This will influence table references and groups produced.
  • include_internal_dlt_ref - If True, include references between tables _dlt_version, _dlt_loads and _dlt_pipeline_state
  • include_parent_child_ref - If True, include references from child._dlt_parent_id to parent._dlt_id
  • include_root_child_ref - If True, include references from child._dlt_root_id to root._dlt_id
  • group_by_resource - If True, group tables by resource and create subclusters.

Returns:

A DOT string of the schema
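
For readers unfamiliar with DOT, the sketch below shows the general shape of such a string: one node per table and one edge per reference. It does not reproduce the layout, attributes or clusters that to_dot actually emits. `tables_to_dot` is a hypothetical helper that takes plain table names and (child, parent) pairs.

```python
from typing import List, Tuple


def tables_to_dot(tables: List[str], references: List[Tuple[str, str]]) -> str:
    """Build a small DOT digraph with one node per table and one edge per reference."""
    lines = ["digraph schema {"]
    for name in tables:
        lines.append(f'    "{name}";')
    for child, parent in references:
        # edge direction follows the reference: child points at its parent
        lines.append(f'    "{child}" -> "{parent}";')
    lines.append("}")
    return "\n".join(lines)
```

The resulting string can be passed to any Graphviz renderer.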

clone

def clone(with_name: str = None,
remove_processing_hints: bool = False,
update_normalizers: bool = False) -> "Schema"

Makes a deep copy of the schema, optionally changing the name, removing processing markers, and updating normalizers and identifiers in the schema if update_normalizers is True. Processing markers are x- hints created by the normalizer (x-normalizer) and the loader (x-loader), for example to mark newly inferred tables and tables that have seen data. Note that changing the name will break the previous version chain.

update_normalizers

def update_normalizers() -> None

Looks for new normalizer configuration or for destination capabilities context and updates all identifiers in the schema

Table and column names will be normalized with the new naming convention, except for tables that have seen data ('x-normalizer'), which will raise if any identifier is to be changed. Default hints, preferred data types and normalize configs (i.e. column propagation) are normalized as well. Regexes are included as long as textual parts can be extracted from an expression.

will_update_normalizers

def will_update_normalizers() -> bool

Checks if schema has any pending normalizer updates due to configuration or destination capabilities
