
Deploy with Dagster

Introduction to Dagster

Dagster is an orchestrator designed for developing and maintaining data assets, such as tables, datasets, machine learning models, and reports. Dagster ensures these processes are reliable and focuses on using software-defined assets (SDAs) to simplify complex data management, enhance the ability to reuse code, and provide a better understanding of data.

To read more, please refer to Dagster’s documentation.

Dagster Cloud features

Dagster Cloud offers an enterprise-level orchestration service with serverless or hybrid deployment options. It incorporates native branching and built-in CI/CD to prioritize the developer experience. It enables scalable, cost-effective operations without the hassle of infrastructure management.

Dagster deployment options: Serverless versus Hybrid

The serverless option fully hosts the orchestration engine, while the hybrid model lets you run workloads on your own computing resources, with Dagster managing the control plane. This reduces operational overhead while keeping your data within your own infrastructure.

For more info, please refer to the Dagster Cloud docs.

Using Dagster for free

Dagster offers a 30-day free trial during which you can explore its features, such as pipeline orchestration, data quality checks, and embedded ELT. You can try Dagster with its open-source version or by signing up for the trial.

Building data pipelines with dlt

How does dlt integrate with Dagster for pipeline orchestration?

dlt integrates with Dagster for pipeline orchestration, providing a streamlined process for building, enhancing, and managing data pipelines. This lets developers leverage dlt's capabilities for data extraction and loading, and Dagster's orchestration features to efficiently manage and monitor data pipelines.

Dagster supports native integration with dlt; here is a guide on how this integration works.

Orchestrating a dlt pipeline on Dagster

Here's a concise guide to orchestrating a dlt pipeline with Dagster, creating a pipeline that ingests GitHub issues data from a repository and loads it into DuckDB.

You can find the full example code in this repository.

The steps are as follows:

  1. Install Dagster and the embedded ELT package using pip:

    pip install dagster dagster-embedded-elt
  2. Set up a Dagster project:

    mkdir dagster_github_issues
    cd dagster_github_issues
    dagster project scaffold --name github-issues


  3. In your Dagster project, define the dlt pipeline in the github_source folder.

    Note: The dlt Dagster helper works only with dlt sources. Your resources should always be grouped in a source.

    import dlt
    ...

    @dlt.resource(
        table_name="issues",
        write_disposition="merge",
        primary_key="id",
    )
    def get_issues(
        updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
    ):
        url = (
            f"{BASE_URL}?since={updated_at.last_value}&per_page=100&sort=updated"
            "&direction=desc&state=open"
        )
        yield pagination(url)


    @dlt.source
    def github_source():
        return get_issues()
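
    The snippet above elides BASE_URL and the pagination helper. As a rough sketch only (not the repository's exact code), a GitHub pagination helper could follow the Link header using the requests library; the repository URL below is purely illustrative:

    import requests

    # Illustrative value - replace with the repository whose issues you want to ingest
    BASE_URL = "https://api.github.com/repos/<org>/<repo>/issues"

    def pagination(url):
        """Yield pages of issues by following GitHub's `Link: rel="next"` header."""
        while url:
            response = requests.get(url)
            response.raise_for_status()
            yield response.json()
            url = response.links.get("next", {}).get("url")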
  4. Create a dlt_assets definition.

    The @dlt_assets decorator takes a dlt_source and dlt_pipeline parameter. In this example, we used the github_source source and created a dlt_pipeline to ingest data from GitHub to DuckDB.

    Here’s an example of how to define assets (github_source/assets.py):

    import dlt
    from dagster import AssetExecutionContext
    from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
    from .github_pipeline import github_source


    @dlt_assets(
        dlt_source=github_source(),
        dlt_pipeline=dlt.pipeline(
            pipeline_name="github_issues",
            dataset_name="github",
            destination="duckdb",
            progress="log",
        ),
        name="github",
        group_name="github",
    )
    def dagster_github_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
        yield from dlt.run(context=context)

    For more information, please refer to Dagster’s documentation.

  5. Create the Definitions object.

    The last step is to include the assets and resource in a Definitions object (github_source/definitions.py). This enables Dagster tools to load everything we have defined:

    from dagster import Definitions, load_assets_from_modules
    from dagster_embedded_elt.dlt import DagsterDltResource

    from . import assets

    dlt_resource = DagsterDltResource()
    all_assets = load_assets_from_modules([assets])

    defs = Definitions(
        assets=all_assets,
        resources={
            "dlt": dlt_resource,
        },
    )
  6. Run the web server locally:

    1. Install the necessary dependencies using the following command:

      pip install -e ".[dev]"

      We use -e to install the project in editable mode, so changes to the code are picked up automatically without reinstalling.

    2. Run the project:

      dagster dev
    3. Navigate to localhost:3000 in your web browser to access the Dagster UI.


  7. Run the pipeline.

    Now that you have a running instance of Dagster, you can run your data pipeline.

    To run the pipeline, go to Assets and click the Materialize button in the top right. In Dagster, materialization refers to executing the code associated with an asset to produce an output.


    The run logs will also appear in your command line.
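
    If you prefer the command line, Dagster also ships a dagster asset materialize CLI command that triggers the same run non-interactively. A minimal sketch, assuming your Definitions object is importable as github_issues.definitions (adjust the module path to your project layout):

    dagster asset materialize --select '*' -m github_issues.definitions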

    Want to see real-world examples of dlt in production? Check out how dlt is used internally at Dagster in the Dagster Open Platform project.

info

For a complete picture of Dagster's integration with dlt, please refer to their documentation. It offers a detailed overview and steps for ingesting GitHub data and storing it in Snowflake. You can use a similar approach to build your own pipelines; a small sketch of the destination swap follows.
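
For instance, pointing the example pipeline at Snowflake is mostly a matter of changing the destination passed to dlt.pipeline. Credentials still have to be configured via secrets.toml or environment variables; this snippet is illustrative and not taken from the linked guide:

    import dlt

    # Same assets definition as in step 4 - only the destination changes
    dlt_pipeline = dlt.pipeline(
        pipeline_name="github_issues",
        dataset_name="github",
        destination="snowflake",
        progress="log",
    )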

Frequently Asked Questions

  • Can I remove the generated .dlt folder with secrets.toml and config.toml files?

    Yes. Since dlt supports configuration via environment variables, you can provide the secrets required by both Dagster and dlt that way instead.
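
    For example, dlt maps TOML sections to environment variables by uppercasing them and joining nested keys with double underscores. The exact variable names depend on your source and destination; the values below are placeholders:

    # Placeholder names - adjust the sections to match your secrets.toml layout
    export SOURCES__GITHUB__ACCESS_TOKEN="<your-github-token>"
    export DESTINATION__DUCKDB__CREDENTIALS="<path-or-connection-string>"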

  • I'm working with several sources – how can I best group these assets?

    To effectively group assets in Dagster when working with multiple sources, use the group_name parameter in your @dlt_assets decorator. This helps organize and visualize assets related to a particular source or theme in the Dagster UI. Here’s a simplified example:

    import dlt
    from dagster_embedded_elt.dlt import dlt_assets
    from dlt_sources.google_analytics import google_analytics


    # Define assets for the first Google Analytics source
    @dlt_assets(
        dlt_source=google_analytics(),
        dlt_pipeline=dlt.pipeline(
            pipeline_name="google_analytics_pipeline_1",
            destination="bigquery",
            dataset_name="google_analytics_data_1",
        ),
        group_name="Google_Analytics",
    )
    def google_analytics_assets_1(context, dlt):
        yield from dlt.run(context=context)


    # Define assets for the second Google Analytics source
    @dlt_assets(
        dlt_source=google_analytics(),
        dlt_pipeline=dlt.pipeline(
            pipeline_name="google_analytics_pipeline_2",
            destination="bigquery",
            dataset_name="google_analytics_data_2",
        ),
        group_name="Google_Analytics",
    )
    def google_analytics_assets_2(context, dlt):
        yield from dlt.run(context=context)
  • How can I use bigquery_adapter with @dlt_assets in Dagster for partitioned tables?

    To use bigquery_adapter with @dlt_assets in Dagster for partitioned tables, modify your resource setup to include bigquery_adapter with the partition parameter. Here's a quick example:

    import dlt
    from google.analytics import BetaAnalyticsDataClient
    from dlt.destinations.adapters import bigquery_adapter


    # get_metadata, metrics_table, dimensions_table, and the report-generating
    # function `data` are part of the dlt Google Analytics verified source and
    # are assumed to be importable alongside this snippet.
    @dlt.source
    def google_analytics_partitioned():
        # Configuration (replace with your actual values or parameters)
        queries = [
            {"dimensions": ["dimension1"], "metrics": ["metric1"], "resource_name": "resource1"}
        ]
        property_id = "your_property_id"
        start_date = "2024-01-01"
        rows_per_page = 1000
        credentials = your_credentials

        # Initialize the Google Analytics client
        client = BetaAnalyticsDataClient(credentials=credentials.to_native_credentials())

        # Fetch metadata
        metadata = get_metadata(client=client, property_id=property_id)
        resource_list = [metadata | metrics_table, metadata | dimensions_table]

        # Configure and add resources to the list, partitioning each BigQuery table by date
        for query in queries:
            dimensions = query["dimensions"]
            if "date" not in dimensions:
                dimensions.append("date")

            resource_name: str = query["resource_name"]
            resource_list.append(
                bigquery_adapter(
                    dlt.resource(data, name=resource_name, write_disposition="append")(
                        client=client,
                        rows_per_page=rows_per_page,
                        property_id=property_id,
                        dimensions=dimensions,
                        metrics=query["metrics"],
                        resource_name=resource_name,
                        start_date=start_date,
                        last_date=dlt.sources.incremental("date"),
                    ),
                    partition="date",
                )
            )

        return resource_list

    The adapted source can then be passed to @dlt_assets via the dlt_source parameter, as in the earlier examples.

Additional resources

  • Check out the Dagster Cloud Documentation to learn more about deploying on Dagster Cloud.

  • Learn more about Dagster's integration with dlt: dlt & Dagster Embedded ELT Documentation.

  • A general configurable dlt resource orchestrated on Dagster: dlt resource.

  • Configure dlt pipelines for Dagster: dlt pipelines.

  • Configure the MongoDB source as an asset factory:

    Dagster's @multi_asset declaration lets you turn each collection in a database into a separate asset. This makes the pipeline easier to debug when something fails and keeps the collections independent of each other; a minimal sketch of the pattern is shown below.
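
    The following is only a sketch, assuming the dlt MongoDB verified source is importable as mongodb and accepts a database and collection_names (check the source's actual signature before using it):

    import dlt
    from dagster import AssetOut, Output, multi_asset

    from mongodb import mongodb  # hypothetical import of the dlt MongoDB verified source


    def mongodb_assets_factory(database: str, collections: list[str]):
        """Build one multi_asset whose outputs are the database's collections."""

        @multi_asset(
            name=f"mongodb_{database}",
            group_name=database,
            outs={collection: AssetOut(key_prefix=[database]) for collection in collections},
        )
        def _collections_assets(context):
            pipeline = dlt.pipeline(
                pipeline_name=f"mongodb_{database}",
                destination="duckdb",
                dataset_name=database,
            )
            load_info = pipeline.run(mongodb(database=database, collection_names=collections))
            # Emit one output per collection so each shows up as its own asset in the UI
            for collection in collections:
                yield Output(value=load_info, output_name=collection)

        return _collections_assets


    mongodb_assets = [mongodb_assets_factory("sample_mflix", ["movies", "comments"])]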

note

Some of these are external repositories and are subject to change.

