
2 posts tagged with "Dagster"


· 12 min read
Anuun Chinbat

It's been nearly half a century since cron was first introduced, and now we have a handful of orchestration tools that go way beyond just scheduling tasks. With data folks constantly debating which tools are top-notch and which ones should leave the scene, it feels like we're at a turning point in the evolution of these tools. The term 'orchestrator' has become kind of a catch-all, and that's causing some confusion, because we're using this one word to talk about a bunch of different things.

dates

Think about the word “date.” It can mean a fruit, a romantic outing, or a day on the calendar, right? We usually figure out which one it is from the context, but what does context mean when it comes to orchestration? It might sound like a simple question, but it's pretty important to get this straight.

And here's a funny thing: some people, after eating an odd-tasting date (the fruit, of course), are so put off that they naively swear off romantic dates altogether. It's an exaggerated analogy, but it shows how one bad experience can color our view of something completely different. That's kind of what's happening with orchestration tools. If someone had a bad time with one tool, they might be overly critical of another, even though it could be a totally different experience.

So the context in terms of orchestration tools seems to be primarily defined by one thing - WHEN a specific tool was first introduced to the market (aside from the obvious factors like the technical background of the person discussing these tools and their tendency to be a chronic complainer 🙄).


IT'S ALL ABOUT TIMING!

evolution-of-data-orchestration

The Illegitimate Child

Cron was initially released in 1975 and is undoubtedly the father of all scheduling tools, including orchestrators, but I’m assuming Cron didn’t anticipate this many offspring in the field of data (or perhaps it did). When Oracle brought the first commercial relational database to market in 1979, people started to realize that data needed to be moved on a schedule, without manual effort. That was doable with the help of Control-M, though it was more of a general workflow automation tool that didn’t pay special attention to data workflows.

Basically, since the solutions weren’t data driven at that time, it was more “The job gets done, but without a guarantee of data quality.”

Finally Adopted

Unlike Control-M, Informatica was designed with data operations in mind from the beginning. As data spread across entire companies, advanced OLAP systems emerged alongside the broad adoption of data warehousing. Now data not only needed to be moved, but also integrated across many systems and users. Informatica's data orchestration solution was inevitably influenced by the rising popularity of drag-and-drop interfaces at the time, much to the dismay of many modern data engineers, who recommend skipping Informatica and other GUI-based ETL tools that offer ‘visual programming’.

As the creator of Airflow, Max Beauchemin, said: “There's a multitude of reasons why complex pieces of software are not developed using drag and drop tools: it's that ultimately code is the best abstraction there is for software.”

To Be Free, That Is, Diverse

With traditional ETL tools such as IBM DataStage and Talend becoming well established in the 1990s and early 2000s, the big data movement started gaining momentum, with Hadoop as its main star. Oozie, later made open source in 2011, was tasked with scheduling Hadoop workflows, while closed-source solutions like K2View started operating behind the curtains.

Fast forward a bit, and the scene exploded, with Airflow quickly becoming the heavyweight champ while every big data service out there began rolling out its own orchestrator. This burst brought diversity, but with diversity came a maze of complexity. All of a sudden, there’s an orchestrator for everyone, whether you’re chasing features or just trying to make your budget work 👀, and picking the perfect one for your needs has gotten even trickier.

types

The Bottom Line

The thing is that every tool out there has some inconvenient truths, and the real question isn't about escaping the headache; it's about choosing your type of headache. Hence the endless sea of “versus” articles, blog posts, and guides trying to help you pick your personal battle.

A Redditor: “Everyone has hated all orchestration tools for all time. People just hated Airflow less and it took off.”

What I'm getting at is this: we're all a bit biased by the "law of the instrument." You know, the whole “If all you have is a hammer, everything looks like a nail” thing. Most engineers probably grabbed the latest or most hyped tool when they first dipped their toes into data orchestration and have stuck with it ever since. Sure, Airflow is the belle of the ball for the community, but there's a whole lineup of contenders vying for the spotlight.

law-of-instrument

And there are obviously those who would relate to the following:

reddit-screenshot


A HANDY DETOUR FOR YOU 💐

The Fundamentals

About Airflow

Miscellaneous


WHAT THE FUTURE HOLDS...

I'm no oracle or tech guru, but it's pretty obvious that at their core, most data orchestration tools are pretty similar. They're like building blocks that can be put together in different ways: some features come, some go, and users are always learning something new or dropping something old. So what will really make a difference down the line is NOT just having the coolest features. It's having a strong community that's all in on making the product better, a welcoming onboarding process that doesn't feel like rocket science, and that sweet spot between making things simple to use and letting users tweak things just the way they like.

In other words, it's not just about what the tools can do, but how people feel about using them, learning them, contributing to them, and obviously how much they spend to maintain them. That's likely where the future winners in the data orchestration game will stand out. But don’t get me wrong, features are important — it's just that there are other things equally important.


I’ve been working on this article for a WHILE now, and, honestly, it's been a bit of a headache trying to gather any solid, objective info on which data orchestration tool tops the charts. The more I think about it, the more I realise it's probably because trying to measure "the best" or "most popular" is a bit like trying to catch smoke with your bare hands — pretty subjective by nature. Plus, only testing them with non-production level data probably wasn't my brightest move.

However, I did create a fun little project where I analysed the sentiment of comments on articles about selected data orchestrators on Hacker News and gathered Google Trends data for the past year.

Just a heads-up, though: the results are BY NO MEANS reliable and are skewed due to some fun with words. For instance, searching for “Prefect” kept leading me to articles about Japanese prefectures, “Keboola” resulted in Kool-Aid content, and “Luigi”... well, let’s just say I ran into Mario’s brother more than once 😂.


THE FUN LITTLE PROJECT

Straight to the GitHub repo.

I used Dagster and dlt to load data into Snowflake, and since both of them have integrations with Snowflake, it was easy to set things up and have them all running:

Pipeline overview

This project is very minimal, including just what's needed to run Dagster locally with dlt. Here's a quick breakdown of the repo’s structure:

  1. .dlt: Utilized by the dlt library for storing configuration and sensitive information. The Dagster project is set up to fetch secret values from this directory as well.
  2. charts: Used to store chart images generated by assets.
  3. dlt_dagster_snowflake_demo: Your Dagster package, comprising Dagster assets, dlt resources, Dagster resources, and general project configurations.

Dagster Resources Explained

In the resources folder, the following two Dagster resources are defined as classes:

  1. DltPipeline: This is our dlt object defined as a Dagster ConfigurableResource that creates and runs a dlt pipeline with the specified data and table name. It will later be used in our Dagster assets to load data into Snowflake.

    import dlt
    from dagster import ConfigurableResource


    class DltPipeline(ConfigurableResource):
        # Initialize resource with pipeline details
        pipeline_name: str
        dataset_name: str
        destination: str

        def create_pipeline(self, resource_data, table_name):
            """
            Creates and runs a dlt pipeline with specified data and table name.

            Args:
                resource_data: The data to be processed by the pipeline.
                table_name: The name of the table where data will be loaded.

            Returns:
                The result of the pipeline execution.
            """

            # Configure the dlt pipeline with your destination details
            pipeline = dlt.pipeline(
                pipeline_name=self.pipeline_name,
                destination=self.destination,
                dataset_name=self.dataset_name,
            )

            # Run the pipeline with your parameters
            load_info = pipeline.run(resource_data, table_name=table_name)
            return load_info
  2. LocalFileStorage: Manages the local file storage, ensuring the storage directory exists and allowing data to be written to files within it. It will be later used in our Dagster assets to save images into the charts folder.
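The LocalFileStorage code itself isn't shown here. As a rough, stdlib-only sketch of the behavior described (make sure the directory exists, then write files into it), assuming a plain dataclass in place of Dagster's ConfigurableResource and hypothetical names `directory` and `write`:

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass
class LocalFileStorage:
    # Hypothetical field name; in the project this is a Dagster ConfigurableResource
    directory: str

    def __post_init__(self) -> None:
        # Ensure the storage directory exists (no error if it already does)
        Path(self.directory).mkdir(parents=True, exist_ok=True)

    def write(self, filename: str, data: bytes) -> Path:
        # Write raw bytes (e.g. a rendered chart image) into the storage directory
        path = Path(self.directory) / filename
        path.write_bytes(data)
        return path
```

An asset could then call something like `storage.write("sentiment_counts.png", png_bytes)` to drop a chart into the charts folder.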

dlt Explained

In the dlt folder within dlt_dagster_snowflake_demo, necessary dlt resources and sources are defined. Below is a visual representation illustrating the functionality of dlt:

dlt explained

  1. hacker_news: A dlt resource that yields stories related to the specified orchestration tools from Hacker News. For each tool, it retrieves the top 5 stories that have at least one comment. The stories are then appended to the existing data.

    Note that the write_disposition can also be set to merge or replace:

    • The merge write disposition merges the new data from the resource with the existing data at the destination. It requires a primary_key to be specified for the resource. More details can be found here.
    • The replace write disposition replaces the data in the destination with the data from the resource: the existing data in the destination table is dropped before the new data is loaded.

    More details can be found here.

  2. comments: A dlt transformer - a resource that receives data from another resource. It fetches comments for each story yielded by the hacker_news function.

  3. hacker_news_full: A dlt source that extracts data from the source location using one or more resource components, such as hacker_news and comments. To illustrate, if the source is a database, a resource corresponds to a table within that database.

  4. google_trends: A dlt resource that fetches Google Trends data for specified orchestration tools. It attempts to retrieve the data multiple times in case of failures or empty responses. The retrieved data is then appended to the existing data.
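To make the three write dispositions concrete, here is a toy, in-memory model of their semantics. It is not dlt's implementation (dlt applies these in the destination); the function name and the list-of-dicts "table" are assumptions for illustration only:

```python
from typing import Optional


def apply_write_disposition(
    existing: list[dict],
    new: list[dict],
    disposition: str,
    primary_key: Optional[str] = None,
) -> list[dict]:
    """Toy model of append / replace / merge on an in-memory 'table'."""
    if disposition == "append":
        # Keep everything already loaded and add the new rows
        return existing + new
    if disposition == "replace":
        # Drop what was there and keep only the new rows
        return list(new)
    if disposition == "merge":
        if primary_key is None:
            raise ValueError("merge requires a primary_key")
        # Index rows by key; new rows overwrite existing rows with the same key
        merged = {row[primary_key]: row for row in existing}
        merged.update({row[primary_key]: row for row in new})
        return list(merged.values())
    raise ValueError(f"unknown write disposition: {disposition}")
```

Running the same stories twice with `append` duplicates them, whereas `merge` on a story id keeps one row per story, which is why merge needs a primary key.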

As you may have noticed, the dlt library is designed to handle the unnesting of data internally. When you retrieve data from APIs like Hacker News or Google Trends, dlt automatically unpacks the nested structures into relational tables, creating and linking child and parent tables. This is achieved through unique identifiers (_dlt_id and _dlt_parent_id) that link child tables to specific rows in the parent table. However, it's important to note that you have control over how this unnesting is done.
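A heavily simplified sketch of that idea: lists inside a record become their own child table, and each child row carries its parent's id. dlt's real normalizer generates its own unique ids, tracks list positions and handles many more cases; the function name, the counter-based ids, the `value` column for scalar lists and the `parent__child` naming here are assumptions for illustration:

```python
import itertools
from typing import Any, Iterator, Optional


def unnest(
    table: str,
    rows: list[dict[str, Any]],
    tables: Optional[dict[str, list[dict[str, Any]]]] = None,
    parent_id: Optional[str] = None,
    ids: Optional[Iterator[int]] = None,
) -> dict[str, list[dict[str, Any]]]:
    """Split nested records into flat parent/child tables (simplified)."""
    tables = {} if tables is None else tables
    ids = itertools.count(1) if ids is None else ids
    for row in rows:
        row_id = f"id{next(ids)}"
        flat: dict[str, Any] = {"_dlt_id": row_id}
        if parent_id is not None:
            # Link the child row back to the parent row it came from
            flat["_dlt_parent_id"] = parent_id
        for key, value in row.items():
            if isinstance(value, list):
                # Lists become a child table named <table>__<key>
                children = [v if isinstance(v, dict) else {"value": v} for v in value]
                unnest(f"{table}__{key}", children, tables, row_id, ids)
            elif isinstance(value, dict):
                # Nested objects are flattened into <key>__<sub_key> columns
                for sub_key, sub_value in value.items():
                    flat[f"{key}__{sub_key}"] = sub_value
            else:
                flat[key] = value
        tables.setdefault(table, []).append(flat)
    return tables
```

For a story with a list of comments, this yields a `stories` table and a `stories__comments` table whose rows point back to their story via `_dlt_parent_id`.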

The Results

Alright, so once you've got your Dagster assets all materialized and data loaded into Snowflake, let's take a peek at what you might see:

sentiment counts

I understand if you're scratching your head at first glance, but let me clear things up. Remember those sneaky issues I mentioned with Keboola and Luigi earlier? Well, I've masked their charts with the respective “culprits”.

Now, onto the bars. Each trio of bars shows the number of negative, neutral, and positive comments for one orchestration tool, counted across the Hacker News articles that were returned when searching for that tool and that had at least one comment.

What's the big reveal? It seems like Hacker News readers tend to spread more positivity than negativity, though neutral comments hold their ground.

And, as is often the case with utilizing LLMs, this data should be taken with a grain of salt. It's more of a whimsical exploration than a rigorous analysis. However, if you take a peek behind Kool Aid and Luigi, it's intriguing to note that articles related to them seem to attract a disproportionate amount of negativity. 😂
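Once each comment has a sentiment label, the bars are just a per-tool tally. A minimal sketch, assuming the labelled comments arrive as (tool, label) pairs; the LLM labelling step itself is left out, and `sentiment_counts` is a hypothetical name:

```python
from collections import Counter, defaultdict
from typing import Iterable

LABELS = ("negative", "neutral", "positive")


def sentiment_counts(labelled: Iterable[tuple[str, str]]) -> dict[str, dict[str, int]]:
    """Tally (tool, label) pairs into per-tool negative/neutral/positive counts."""
    per_tool: dict[str, Counter] = defaultdict(Counter)
    for tool, label in labelled:
        if label not in LABELS:
            raise ValueError(f"unexpected sentiment label: {label!r}")
        per_tool[tool][label] += 1
    # Always report all three labels so every tool gets a full trio of bars
    return {tool: {label: counts[label] for label in LABELS} for tool, counts in per_tool.items()}
```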


IF YOU'RE STILL HERE

… and you're just dipping your toes into the world of data orchestration, don’t sweat it. It's totally normal if it doesn't immediately click for you. For beginners, it can be tricky to grasp because in small projects, there isn't always that immediate need for things to happen "automatically" - you build your pipeline, run it once, and then bask in the satisfaction of your results - just like I did in my project. However, if you start playing around with one of these tools now, it could make it much easier to work with them later on. So, don't hesitate to dive in and experiment!

… And hey, if you're a seasoned pro about to drop some knowledge bombs, feel free to go for it - because what doesn’t challenge us, doesn’t change us 🥹. (*Cries in Gen Z*)

· 11 min read
Zaeem Athar

TL;DR: In this blog post, we'll build data pipelines using dlt and orchestrate them using Dagster.

dlt is an open-source Python library that allows you to declaratively load messy data sources into well-structured tables or datasets, through automatic schema inference and evolution. It simplifies building data pipelines by providing functionality to support the entire extract and load process.

It does so in a scalable way, enabling you to run it on both micro workers and highly parallelized setups. dlt also makes extraction robust by providing state management for incremental extraction, a drop-in requests replacement with retries, and many other helpers for common and uncommon extraction cases.

To start with dlt, you can install it using pip: pip install dlt. Afterward, import dlt in your Python script and start building your data pipeline. There's no need to start any backends or containers.

Project Overview:

In this example, we will ingest GitHub issue data from a repository and store the data in BigQuery. We will use dlt to create a data pipeline and orchestrate it using Dagster.

Initially, we will start by creating a simple data pipeline using dlt. We will then orchestrate the pipeline using Dagster. Finally, we will add more features to this pipeline by using the dlt schema evolution and Dagster asset metadata to educate the users about their data pipeline.

The project code is available on GitHub.

Project Overview

As we will be ingesting data into BigQuery, we first need to create service account credentials for BigQuery. You can find more info on setting up a service account in the dlt docs.

Once we have the credentials, we are ready to begin. Let’s first install Dagster and dlt. The commands below install both.

pip install dlt
pip install dagster dagster-webserver

Simple dlt Pipeline:

As a first step, we will create the GitHub issues pipeline using dlt.

dlt init github_issues bigquery

This will generate a template for us to create a new pipeline. Under .dlt/secrets.toml, add the service account credentials for BigQuery. Then, in github_issues.py, delete the generated code and add the following:

import time

import dlt
import requests


@dlt.resource(write_disposition="append")
def github_issues_resource(api_secret_key=dlt.secrets.value):
    owner = 'dlt-hub'
    repo = 'dlt'
    url = f"https://api.github.com/repos/{owner}/{repo}/issues"
    headers = {"Accept": "application/vnd.github.raw+json"}

    while url:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # raise exception if invalid response
        issues = response.json()
        yield issues

        if 'link' in response.headers:
            if 'rel="next"' not in response.headers['link']:
                break

            url = response.links['next']['url']  # fetch next page of issues
        else:
            break
        time.sleep(2)  # sleep for 2 seconds to respect rate limits


if __name__ == "__main__":
    # configure the pipeline with your destination details
    pipeline = dlt.pipeline(
        pipeline_name='github_issues', destination='bigquery', dataset_name='github_issues_data'
    )

    # run the pipeline with your parameters
    load_info = pipeline.run(github_issues_resource())

    # print the information on data that was loaded
    print(load_info)

The above code creates a simple github_issues pipeline that gets the issues data from the defined repository and loads it into BigQuery. The dlt.resource yields the data, while dlt.pipeline normalizes the nested data and loads it into the defined destination. To read more about the technical details, refer to the dlt docs.

To run the pipeline, execute the following commands:

pip install -r requirements.txt
python github_issues.py

We now have a running pipeline and are ready to orchestrate it using Dagster.
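The pagination in github_issues_resource relies on requests' `response.links`, which parses GitHub's `Link` response header into a dict keyed by `rel`. A simplified stand-in showing roughly what that parsing does; it assumes URLs contain no commas, and `parse_link_header` is a hypothetical helper, not part of requests:

```python
def parse_link_header(value: str) -> dict[str, str]:
    """Map each rel value in an RFC 8288 Link header to its URL (simplified)."""
    links: dict[str, str] = {}
    # Entries are comma-separated: '<url>; rel="next", <url>; rel="last"'
    for entry in value.split(","):
        url_part, *params = entry.split(";")
        url = url_part.strip().strip("<>")
        for param in params:
            key, _, raw = param.strip().partition("=")
            if key.strip().lower() == "rel":
                # rel may hold several space-separated relation types
                for rel in raw.strip().strip('"').split():
                    links[rel] = url
    return links
```

The loop keeps requesting pages while a `next` entry exists and stops on the last page, where GitHub omits it.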

Orchestrating using Dagster:

We will need to adjust our pipeline a bit to orchestrate it using Dagster.

Step 1: Create a Dagster project

  • Create a new directory for your Dagster project and scaffold the basic structure:
mkdir dagster_github_issues
cd dagster_github_issues
dagster project scaffold --name github-issues

This will generate the default files for Dagster that we will use as a starting point for our data pipeline.

Step 2: Set up the directory structure

  • Inside the github-issues/github_issues directory create the following folders: assets, resources, and dlt.
.
├── README.md
├── github_issues
│   ├── __init__.py
│   ├── assets
│   │   └── __init__.py
│   ├── dlt
│   │   └── __init__.py
│   └── resources
│       └── __init__.py
├── github_issues_tests
│   ├── __init__.py
│   └── test_assets.py
├── pyproject.toml
├── setup.cfg
└── setup.py

Step 3: Add dlt Resources and environment variables

  • Copy the previously created github_issues_resource code into dlt/__init__.py under the dlt folder. Remove the dlt.secrets.value parameter, as we'll pass the credentials through a .env file.
  • Create a .env file in the root directory, i.e. the directory where the pyproject.toml file exists. Copy the credentials into the .env file, following the correct naming convention. For more info on setting up the .env file, have a look at the docs.
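The naming convention dlt uses for environment variables is the config section path in upper case, joined by double underscores; the MongoDB URL later in this post, SOURCES__MONGODB__CONNECTION__URL, follows the same pattern. A tiny hypothetical helper that prints .env keys for the usual BigQuery service-account fields (values are left as placeholders):

```python
def dlt_env_var_name(*path: str) -> str:
    # dlt convention: each config section upper-cased, joined with "__"
    return "__".join(part.upper() for part in path)


# Credential fields a BigQuery service account typically provides
BIGQUERY_KEYS = ("project_id", "private_key", "client_email")

if __name__ == "__main__":
    for key in BIGQUERY_KEYS:
        # Prints e.g. DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID=...
        print(f"{dlt_env_var_name('destination', 'bigquery', 'credentials', key)}=...")
```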

Step 4: Add configurable resources and define the asset

  • Define a DDltResource class in resources/__init__.py as a Dagster configurable resource. This class allows you to reuse pipeline code inside an asset.
from dagster import ConfigurableResource
import dlt


class DDltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str

    def create_pipeline(self, resource_data, table_name):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=self.pipeline_name, destination=self.destination, dataset_name=self.dataset_name
        )

        # run the pipeline with your parameters (the data passed in as resource_data)
        load_info = pipeline.run(resource_data, table_name=table_name)

        return load_info
  • Define the asset, issues_pipeline, in assets/__init__.py. This asset uses the configurable resource to create a dlt pipeline and ingests data into BigQuery.
from dagster import asset, get_dagster_logger
from ..resources import DDltResource
from ..dlt import github_issues_resource


@asset
def issues_pipeline(pipeline: DDltResource):

    logger = get_dagster_logger()
    results = pipeline.create_pipeline(github_issues_resource, table_name='github_issues')
    logger.info(results)

The asset issues_pipeline takes the configurable resource DDltResource as input. Inside the asset, we call the resource's create_pipeline method and pass it the dlt resource github_issues_resource. create_pipeline then creates a dlt pipeline that normalizes the data and ingests it into BigQuery.

Step 5: Handle Schema Evolution

dlt provides schema evolution, which monitors changes in the defined table schema. Suppose GitHub adds a new column or changes a column's data type: a small change like this can break pipelines and transformations. dlt's schema evolution works well with Dagster, because the detected changes can be surfaced as asset metadata.

  • Add the schema evolution code to the asset to make our pipelines more resilient to changes.
from dagster import AssetExecutionContext, MetadataValue, asset


@asset
def issues_pipeline(context: AssetExecutionContext, pipeline: DDltResource):
    ...  # rest of the asset body from the previous step, which defines `results`
    md_content = ""
    for package in results.load_packages:
        for table_name, table in package.schema_update.items():
            for column_name, column in table["columns"].items():
                # append one line per changed column instead of overwriting md_content
                md_content += f"- Table updated: {table_name}: Column changed: {column_name}: {column.get('data_type')}\n"

    # Attach the Markdown content as metadata to the asset
    context.add_output_metadata(metadata={"Updates": MetadataValue.md(md_content)})
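The metadata loop only depends on the shape of `schema_update`: table name mapped to a table schema with a `columns` mapping. Pulling the formatting into a plain function makes it unit-testable without Dagster or BigQuery; the name `schema_updates_to_markdown` and the sample dict are assumptions for illustration:

```python
def schema_updates_to_markdown(schema_updates: list[dict]) -> str:
    # One schema_update dict per load package: {table_name: {"columns": {...}}}
    lines = []
    for schema_update in schema_updates:
        for table_name, table in schema_update.items():
            for column_name, column in table.get("columns", {}).items():
                lines.append(
                    f"- Table updated: {table_name}: Column changed: "
                    f"{column_name}: {column.get('data_type')}"
                )
    return "\n".join(lines)
```

Inside the asset, this could be called as `schema_updates_to_markdown([p.schema_update for p in results.load_packages])`.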

Step 6: Define Definitions

  • In the __init__.py under the github_issues folder, add the definitions:
from dagster import Definitions, define_asset_job, load_assets_from_modules

from . import assets
from .resources import DDltResource

all_assets = load_assets_from_modules([assets])
simple_pipeline = define_asset_job(name="simple_pipeline", selection=['issues_pipeline'])

defs = Definitions(
    assets=all_assets,
    jobs=[simple_pipeline],
    resources={
        # the table name is passed in the asset, so it is not a resource field
        "pipeline": DDltResource(
            pipeline_name="github_issues",
            dataset_name="dagster_github_issues",
            destination="bigquery",
        ),
    }
)
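Note that the asset parameter `pipeline` and the `"pipeline"` key in `resources` share a name: Dagster binds resources to asset parameters by that name. A toy illustration of name-based injection with `inspect.signature`; this is not how Dagster is implemented internally, and `run_with_resources` is hypothetical:

```python
import inspect
from typing import Any, Callable


def run_with_resources(asset_fn: Callable[..., Any], resources: dict[str, Any]) -> Any:
    """Call asset_fn, filling each parameter from the resource with the same key."""
    kwargs = {}
    for name in inspect.signature(asset_fn).parameters:
        if name not in resources:
            # A renamed parameter or key breaks the binding
            raise KeyError(f"no resource registered under key {name!r}")
        kwargs[name] = resources[name]
    return asset_fn(**kwargs)
```

Renaming either side (for example the key to `"dlt"`) is exactly the kind of mismatch that would make the asset fail to load.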

Step 7: Run the Web Server and materialize the asset

  • In the root directory (github-issues) run the dagster dev command to run the web server and materialize the asset.

GitHub Asset

Step 8: View the populated Metadata and ingested data in BigQuery

Once the asset has been successfully materialized, go to the Assets tab at the top and select issues_pipeline. In the Metadata section, you can see the tables, columns, and data types that have been updated. In this case, the changes are related to internal dlt tables.

Any subsequent changes in the GitHub issues schema can be tracked from the metadata. You can set up Slack notifications to be alerted to schema changes.

Metadata loaded in Asset

Let's finally have a look in BigQuery to view the ingested data.

Data Loaded in BigQuery

The github_issues table is the parent table that contains the data from the root level of the JSON returned by the GitHub API. The github_issues_assignees table is a child table holding data that was nested in the original JSON. dlt normalizes nested data by moving it into separate tables and creating relationships between those tables. To learn more about how dlt creates these relationships, refer to the docs.

Orchestrating verified dlt source using Dagster:

dlt provides a list of verified sources that can be initialized to fast-track the pipeline-building process. You can find a list of sources provided in the dlt docs.

One of the main strengths of dlt lies in its ability to extract, normalize, and ingest unstructured and semi-structured data from various sources. One of the most commonly used verified sources is MongoDB. Let’s quickly look at how we can orchestrate the MongoDB source using Dagster.

Step 1: Setting up a Dagster project

  • Start by creating a new Dagster project scaffold:
dagster project scaffold --name mongodb-dlt
  • Follow the steps mentioned earlier and create assets and resources directories under mongodb-dlt/mongodb_dlt.
  • Initialize a dlt MongoDB pipeline in the same directory:
dlt init mongodb bigquery

This will create a template with all the necessary logic implemented for extracting data from MongoDB. After running the command your directory structure should be as follows:

.
├── README.md
├── mongodb_dlt
│   ├── __init__.py
│   ├── assets
│   │   ├── __init__.py
│   │   └── assets.py
│   ├── mongodb
│   │   ├── README.md
│   │   ├── __init__.py
│   │   └── helpers.py
│   ├── mongodb_pipeline.py
│   ├── requirements.txt
│   └── resources
│       └── __init__.py
├── mongodb_dlt_tests
│   ├── __init__.py
│   └── test_assets.py
├── pyproject.toml
├── setup.cfg
└── setup.py

Step 2: Configuring MongoDB Atlas and Credentials

For this example, we are using MongoDB Atlas. Set up the account for MongoDB Atlas and use the test Movie Flix Dataset. You can find detailed information on setting up the credentials in the MongoDB verified sources documentation.

Next, create a .env file and add the BigQuery and MongoDB credentials to the file. The .env file should reside in the root directory.

Step 3: Adding the DDltResource

Create a DDltResource under the resources directory. Add the following code to its __init__.py:

from dagster import ConfigurableResource

import dlt


class DDltResource(ConfigurableResource):
    pipeline_name: str
    dataset_name: str
    destination: str

    def load_collection(self, resource_data, database):

        # configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=f"{database}_{self.pipeline_name}", destination=self.destination, dataset_name=f"{self.dataset_name}_{database}"
        )

        load_info = pipeline.run(resource_data, write_disposition="replace")

        return load_info

Step 4: Defining an Asset Factory

In MongoDB, each database contains multiple collections. When writing a data pipeline, it is important to load each collection separately.

Dagster's @multi_asset decorator lets us turn each collection in a database into a separate asset. This makes the pipeline easier to debug when something fails and keeps the collections independent of each other.

In the mongodb_pipeline.py file, locate the load_select_collection_hint_db function. We will use this function to create the asset factory.

In the __init__.py file under the assets directory, define the dlt_asset_factory:

import os

from dagster import AssetOut, OpExecutionContext, get_dagster_logger, multi_asset

from ..mongodb import mongodb
from ..resources import DDltResource

URL = os.getenv('SOURCES__MONGODB__CONNECTION__URL')

DATABASE_COLLECTIONS = {
    "sample_mflix": [
        "comments",
        "embedded_movies",
    ],
}


def dlt_asset_factory(collection_list):
    multi_assets = []

    # Build each asset in a helper so it captures its own db and collection
    # names (a function defined directly in the loop would see only the last ones)
    def make_collections_asset(db, collection_names):
        @multi_asset(
            name=db,
            group_name=db,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{db}'])
                for stream in collection_names
            },
        )
        def collections_asset(context: OpExecutionContext, pipeline: DDltResource):

            # Getting data from MongoDB, limited to this database's collections
            data = mongodb(URL, db).with_resources(*collection_names)

            logger = get_dagster_logger()
            results = pipeline.load_collection(data, db)
            logger.info(results)

            # One (empty) output per selected collection asset
            return tuple([None for _ in context.selected_output_names])

        return collections_asset

    for db, collection_names in collection_list.items():
        multi_assets.append(make_collections_asset(db, collection_names))

    return multi_assets


dlt_assets = dlt_asset_factory(DATABASE_COLLECTIONS)
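The factory pattern itself doesn't depend on Dagster. A stdlib sketch of the same idea, producing one loader per collection keyed like the assets (mirroring `key_prefix=[f'raw_{db}']`), where a helper function gives each loader its own `db` and `collection`; `make_loader` and `loader_factory` are hypothetical names and the loaders only return a description instead of loading data:

```python
from typing import Callable


def make_loader(db: str, collection: str) -> Callable[[], str]:
    # Each call creates a new closure, so db and collection are fixed per loader
    def load() -> str:
        return f"loading {db}.{collection}"
    return load


def loader_factory(database_collections: dict[str, list[str]]) -> dict[str, Callable[[], str]]:
    # Keys mirror the raw_<db> prefix plus the collection name
    return {
        f"raw_{db}/{collection}": make_loader(db, collection)
        for db, collections in database_collections.items()
        for collection in collections
    }
```

Had `load` been defined directly inside a loop over the dict, every loader would read the loop variables' final values when called; routing through `make_loader` avoids that.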

Step 5: Definitions and Running the Web Server

Add the definitions in the __init__.py in the root directory:

from dagster import Definitions

from .assets import dlt_assets
from .resources import DDltResource

defs = Definitions(
    assets=dlt_assets,
    resources={
        "pipeline": DDltResource(
            pipeline_name="mongo",
            dataset_name="dagster_mongo",
            destination="bigquery",
        ),
    }
)

We can run the dagster dev command to start the web server. We can see that each collection is converted into a separate asset by Dagster. We can materialize our assets to ingest the data into BigQuery.

Asset Factory

The resulting data in BigQuery:

Data Ingestion in BigQuery from MongoDB

Conclusion:

In this demo, we looked at how to orchestrate dlt pipelines using Dagster. We started off by creating a simple dlt pipeline and then converted the pipeline into an asset and resource before orchestrating.

We also looked at how we can orchestrate dlt MongoDB verified sources using Dagster. We utilized the Dagster @multi_asset feature to create a dlt_asset_factory which converts each collection under a database to a separate asset allowing us to create more robust data pipelines.

Both dlt and Dagster can be easily run on local machines. By combining the two we can build data pipelines at great speed and rigorously test them before shipping to production.

