dlt.extract.decorators
DltSourceFactoryWrapper Objects
class DltSourceFactoryWrapper(SourceFactory[TSourceFunParams, TDltSourceImpl])
__init__
def __init__() -> None
Creates the wrapper returned by the @source decorator. It preserves the decorated function when called and
allows the decorator arguments to be changed at runtime. Changing the name and section creates a clone of the source
with a different name that takes its configuration from different keys.
The wrapper registers the source under the section.name type in the SourceReference registry, using the original
section (which corresponds to the module name) and name (which corresponds to the source function name).
clone
def clone(*,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = None,
schema: Schema = None,
schema_contract: TSchemaContract = None,
spec: Type[BaseConfiguration] = None,
parallelized: bool = None,
_impl_cls: Type[TDltSourceImpl] = None) -> Self
Overrides the default arguments used to create a DltSource instance when this wrapper is called. This method returns a clone of the wrapper.
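For illustration, a minimal sketch of cloning a source factory; the github_events source and its events resource below are hypothetical:
import dlt

@dlt.source
def github_events(api_token: str = dlt.secrets.value):
    @dlt.resource
    def events():
        yield [{"id": 1, "type": "push"}]
    return events

# clone the factory: sources created by the clone are named "github_events_eu"
# and take their configuration and secrets from the "github_events_eu" section
github_events_eu = github_events.clone(name="github_events_eu", section="github_events_eu")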
bind
def bind(f: AnyFun) -> Self
Binds the wrapper to the original source function and registers the source reference. This method is called only once, by the decorator.
wrap
def wrap() -> SourceReference
Wrap the original source function using _deco.
source
def source(func: Optional[AnyFun] = None,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = None,
schema: Schema = None,
schema_contract: TSchemaContract = None,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
_impl_cls: Type[TDltSourceImpl] = DltSource) -> Any
A decorator that transforms a function returning one or more dlt resources into a dlt source in order to load it with dlt.
Notes:
A dlt source is a logical grouping of resources that are often extracted and loaded together. A source is associated with a schema, which describes the structure of the loaded data and provides instructions on how to load it.
Such a schema contains table schemas that describe the structure of the data coming from the resources.
Please refer to https://dlthub.com/docs/general-usage/source for complete documentation.
Credentials: Another important function of the source decorator is to provide credentials and other configuration to the code that extracts data. The decorator may automatically bind the source function arguments to the secret and config values.
@dlt.source
def chess(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value, title: str = "GM"):
    return user_profile(username, chess_url, api_secret), user_games(username, chess_url, api_secret, with_titles=title)
list(chess("magnuscarlsen"))
Here `username` is a required, explicit Python argument; `chess_url` is a required argument that, if not passed explicitly, will be taken from configuration, i.e. config.toml; `api_secret` is a required argument that, if not passed explicitly, will be taken from dlt secrets, i.e. secrets.toml.
See https://dlthub.com/docs/general-usage/credentials/ for details.
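As an alternative to the TOML files, a hedged sketch of providing the same values in code before the source is called; the key paths shown assume the default sources.<section>.<name> lookup and are illustrative:
import dlt
# values set here are looked up like entries in config.toml / secrets.toml
dlt.config["sources.chess_url"] = "https://api.chess.com/pub/"
dlt.secrets["sources.api_secret"] = "<api secret value>"
data = list(chess("magnuscarlsen"))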
Arguments:
- `func` (Optional[AnyFun]) - A function that returns a dlt resource, a list of resources, or a list of any data items that can be loaded by dlt.
- `name` (str, optional) - A name of the source which is also the name of the associated schema. If not present, the function name will be used.
- `section` (str, optional) - A configuration section that comes right after `sources` in the default layout. If not present, the current Python module name will be used. The default layout is `sources.<section>.<name>.<key_name>`.
- `max_table_nesting` (int, optional) - A schema hint that sets the maximum depth of nested tables above which the remaining nodes are loaded as structs or JSON.
- `root_key` (bool) - Enables merging on all resources by propagating the row key from the root to all nested tables. This option is most useful if you plan to change the write disposition of a resource to disable/enable merge. Defaults to False.
- `schema` (Schema, optional) - An explicit Schema instance to be associated with the source. If not present, dlt creates a new Schema object with the provided name. If such a Schema already exists in the same folder as the module containing the decorated function, it will be loaded from that file.
- `schema_contract` (TSchemaContract, optional) - Schema contract settings that will be applied to this resource.
- `spec` (Type[BaseConfiguration], optional) - A specification of the configuration and secret values required by the source.
- `parallelized` (bool, optional) - If True, resource generators will be extracted in parallel with other resources. Transformers that return items are also parallelized. Non-eligible resources are ignored. Defaults to False, which preserves resource settings.
- `_impl_cls` (Type[TDltSourceImpl], optional) - A custom implementation of DltSource; may also be used to provide just a typing stub.
Returns:
`Any` - The wrapped, decorated source function; see the SourceFactory reference for additional wrapper capabilities.
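For context, a minimal sketch of passing the wrapped chess source to a pipeline; the pipeline, destination and dataset names are placeholders:
import dlt
pipeline = dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb", dataset_name="chess_data")
# calling the wrapper evaluates the decorated function and returns a DltSource instance;
# chess_url and api_secret must be available in config.toml / secrets.toml
load_info = pipeline.run(chess("magnuscarlsen"))
print(load_info)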
resource
def resource(
data: Optional[Any] = None,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDispositionConfig] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
file_format: TTableHintTemplate[TFileFormat] = None,
references: TTableHintTemplate[TTableReferenceParam] = None,
nested_hints: Optional[TTableHintTemplate[Dict[
TTableNames, TResourceNestedHints]]] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
incremental: Optional[TIncrementalConfig] = None,
_impl_cls: Type[TDltResourceImpl] = DltResource,
section: Optional[TTableHintTemplate[str]] = None,
_base_spec: Type[BaseConfiguration] = BaseConfiguration,
standalone: bool = None,
data_from: TUnboundDltResource = None) -> Any
When used as a decorator, transforms any generator (yielding) function into a dlt resource. When used as a function, it transforms the data passed in the `data` argument into a dlt resource.
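As a brief sketch of the non-decorator form, assuming an in-memory list and an illustrative table name:
import dlt
# wraps existing data; "users" becomes the resource and table name
users = dlt.resource(
    [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    name="users",
    write_disposition="replace",
)
print(list(users))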
Notes:
A resource is a location within a source that holds data with a specific structure (schema) or coming from a specific origin. A resource may be a REST API endpoint, a table in a database, or a tab in Google Sheets.
A dlt resource is a Python representation of a resource that combines both the data and the metadata (table schema) that describes the structure and instructs the loading of the data.
A dlt resource is also an Iterable and can be used like any other iterable object, e.g. a list or tuple.
Please refer to https://dlthub.com/docs/general-usage/resource for complete documentation.
Credentials:
If used as a decorator (data argument is a Generator), it may automatically bind the source function arguments to the secret and config values.
@dlt.resource
def user_games(username, chess_url: str = dlt.config.value, api_secret = dlt.secrets.value):
    return requests.get("%s/games/%s" % (chess_url, username), headers={"Authorization": f"Bearer {api_secret}"})
list(user_games("magnuscarlsen"))
Here `username` is a required, explicit Python argument; `chess_url` is a required argument that, if not passed explicitly, will be taken from configuration, i.e. config.toml; `api_secret` is a required argument that, if not passed explicitly, will be taken from dlt secrets, i.e. secrets.toml.
See https://dlthub.com/docs/general-usage/credentials/ for details.
Note that if the decorated function is an inner function, passing of the credentials will be disabled.
Arguments:
- `data` (Optional[Any], optional) - A function to be decorated, or data compatible with dlt run.
- `name` (TTableHintTemplate[str], optional) - A name of the resource that by default also becomes the name of the table to which the data is loaded. If not present, the name of the decorated function will be used.
- `table_name` (TTableHintTemplate[str], optional) - A table name, if different from `name`. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `max_table_nesting` (int, optional) - A schema hint that sets the maximum depth of nested tables above which the remaining nodes are loaded as structs or JSON.
- `write_disposition` (TTableHintTemplate[TWriteDispositionConfig], optional) - Controls how data is written to a table. Accepts a shorthand string literal or a configuration dictionary. Allowed shorthand string literals: "append" will always add new data at the end of the table; "replace" will replace existing data with new data; "skip" will prevent data from loading; "merge" will deduplicate and merge data based on the "primary_key" and "merge_key" hints. Defaults to "append". Write behaviour can be further customized through a configuration dictionary, for example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `columns` (TTableHintTemplate[TAnySchemaColumns], optional) - A list, dict, or pydantic model of column schemas: a typed dictionary describing column names, data types, write disposition, and performance hints that gives you full control over the created table schema. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types. When the argument is a pydantic model, the model will also be used to validate the data yielded by the resource.
- `primary_key` (TTableHintTemplate[TColumnNames], optional) - A column name or a list of column names that comprise a primary key. Typically used with the "merge" write disposition to deduplicate loaded data. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `merge_key` (TTableHintTemplate[TColumnNames], optional) - A column name or a list of column names that define a merge key. Typically used with the "merge" write disposition to remove overlapping data ranges, i.e. to keep a single record for a given day. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `schema_contract` (TTableHintTemplate[TSchemaContract], optional) - Schema contract settings that will be applied to all resources of this source (if not overridden in the resource itself).
- `table_format` (TTableHintTemplate[TTableFormat], optional) - Defines the storage format of the table. Currently only "iceberg" is supported on Athena and "delta" on the filesystem. Other destinations ignore this hint.
- `file_format` (TTableHintTemplate[TFileFormat], optional) - Format of the file in which resource data is stored. Useful when importing external files. Use "preferred" to force the file format preferred by the destination used. This setting supersedes the `load_file_format` passed to the pipeline `run` method.
- `references` (TTableHintTemplate[TTableReferenceParam], optional) - A list of references to other tables' columns, in the form `[{'referenced_table': 'other_table', 'columns': ['other_col1', 'other_col2'], 'referenced_columns': ['col1', 'col2']}]`. Table and column names will be normalized according to the configured naming convention.
- `nested_hints` (Optional[TTableHintTemplate[Dict[TTableNames, TResourceNestedHints]]], optional) - Hints for nested tables created by this resource.
- `selected` (bool, optional) - When True, the dlt pipeline will extract and load this resource; if False, the resource will be ignored.
- `spec` (Type[BaseConfiguration], optional) - A specification of the configuration and secret values required by the source.
- `parallelized` (bool, optional) - If True, the resource generator will be extracted in parallel with other resources. Transformers that return items are also parallelized. Defaults to False.
- `incremental` (Optional[TIncrementalConfig], optional) - An incremental configuration for the resource.
- `_impl_cls` (Type[TDltResourceImpl], optional) - A custom implementation of DltResource; may also be used to provide just a typing stub.
- `section` (Optional[TTableHintTemplate[str]], optional) - A configuration section that comes right after `sources` in the default layout. If not present, the current Python module name will be used. The default layout is `sources.<section>.<name>.<key_name>`. Note that the resource section is used only when a single resource is passed to the pipeline.
- `_base_spec` (Type[BaseConfiguration], optional) - A base spec to which the spec derived from the resource function arguments is added.
- `standalone` (bool, optional) - Deprecated. The past functionality has been merged into the regular resource.
- `data_from` (TUnboundDltResource, optional) - Allows piping data from one resource to another to build multi-step pipelines.
Raises:
- `ResourceNameMissing` - indicates that the name of the resource cannot be inferred from the `data` being passed.
- `InvalidResourceDataType` - indicates that the `data` argument cannot be converted into a dlt resource.
Returns:
`Any` - A TDltResourceImpl instance which may be loaded, iterated, or combined with other resources into a pipeline.
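To show how several of the hints above combine, a sketch of a merge resource with a primary key and a column-level data type hint; the table name, columns, and sample row are illustrative:
import dlt

@dlt.resource(
    name="issues",
    write_disposition="merge",
    primary_key="id",
    columns={"updated_at": {"data_type": "timestamp"}},
)
def issues():
    # a real resource would page through an API here
    yield [{"id": 1, "title": "bug", "updated_at": "2024-01-01T00:00:00Z"}]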
transformer
def transformer(
f: Optional[Callable[Concatenate[TDataItem, TResourceFunParams],
Any]] = None,
data_from: TUnboundDltResource = DltResource.Empty,
name: TTableHintTemplate[str] = None,
table_name: TTableHintTemplate[str] = None,
max_table_nesting: int = None,
write_disposition: TTableHintTemplate[TWriteDisposition] = None,
columns: TTableHintTemplate[TAnySchemaColumns] = None,
primary_key: TTableHintTemplate[TColumnNames] = None,
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
file_format: TTableHintTemplate[TFileFormat] = None,
references: TTableHintTemplate[TTableReferenceParam] = None,
nested_hints: Optional[TTableHintTemplate[Dict[
TTableNames, TResourceNestedHints]]] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
parallelized: bool = False,
section: Optional[TTableHintTemplate[str]] = None,
standalone: bool = None,
_impl_cls: Type[TDltResourceImpl] = DltResource) -> Any
A form of dlt resource that takes input from other resources via the `data_from` argument in order to enrich or transform the data.
The decorated function `f` must take at least one argument of type TDataItems (a single item or a list of items, depending on the resource passed in `data_from`). dlt will pass the
metadata associated with the data item if an argument named `meta` is present. Otherwise, the transformer function may take more arguments and be parametrized
like a resource.
You can bind the transformer early by specifying a resource in `data_from` when the transformer is created, or create dynamic bindings later with the `|` operator,
as demonstrated in the example below:
Example:
@dlt.resource
def players(title, chess_url=dlt.config.value):
    r = requests.get(f"{chess_url}titled/{title}")
    yield r.json()["players"]  # yields a list of player names
# this transformer takes data from players and returns profiles
@dlt.transformer(write_disposition="replace")
def player_profile(player: Any, chess_url=dlt.config.value) -> Iterator[TDataItems]:
    r = requests.get(f"{chess_url}player/{player}")
    r.raise_for_status()
    yield r.json()
# pipes the data from players into player_profile to produce a list of player profiles
list(players("GM") | player_profile)
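For comparison with the dynamic | binding above, a hedged sketch of early binding, reusing the players resource and fixing data_from when the transformer is declared; the transformer body is simplified:
# bind the transformer to players("GM") at declaration time
@dlt.transformer(data_from=players("GM"), write_disposition="replace")
def titled_player_profile(player: Any) -> Iterator[TDataItems]:
    yield {"player": player}
# the feeding resource is already attached, so the transformer can be iterated directly
list(titled_player_profile)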
Arguments:
- `f` (Optional[Callable[Concatenate[TDataItem, TResourceFunParams], Any]]) - A function taking at least one argument of TDataItems type, which will receive the data yielded from the `data_from` resource.
- `data_from` (TUnboundDltResource, optional) - A resource that will send data to the decorated function `f`.
- `name` (TTableHintTemplate[str], optional) - A name of the resource that by default also becomes the name of the table to which the data is loaded. If not present, the name of the decorated function will be used.
- `table_name` (TTableHintTemplate[str], optional) - A table name, if different from `name`. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `max_table_nesting` (int, optional) - A schema hint that sets the maximum depth of nested tables above which the remaining nodes are loaded as structs or JSON.
- `write_disposition` (TTableHintTemplate[TWriteDisposition], optional) - Controls how data is written to a table: "append" will always add new data at the end of the table; "replace" will replace existing data with new data; "skip" will prevent data from loading; "merge" will deduplicate and merge data based on the "primary_key" and "merge_key" hints. Defaults to "append". This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `columns` (TTableHintTemplate[TAnySchemaColumns], optional) - A list, dict, or pydantic model of column schemas: a typed dictionary describing column names, data types, write disposition, and performance hints that gives you full control over the created table schema. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `primary_key` (TTableHintTemplate[TColumnNames], optional) - A column name or a list of column names that comprise a primary key. Typically used with the "merge" write disposition to deduplicate loaded data. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `merge_key` (TTableHintTemplate[TColumnNames], optional) - A column name or a list of column names that define a merge key. Typically used with the "merge" write disposition to remove overlapping data ranges, i.e. to keep a single record for a given day. This argument also accepts a callable that is used to dynamically create tables for stream-like resources yielding many data types.
- `schema_contract` (TTableHintTemplate[TSchemaContract], optional) - Schema contract settings that will be applied to all resources of this source (if not overridden in the resource itself).
- `table_format` (TTableHintTemplate[TTableFormat], optional) - Defines the storage format of the table. Currently only "iceberg" is supported on Athena and "delta" on the filesystem. Other destinations ignore this hint.
- `file_format` (TTableHintTemplate[TFileFormat], optional) - Format of the file in which resource data is stored. Useful when importing external files. Use "preferred" to force the file format preferred by the destination used. This setting supersedes the `load_file_format` passed to the pipeline `run` method.
- `references` (TTableHintTemplate[TTableReferenceParam], optional) - A list of references to other tables' columns, in the form `[{'referenced_table': 'other_table', 'columns': ['other_col1', 'other_col2'], 'referenced_columns': ['col1', 'col2']}]`. Table and column names will be normalized according to the configured naming convention.
- `nested_hints` (Optional[TTableHintTemplate[Dict[TTableNames, TResourceNestedHints]]], optional) - Hints for nested tables created by this resource.
- `selected` (bool, optional) - When True, the dlt pipeline will extract and load this resource; if False, the resource will be ignored.
- `spec` (Type[BaseConfiguration], optional) - A specification of the configuration and secret values required by the source.
- `parallelized` (bool, optional) - When True, the resource will be extracted in parallel with other resources.
- `section` (Optional[TTableHintTemplate[str]], optional) - A configuration section that comes right after `sources` in the default layout. If not present, the current Python module name will be used. The default layout is `sources.<section>.<name>.<key_name>`. Note that the resource section is used only when a single resource is passed to the pipeline.
- `standalone` (bool, optional) - Deprecated. The past functionality has been merged into the regular resource.
- `_impl_cls` (Type[TDltResourceImpl], optional) - A custom implementation of DltResource; may also be used to provide just a typing stub.
Raises:
- `ResourceNameMissing` - indicates that the name of the resource cannot be inferred from the `data` being passed.
- `InvalidResourceDataType` - indicates that the `data` argument cannot be converted into a dlt resource.
Returns:
`Any` - A TDltResourceImpl instance which may be loaded, iterated, or combined with other resources into a pipeline.
get_source_schema
def get_source_schema() -> Schema
Should be executed from inside the function decorated with @dlt.resource
Returns:
`Schema` - The current writable source schema
get_source
def get_source() -> DltSource
Should be executed from inside the function decorated with @dlt.resource
Returns:
`DltSource` - The current writable source object
get_resource
def get_resource() -> DltResource
Should be executed from inside the function decorated with @dlt.resource
Returns:
`DltResource` - The resource object to which the currently executing pipe belongs
get_resource_metrics
def get_resource_metrics() -> Dict[str, Any]
Should be executed from inside the function decorated with @dlt.resource
Returns:
`Dict[str, Any]` - The customizable metrics dictionary
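A minimal sketch, assuming these helpers are imported from dlt.extract.decorators and the resource is being evaluated inside a pipeline run; the custom metric key is hypothetical:
import dlt
from dlt.extract.decorators import get_source_schema, get_resource, get_resource_metrics

@dlt.resource
def users():
    # these calls are valid only while the pipe is executing
    schema = get_source_schema()  # the current writable source schema
    resource = get_resource()  # the resource this pipe belongs to
    get_resource_metrics()["custom_row_count"] = 2  # hypothetical metric key
    yield [{"id": 1}, {"id": 2}]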