Version: devel

dlt.extract.source

DltResourceDict Objects

class DltResourceDict(Dict[str, DltResource])

selected

@property
def selected() -> Dict[str, DltResource]

Returns a subset of all resources that will be extracted and loaded to the destination.

extracted

@property
def extracted() -> Dict[str, DltResource]

Returns a dictionary of all resources that will be extracted. This includes the selected resources and all their parents. For parents that are not added explicitly to the source, a mock resource object is created that holds the parent pipe and derives the table schema from the child resource.
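
The relationship between selected and extracted resources can be pictured with a small, library-independent sketch. The `parents` mapping and the `extracted_names` helper below are hypothetical names used for illustration only; they are not part of dlt and do not reproduce its implementation.

```python
from typing import Dict, List, Optional, Set

# Hypothetical resource graph: each resource maps to its parent (or None).
parents: Dict[str, Optional[str]] = {
    "users": None,
    "user_details": "users",  # a transformer fed by "users"
    "orders": None,
}


def extracted_names(selected: Set[str], parents: Dict[str, Optional[str]]) -> List[str]:
    """Return the selected resources plus all of their ancestors."""
    result: List[str] = []
    for name in sorted(selected):
        # walk up the parent chain of each selected resource
        chain: List[str] = []
        current: Optional[str] = name
        while current is not None:
            chain.append(current)
            current = parents[current]
        # add ancestors first so that parents precede their children
        for item in reversed(chain):
            if item not in result:
                result.append(item)
    return result


print(extracted_names({"user_details"}, parents))  # ['users', 'user_details']
```

Selecting only the transformer still pulls in its parent, while the unrelated `orders` resource stays out of the result.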

selected_dag

@property
def selected_dag() -> List[Tuple[str, str]]

Returns a list of edges of the directed acyclic graph (DAG) of pipes and their parents in the selected resources.

select

def select(*resource_names: str) -> Dict[str, DltResource]

Selects the resources named in resource_names to be extracted and deselects all remaining resources.

detach

def detach(resource_name: str = None) -> DltResource

Clones resource_name (including parent resource pipes) and removes source contexts. Defaults to the first resource in the source if resource_name is None.

DltSource Objects

class DltSource(Iterable[TDataItem])

Groups several dlt resources under a single schema and lets you perform operations on them.

An instance of this class is created whenever you call a function decorated with dlt.source. It automates several tasks for you:

  • You can pass this instance to the dlt run method to load all data present in the dlt resources.
  • You can select and deselect the resources you want to load via the with_resources method.
  • You can access the resources (which are DltResource instances) as source attributes.
  • It implements the Iterable interface, so you can get all the data from the resources yourself, without a dlt pipeline present.
  • It creates a DAG from resources and transformers and optimizes the extraction so that parent resources are extracted only once.
  • You can get the schema for the source and all the resources within it.
  • You can use the run method to load the data with a default instance of a dlt pipeline.
  • You can get the read-only source state for the currently active Pipeline instance.

from_data

@classmethod
def from_data(cls, schema: Schema, section: str, data: Any) -> Self

Converts any data supported by the dlt run method into a dlt source with the name section.name and the schema schema.

max_table_nesting

@property
def max_table_nesting() -> int

A schema hint that sets the maximum depth of nested tables. Nodes nested deeper than this limit are loaded as structs or JSON.
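
To make the effect of a nesting limit concrete, here is a simplified, library-independent sketch. The `normalize` helper, the sample data, and the `parent__child` table naming are assumptions chosen for illustration; the sketch only handles lists of dictionaries and is not dlt's normalizer.

```python
import json
from typing import Any, Dict, List

Row = Dict[str, Any]


def normalize(rows: List[Row], table: str, max_nesting: int,
              tables: Dict[str, List[Row]], level: int = 0) -> None:
    """Split nested lists into child tables until max_nesting is reached."""
    for row in rows:
        flat: Row = {}
        for key, value in row.items():
            if isinstance(value, list) and level < max_nesting:
                # still within the allowed depth: move the list to a child table
                normalize(value, f"{table}__{key}", max_nesting, tables, level + 1)
            elif isinstance(value, list):
                # too deep: keep the nested data in place as a JSON string
                flat[key] = json.dumps(value)
            else:
                flat[key] = value
        tables.setdefault(table, []).append(flat)


# Hypothetical input: customers contain orders, orders contain tags.
data = [{"id": 1, "orders": [{"sku": "a", "tags": [{"name": "x"}]}]}]

tables: Dict[str, List[Row]] = {}
normalize(data, "customers", max_nesting=1, tables=tables)
print(sorted(tables))
print(tables["customers__orders"])
```

With a limit of 1, `orders` becomes a child table while the deeper `tags` list stays inside the order row as JSON. With a limit of 0, no child tables are created at all.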

root_key

@property
def root_key() -> bool

Enables merging on all resources by propagating the root foreign key to nested tables. This option is most useful if you plan to change the write disposition of a resource to disable or enable merge.

exhausted

@property
def exhausted() -> bool

Checks whether any of the selected pipes has started. If so, the source is exhausted.
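
The idea that a started generator cannot be replayed can be shown with the standard library alone. The `has_started` helper below is a hypothetical name; whether dlt performs the same check internally is not stated here.

```python
import inspect
from typing import Generator


def numbers() -> Generator[int, None, None]:
    yield 1
    yield 2


def has_started(gen: Generator[int, None, None]) -> bool:
    """True once the generator has been advanced at least once."""
    return inspect.getgeneratorstate(gen) != inspect.GEN_CREATED


gen = numbers()
before = has_started(gen)  # nothing consumed yet
next(gen)                  # consume the first item
after = has_started(gen)   # the first item cannot be produced again
print(before, after)
```

Once a generator has advanced, iterating it again would skip the consumed items, which is why a source backed by such a generator is treated as used up.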

resources

@property
def resources() -> DltResourceDict

A dictionary of all resources present in the source, where the key is a resource name.

Returns:

  • DltResourceDict - A dictionary of all resources present in the source, where the key is a resource name.

selected_resources

@property
def selected_resources() -> Dict[str, DltResource]

A dictionary of all the resources that are selected to be loaded.

Returns:

  • Dict[str, DltResource] - A dictionary of all the resources that are selected to be loaded.

discover_schema

def discover_schema(item: TDataItem = None, meta: Any = None) -> Schema

Computes table schemas for all selected resources in the source and merges them with a copy of the current source schema. If item is provided, dynamic tables will be evaluated; otherwise, those tables will be ignored.

with_resources

def with_resources(*resource_names: str) -> "DltSource"

A convenience method to select one or more resources to be loaded. Returns a clone of the original source with the specified resources selected.
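
The difference between selecting in place and selecting on a clone can be sketched without dlt. `TinySource` below is a hypothetical stand-in that tracks only a selected flag per resource name; it is not the real class.

```python
import copy
from typing import Dict, Iterable


class TinySource:
    """Hypothetical stand-in for a source: resource name -> selected flag."""

    def __init__(self, names: Iterable[str]) -> None:
        self.selected: Dict[str, bool] = {name: True for name in names}

    def select(self, *resource_names: str) -> "TinySource":
        """In place: select the named resources and deselect the rest."""
        for name in self.selected:
            self.selected[name] = name in resource_names
        return self

    def with_resources(self, *resource_names: str) -> "TinySource":
        """Leave self untouched and apply the selection to a deep copy."""
        return copy.deepcopy(self).select(*resource_names)


source = TinySource(["users", "orders", "events"])
subset = source.with_resources("users")
print(subset.selected)
print(source.selected)
```

Because the selection is applied to a copy, the original object keeps all of its resources selected and can be reused for a different subset.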

decompose

def decompose(strategy: TDecompositionStrategy) -> List["DltSource"]

Decomposes the source into a list of sources using the given strategy.

  • "none" returns the source as is.
  • "scc" decomposes the DAG of selected pipes and their parents into strongly connected components.

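As a rough picture of what decomposing a graph of pipes into components can look like, the sketch below groups resources that are linked through parent edges. It assumes that edge direction is ignored so that a parent stays with its transformers, and it represents an isolated resource as an edge to itself; both are assumptions made for this illustration, and `connected_groups` is a hypothetical helper, not a dlt function.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def connected_groups(edges: List[Tuple[str, str]]) -> List[List[str]]:
    """Group nodes that are linked by any edge, ignoring edge direction."""
    neighbours: Dict[str, Set[str]] = defaultdict(set)
    for parent, child in edges:
        neighbours[parent].add(child)
        neighbours[child].add(parent)

    seen: Set[str] = set()
    groups: List[List[str]] = []
    for start in sorted(neighbours):
        if start in seen:
            continue
        # depth-first walk that collects everything reachable from start
        stack, group = [start], []
        seen.add(start)
        while stack:
            node = stack.pop()
            group.append(node)
            for nxt in sorted(neighbours[node]):
                if nxt not in seen:
                    seen.add(nxt)
                    stack.append(nxt)
        groups.append(sorted(group))
    return groups


# Hypothetical edges as (parent, child) pairs.
edges = [("users", "user_details"), ("orders", "order_items"), ("events", "events")]
groups = connected_groups(edges)
print(groups)
```

Each resulting group could then be loaded independently of the others, since no group depends on a parent that lives in another group.
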
add_limit

def add_limit(max_items: int) -> "DltSource"

Adds a limit of max_items to the items yielded from all selected resources in the source that are not transformers.

This is useful for testing, debugging and generating sample datasets for experimentation. You can easily get your test dataset in a few minutes, when otherwise you'd need to wait hours for the full loading to complete.

Notes:

  1. Transformer resources won't be limited. They should process all the data they receive fully to avoid inconsistencies in generated datasets.
  2. Each yielded item may contain several records. add_limit only limits the "number of yields", not the total number of records.

Arguments:

  • max_items int - The maximum number of items to yield

Returns:

  • "DltSource" - returns self
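
The second note above, that the limit counts yields rather than records, can be illustrated with plain Python. The `pages` generator is a hypothetical resource, and `itertools.islice` stands in for the limit; this is not how dlt applies it internally.

```python
from itertools import islice
from typing import Dict, Iterator, List


def pages() -> Iterator[List[Dict[str, int]]]:
    """Hypothetical resource: each yield is a page holding three records."""
    for page in range(10):
        yield [{"id": page * 3 + offset} for offset in range(3)]


max_items = 2
limited = list(islice(pages(), max_items))  # stop after two yields
record_count = sum(len(page) for page in limited)
print(len(limited), record_count)  # 2 6
```

Two yields pass through, but they carry six records in total, so the size of a sample dataset depends on how many records each yield contains.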

parallelize

def parallelize() -> "DltSource"

Mark all resources in the source to run in parallel.

Only transformers and resources based on generators and generator functions are supported; unsupported resources will be skipped.

run

@property
def run() -> SupportsPipelineRun

A convenience method that will call run on the currently active dlt pipeline. If a pipeline instance is not found, one with default settings will be created.

state

@property
def state() -> StrAny

Gets the source-scoped state from the active pipeline. PipelineStateNotAvailable is raised if no pipeline is active.

clone

def clone(with_name: str = None) -> "DltSource"

Creates a deep copy of the source where copies of schema, resources and pipes are created.

If with_name is provided, the schema is cloned with the changed name.

__iter__

def __iter__() -> Iterator[TDataItem]

Opens an iterator that yields the data items from all the resources within the source, in the same order as in the Pipeline class.

A read-only state is provided, initialized from the active pipeline state. The state is discarded after the iterator is closed.

A source config section is injected to allow secrets/config injection as during regular extraction.
