
Materializing Multi-Asset REST API Sources with dlt, Dagster, and DuckDB

  • Jairus Martinez,
    Analytics Engineer at Brooklyn Data Company

If you’ve been keeping tabs on data engineering tooling, two open-source projects continue to gain interest and traction: Dagster and dlt.

This article was originally published on Medium.

In short, Dagster is a Python-based orchestration framework focused on building data/AI platforms for data engineers, while dlt is a Python library that offers a feature-rich API for easily loading data into target systems. In my view, the core value proposition of these tools is that they’re code-first, offer declarative yet highly flexible frameworks, prioritize the developer experience, and can be integrated into a software development lifecycle.

While there are plenty of examples and documentation on how to use these tools individually, and many tutorials on using them together, most demos are single source-to-target materializations: one CSV file, one Parquet file, or one API endpoint loaded to a target.

Many ingestion patterns read from multiple source files and/or source endpoints to replicate data into the final destination. This write-up covers three integration patterns for using dlt within the Dagster orchestration framework to extract multiple source API endpoints and materialize them as multi-assets. In the context of Dagster, multi-assets are a way to define multiple data assets that are created or updated by a single operation. This concept is particularly useful when you need to generate several related data assets from the same processing function (in this case, generating individual tables from a REST API source).
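Before layering dlt on top, here’s a minimal sketch of the multi-asset pattern on its own (the asset names and values are placeholders, not part of the project):

import dagster as dg

@dg.multi_asset(
    outs={
        "table_a": dg.AssetOut(),
        "table_b": dg.AssetOut(),
    }
)
def load_tables():
    # A single run of this function materializes both assets; each Output
    # is matched to its out by output_name.
    yield dg.Output(value=[1, 2, 3], output_name="table_a")
    yield dg.Output(value=[4, 5, 6], output_name="table_b")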

As a bonus, we’ll also be using the new dg CLI and demoing the new DuckDB Local UI!

To get started, clone the following pokemon-dagster-dlt repo from GitHub and follow the “Getting Started” instructions for local setup.
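For reference, the local setup starts with a clone along these lines (substitute the repo owner from the link above; the placeholder below is not a real URL):

git clone https://github.com/<repo-owner>/pokemon-dagster-dlt.git
cd pokemon-dagster-dlt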

Utilizing the new dg CLI

Once the project is cloned with the required dependencies, you can test out the following commands:

  1. dg docs serve — serve the dg documentation site locally
  2. dg list defs — list asset definitions in your project
  3. dg check defs — check for validity of Definitions
  4. dg launch — launch assets
  5. dg dev — run the Dagster daemon/webserver locally to interact with, view, and materialize assets

Besides these utilities that allow for a truly CLI-first workflow, dg allows for project scaffolding (via dg scaffold), easier Python environment management (with out-of-the-box integration with uv), automatic definitions discovery, and a declarative components framework for creating and abstracting pipelines.
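For example, scaffolding a new asset definition file is a single command; the syntax below reflects recent dg versions, so double-check against dg scaffold --help for yours:

dg scaffold defs dagster.asset assets/my_asset.py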

PokeAPI with dlt

We will use the PokeAPI REST API, specifically hitting the pokemon, berry, and location endpoints. The following three examples build on and extend the original “Loading Data from a REST API” tutorial created by dltHub.
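All three endpoints return the same list-style payload. A quick way to see the shape before wiring anything up (using a small limit to keep the output readable):

import requests

# Peek at the /pokemon list endpoint; /berry and /location share this shape.
resp = requests.get("https://pokeapi.co/api/v2/pokemon", params={"limit": 3})
resp.raise_for_status()
data = resp.json()
print(data["count"])          # total records available
print(data["next"])           # URL of the next page
for item in data["results"]:  # name/url pairs
    print(item["name"], item["url"])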

dlt + Dagster Examples

With all of that out of the way, we can take a look at three integration patterns for using dlt with Dagster. Within src/defs/assets/ of the project repo, you will see three .py files. Each of these files achieves the same outcome, landing the three endpoints as tables in DuckDB; however, there are key differences in the core approach, architecture, and tradeoffs of each.

1_poke_rest_api.py
import dagster as dg
import dlt
from dlt.sources.rest_api import rest_api_source
from src.defs.utils import DUCKDB_PATH # could be an environment variable or config parameter

@dg.multi_asset(
    outs={  # Define Dagster asset definitions/metadata for the 3 dlt Assets
        "pokemon": dg.AssetOut(
            key=[
                "poke_api_1", # Asset key prefix
                "pokemon_1",  # Asset key name
            ],
            description="General Pokemon data retrieved from the PokeAPI /pokemon endpoint.", # Asset description
        ),
        "berry": dg.AssetOut(
            key=[
                "poke_api_1",
                "berry_1",
            ],
            description=(
                "Berry characteristics from /berry endpoint including growth time, size, "
                "and cultivation properties. Affects Pokemon stats when consumed."
            ),
        ),
        "location": dg.AssetOut(
            key=[
                "poke_api_1",
                "location_1",
            ],
            description="In-game locations from /location endpoint with regional data and game appearances",
        ),
    },
    group_name="dltHub__poke_1",  # Asset group name in Dagster UI
    compute_kind="dlt",
)
def load_pokemon_1():
    """
    Loads data from three endpoints of the PokeAPI using dlt and materializes them as separate Dagster assets.

    Steps:
    1. Initializes a dlt pipeline that targets a DuckDB database. The pipeline name determines the output .duckdb file.
    2. Configures a dlt REST API source to pull data from the /pokemon, /berry, and /location endpoints with a high limit.
    3. Runs the pipeline (which fetches and loads the data into the DuckDB destination).
    4. Returns a separate Dagster Output for each asset (matching the order of the outs dict).

    Returns:
        Tuple[Output, Output, Output]: Dagster Output objects for pokemon, berry, and location assets, respectively.
    """

    # Initialize the dlt pipeline with DuckDB as the destination.
    pipeline = dlt.pipeline(
        pipeline_name="rest_api_pokemon_1",
        destination=dlt.destinations.duckdb(DUCKDB_PATH + "rest_api_pokemon_1.duckdb"),  # Path to the DuckDB database file
        dataset_name="poke_rest_api_1",      # Logical dataset name within the DuckDB database
    )

    # Define the REST API source configuration for dlt.
    # - base_url: Root URL for the PokeAPI
    # - resource_defaults: Default query parameters (e.g., limit=1000 for all endpoints)
    # - resources: List of endpoint names to fetch
    pokemon_source = rest_api_source(
        {
            "client": {"base_url": "https://pokeapi.co/api/v2/"},
            "resource_defaults": {
                "endpoint": {
                    "params": {
                        "limit": 1000,
                    },
                },
            },
            "resources": [
                "pokemon",   # /pokemon endpoint
                "berry",     # /berry endpoint
                "location",  # /location endpoint
            ],
        }
    )

    # Run the pipeline, which loads data from all specified resources into DuckDB.
    # Note: pipeline.run() returns a single LoadInfo summary for the whole load,
    # not one result per resource.
    load_info = pipeline.run(pokemon_source)
    print(load_info)  # For debugging/inspection in local runs

    # Dagster expects either yielded outputs or a tuple of Output objects when multiple outputs are defined.
    # Here, we attach the load summary to each asset's Output, in the same order as the outs dict.
    return (
        dg.Output(value=load_info),  # Output for the 'pokemon' asset
        dg.Output(value=load_info),  # Output for the 'berry' asset
        dg.Output(value=load_info),  # Output for the 'location' asset
    )

This example demonstrates how to use the dlt library within the Dagster framework (without the official integration) to materialize multiple assets from three different endpoints.

Core approach:

  • dlt EL is declarative, while the Dagster wrapper allows for explicit configuration of metadata and dependencies
  • Uses Dagster’s @dg.multi_asset decorator to define a single asset function that materializes three distinct assets
    (pokemon, berry, location) from a single dlt pipeline run
  • Data from the pokemon, berry, and location endpoints are loaded as individual tables into a DuckDB database

Architecture:

  • Lightweight Dagster wrapper around dlt’s rest_api_source
  • Control Level: Medium (pipeline config exposed, resources hidden) — dlt EL is heavily declarative and Dagster metadata/dependencies are explicitly configured.

2_poke_rest_api.py
import dagster as dg
import dlt
from dlt.extract.resource import DltResource
from dagster_dlt import DagsterDltResource, DagsterDltTranslator, dlt_assets
from dagster_dlt.translator import DltResourceTranslatorData
from dlt.sources.rest_api import rest_api_source
from collections.abc import Iterable
from src.defs.utils import DUCKDB_PATH # could be an environment variable or config parameter

# Define the dlt source for the PokeAPI.
# This configuration specifies the base URL, default parameters (limit=1000), and the resources to extract.
pokemon_source = rest_api_source(
    {
        "client": {"base_url": "https://pokeapi.co/api/v2/"},
        "resource_defaults": {
            "endpoint": {
                "params": {
                    "limit": 1000,
                },
            },
        },
        "resources": [
            "pokemon",   # General Pokemon data
            "berry",     # Berry data
            "location",  # Location data
        ],
    }
)

class CustomDagsterDltTranslator(DagsterDltTranslator):
    """
    Custom translator to control how dlt resources are mapped to Dagster asset keys and dependencies.
    Unlike the other examples, which configure these core asset attributes within the `@dg.multi_asset`
    decorator, custom metadata/dependencies have to be configured here.

    - get_asset_key: Prefixes each asset key with 'poke_api_2' for better organization in the Dagster UI.
    - get_deps_asset_keys: Specifies that assets have no upstream dependencies (flat structure).
    - get_asset_spec: Returns a representation of an asset as an AssetSpec object (contains the core attributes of the asset)
    """
    def get_asset_key(self, resource: DltResource) -> dg.AssetKey:
        """Asset key is the resource name, prefixed with 'poke_api_2'"""
        return dg.AssetKey(f"{resource.name}_2").with_prefix("poke_api_2")

    def get_deps_asset_keys(self, resource: DltResource) -> Iterable[dg.AssetKey]:
        """No upstream dependencies for these assets"""
        return []
    
    def get_asset_spec(self, data: DltResourceTranslatorData) -> dg.AssetSpec:
        """Maps asset descriptions"""
        default_spec = super().get_asset_spec(data)
        asset_key_str = "/".join(default_spec.key.path) # Create string of asset keys that match `dg list defs`
        # Set custom descriptions based on the asset key
        descriptions = {
            "poke_api_2/location_2": "In-game locations from /location endpoint with regional data and game appearances",
            "poke_api_2/berry_2": "Berry characteristics from /berry endpoint including growth time, size and cultivation properties. Affects Pokemon stats when consumed.", 
            "poke_api_2/pokemon_2": "General Pokemon data retrieved from the PokeAPI /pokemon endpoint.",
        }
        description = descriptions.get(asset_key_str, "Default description")
        return default_spec.replace_attributes(description=description) # Map the proper description

@dlt_assets(
    dlt_source=pokemon_source,
    dlt_pipeline=dlt.pipeline(
        pipeline_name="rest_api_pokemon_2",  
        dataset_name="poke_rest_api_2",      # Logical dataset name within DuckDB
        destination=dlt.destinations.duckdb(DUCKDB_PATH + "rest_api_pokemon_2.duckdb"),
    ),
    group_name="dltHub__poke_2",             # Asset group name in Dagster UI
    dagster_dlt_translator=CustomDagsterDltTranslator(),  # Use custom asset key translation
)
def load_pokemon_2(context: dg.AssetExecutionContext, dlt: DagsterDltResource):
    """
    Materializes assets by running the dlt pipeline within Dagster's orchestration context.
    Note:
        Because multi-asset metadata is inferred by the dagster-dlt library and not easily configured
        for each individual asset, each asset's description will inherit this docstring of the
        @dlt_assets-decorated function load_pokemon_2. If this docstring were blank, no description
        would be populated.
    Args:
        context (AssetExecutionContext): Dagster context for asset execution.
        dlt (DagsterDltResource): Resource providing access to dlt pipeline execution.
    Yields:
        Materialization results for each dlt resource (pokemon, berry, location) as Dagster assets.
    """
    # Trigger the dlt pipeline run and yield materialized asset results for Dagster.
    yield from dlt.run(context=context)

This example demonstrates the use of the official dlt + Dagster integration, dagster-dlt, to materialize multiple assets from three endpoints.
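If you’re assembling this outside the demo repo, note that the integration ships as a separate package:

pip install dagster-dlt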

Core approach:

  • Fully declarative and integrated approach
  • Leverages the @dlt_assets decorator from dagster-dlt to automatically generate Dagster assets from a dlt source and pipeline
  • Requires a custom DagsterDltTranslator class to control asset key naming and dependencies
  • The integration enables streamlined orchestration, monitoring, and management of dlt pipelines within Dagster
  • Data from the pokemon, berry, and location endpoints are loaded as individual tables into a DuckDB database

Architecture:

  • Coupled integration via dagster-dlt
  • Control Level: Low (convention over configuration) — dlt EL is heavily declarative, and Dagster metadata/dependencies are auto-generated from dlt objects.

3_poke_rest_api.py
import dagster as dg
import dlt
import requests
from src.defs.utils import DUCKDB_PATH # could be an environment variable or config parameter

@dlt.resource
def pokemon_resource():
    """
    Fetches data from the PokeAPI /pokemon endpoint.
    
    Yields:
        Response JSON object
        - count: Total number of Pokemon available
        - results: List of Pokemon with name and URL
        - next: Pagination URL (not handled in this example)
    
    Raises:
        HTTPError: If the API request fails
    """
    url = "https://pokeapi.co/api/v2/pokemon"
    response = requests.get(url, params={"limit": 1000})
    response.raise_for_status()
    yield response.json()

@dlt.resource
def berry_resource():
    """
    Fetches data from the PokeAPI /berry endpoint.
    
    Yields:
        Response JSON object
        - growth_time: How long the berry takes to grow
        - max_harvest: Maximum number of berries yielded
        - size: Berry size in millimeters
        - smoothness: Smoothness characteristic
        - soil_dryness: Time it takes to dry the soil
    """
    url = "https://pokeapi.co/api/v2/berry"
    response = requests.get(url, params={"limit": 1000})
    response.raise_for_status()
    yield response.json()

@dlt.resource
def location_resource():
    """
    Fetches data from the PokeAPI /location endpoint.
    
    Yields:
        Response JSON object
        - region: Associated game region
        - names: Localized names
        - game_indices: Appearance in different game versions
    """
    url = "https://pokeapi.co/api/v2/location"
    response = requests.get(url, params={"limit": 1000})
    response.raise_for_status()
    yield response.json()

@dlt.source
def pokeapi_source():
    """
    Aggregates multiple dlt resources into a single source for pipeline processing.
    Returns:
        List[DltResource]: Contains pokemon, berry, and location resources in order
    """
    return [
        pokemon_resource(),  # /pokemon endpoint
        berry_resource(),    # /berry endpoint
        location_resource()  # /location endpoint
    ]

@dg.multi_asset(
    outs={  # Define Dagster asset definitions/metadata for the 3 dlt Assets
        "pokemon": dg.AssetOut(
            key=["poke_api_3", "pokemon_3"], # Asset key prefix and name
            description="General Pokemon data retrieved from the PokeAPI /pokemon endpoint.", # Asset key description
        ),
        "berry": dg.AssetOut(
            key=["poke_api_3", "berry_3"],
            description=(
                "Berry characteristics from /berry endpoint including growth time, size, "
                "and cultivation properties. Affects Pokemon stats when consumed."
            ),
        ),
        "location": dg.AssetOut(
            key=["poke_api_3", "location_3"],
            description="In-game locations from /location endpoint with regional data and game appearances",
        ),
    },
    group_name="dltHub__poke_3", # Asset group name in Dagster UI
    compute_kind="dlt",
)
def load_pokemon_3():
    """
    Executes the dlt pipeline with custom-defined resources and materializes Dagster assets.
    
    Pipeline configuration:
    - Uses 'replace' write disposition to overwrite existing data on each run
    - Dataset name is specified at runtime rather than pipeline initialization
    - Output order matches the order of the assets declared in the outs dict
    
    Returns:
        Tuple[Output, Output, Output]: Dagster Output objects for each asset, each carrying the pipeline's load summary
    """
    # Initialize pipeline
    pipeline = dlt.pipeline(
        pipeline_name="rest_api_pokemon_3",
        destination=dlt.destinations.duckdb(DUCKDB_PATH + "rest_api_pokemon_3.duckdb"),  
    )

    # Run pipeline with explicit dataset name and write disposition
    load_info = pipeline.run(
        pokeapi_source(),
        dataset_name="poke_rest_api_3",  # Logical dataset grouping in DuckDB
        write_disposition="replace",     # Full refresh pattern - replaces existing data
    )
    
    print(load_info)  # Single LoadInfo summary with load metrics and schema changes

    # Return one Output per asset, in the same order as the outs dict
    return (
        dg.Output(value=load_info),  # 'pokemon' asset
        dg.Output(value=load_info),  # 'berry' asset
        dg.Output(value=load_info),  # 'location' asset
    )

This final example demonstrates a more modular approach to using dlt with Dagster, explicitly defining individual dlt resources and combining them into a source, then materializing them as Dagster assets.

Core approach:

  • Gives the developer control over both the data extraction process and Dagster metadata
  • Uses @dlt.resource and @dlt.source decorators for explicit pipeline construction
  • Implements API calls with the requests library instead of dlt’s rest_api_source
  • Each resource is defined separately, allowing for more granular control over data extraction (see the pagination sketch below)
  • Data from the pokemon, berry, and location endpoints are loaded as individual tables into a DuckDB database

Architecture:

  • Dagster @dg.multi_asset decorator wraps dlt’s @dlt.resource and @dlt.source decorators
  • Control Level: High — dlt is explicitly configured using @dlt.resource and @dlt.source, which allows for more control over EL. The code is less declarative and easier to customize. Dagster’s metadata/dependencies are also explicitly configured.
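One payoff of this control: the resource docstrings above note that pagination is not handled. Here is a minimal sketch of how one of these resources could follow the PokeAPI’s next links instead of relying on a single large limit (the resource name and page size are illustrative):

import dlt
import requests

@dlt.resource(name="pokemon", write_disposition="replace")
def pokemon_paginated():
    """Illustrative variant of pokemon_resource that walks the 'next' pagination links."""
    url = "https://pokeapi.co/api/v2/pokemon"
    params = {"limit": 100}
    while url:
        response = requests.get(url, params=params)
        response.raise_for_status()
        page = response.json()
        yield page["results"]  # each page's records become rows in the table
        url = page["next"]     # None on the last page ends the loop
        params = None          # 'next' already embeds the query string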

So, which approach is best?

Like any answer to any technical question, it depends. Each approach has tradeoffs concerning code flexibility, maintenance, and the coupling of frameworks.

The first example (1_poke_rest_api.py) strikes a solid balance. It leverages dlt’s declarative REST API source for EL, utilizes the @dg.multi_asset decorator to wrap dlt code for explicit asset configuration, and offers a decoupled approach to integrating dlt and Dagster.

Bar the use of the DagsterDltTranslator class, 2_poke_rest_api.py is the quickest and most streamlined approach. Both dlt and Dagster are fully integrated to load and materialize assets with a declarative approach, leading to fewer lines of code, easier configuration, and lower maintenance. Of course, this comes with a few drawbacks, the two major ones being lower code flexibility and tighter coupling of your dlt and Dagster code (think future migrations).

The final example, 3_poke_rest_api.py, optimizes for flexibility and control. EL through dlt is explicit and handled by the developer, as is Dagster’s asset configuration. While this allows for enhanced customization of code and metadata, more of the burden for the extraction logic falls on the developer. Lastly, like the first example, Dagster simply wraps dlt. This decouples the two libraries and gives you flexibility over where your EL code runs.

Bonus: Materializing dlt Assets and Utilizing the DuckDB UI

Now that we have a solid handle on the three different approaches, we can get into Dagster, materialize the assets, and then query the data directly from a .duckdb database file.

First, check to make sure your Dagster definitions are valid. With the virtual environment active, run dg check defs. If dg is able to find and validate your definitions, it will return “All definitions loaded successfully.” This greatly improves iteration, as you no longer need to attempt to run the Dagster daemon/webserver to find out whether your definitions are valid. As you update your code, you can easily run this command to get quick feedback on any errors.

dg check defs

Next, check to see what assets exist in your project. We can view our definitions directly in the CLI via dg list defs.

dg list defs

The CLI output shows the asset keys, the group each asset belongs to, their dependencies, the compute kind, and the descriptions we provided.

With our assets validated, let’s launch the Dagster daemon/webserver by running dg dev. For those familiar with Dagster, this should look and feel very similar to the dagster dev command.

Open the Dagster webserver on your localhost:

dg dev
Dagster UI @ localhost: http://127.0.0.1:3000

Within the UI, you can see all 9 assets defined (3 assets x 3 examples). Select “Materialize all” in the UI (or materialize via the CLI with dg launch) to ingest the PokeAPI data into DuckDB.
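If you prefer to stay in the terminal, dg launch takes an asset selection; something along these lines should work (check dg launch --help for the selection syntax in your version):

dg launch --assets "poke_api_1/pokemon_1"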

From the “Runs” tab, you will be able to see the status and logging of your asset materializations. The data will be loaded into the files/ directory in your local project directory.

files/   # data loaded here
├── rest_api_pokemon_1.duckdb
├── rest_api_pokemon_2.duckdb
└── rest_api_pokemon_3.duckdb
src/
tests/

With the data loaded, we can now utilize the DuckDB UI to inspect, query, and explore the newly landed data locally in dev! All we have to do is run the following command in the CLI: duckdb -ui. This will automatically open the DuckDB UI!

To add the local .duckdb file for querying as a database, go to the “+” sign under “Attached databases” and add the DuckDB file path with respect to your current working directory:

Now, you can query the data just like any SQL-based notebook IDE.

DuckDB UI

On the left panel of the UI, you can see all the data that’s currently loaded. The rest_api_pokemon_1.duckdb file is the database, the dlt dataset name (poke_rest_api_1) is the resulting schema, and the three tables are the endpoint (berry, location, pokemon) data as loaded by dlt.

Again, you have access to a SQL IDE to quickly and easily check your work, explore the data, and do validations!
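If you’d rather script a sanity check than click through the UI, the same query works from Python (the path and names follow example 1 above):

import duckdb

# Connect to the DuckDB file produced by the first pipeline and count
# the rows dlt landed in the pokemon table.
con = duckdb.connect("files/rest_api_pokemon_1.duckdb")
print(con.sql("SELECT COUNT(*) AS n_pokemon FROM poke_rest_api_1.pokemon"))
con.close()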

Conclusion

The integration of dlt and Dagster represents a significant advancement in the modern data engineering toolkit. Both enable code-first, declarative, and flexible data pipelines that can integrate within a mature development lifecycle.

As we’ve demonstrated through these three integration patterns, developers have multiple options to balance control, maintainability, and framework coupling according to their specific needs. The first approach offers a balanced solution with explicit asset configuration while leveraging dlt’s declarative capabilities. The second approach, using the official dagster-dlt integration, provides the most streamlined experience but with less flexibility. The third approach maximizes control and customization at the cost of a more involved implementation.

Beyond the technical implementation details, what’s truly important is how these tools are reshaping the Analytics Development Lifecycle (ADLC). The CLI-first workflow, rapid local iteration, and immediate data validation through the DuckDB UI significantly compress the feedback loop for data engineers. This acceleration enables:

  1. Faster experimentation and prototyping
  2. More robust testing before production deployment
  3. Easier collaboration through standardized, code-based implementations
  4. Greater focus on business value rather than plumbing and configuration

As data tooling continues to mature, we’re witnessing a convergence of software engineering best practices with data engineering workflows. This convergence enables data teams to deliver more reliable, maintainable, and scalable data products to their organizations. The result is not just better technical implementations, but a fundamental improvement in how data teams can respond to business needs and deliver strategic value.

Need help with dlt + Dagster deployment? Brooklyn Data Company is a consultancy that partners with its clients to build data capabilities that last, and can help you leverage solutions like dlt or Dagster.