Skip to main content

Building resilient pipelines in minutes with dlt + Prefect

· 7 min read
Dylan Hughes & Chris Reuter

This article is reposted from Prefect.io blog, and you can read the original there.

The hardest part about writing a blog is getting started - writing the outline and filling out the first few key points. The same can be said for writing data pipelines: you need to inspect docs, determine data structures, write tests, etc.

What if you could build a resilient, production-ready data pipeline that is scheduled and running in just a few minutes? We’ll show you how to do just that with dlt and Prefect.

dlt

dlt is an open-source library that you can add to your Python scripts to load data from various and often messy data sources into well-structured, live datasets. It abstracts away the need to hunt through docs, interpret APIs, and reinvent the wheel every time. Instead of writing a custom pipeline, you can use dlt to build a framework for your pipelines for any combination of tools.

Moving Slack data into BigQuery

We use BigQuery as our data warehouse, and try to centralize as much information there as possible. Given our Slack community is over 25,000 people, it makes sense to use that information to better our community. We can identify the types of questions our users struggle with the most, and take action to improve Prefect by using Slack data.

If you Google “load Slack into BigQuery,” you’ll see a bunch of listings for no-code tools like Zapier that can help you move data… for a fee, of course. What if you want to do this yourself? Slack has an API, but check it out. It would take some effort to interpret even a simple response like this one for users:

{
"ok": true,
"members": [
{
"id": "W012A3CDE",
"team_id": "T012AB3C4",
"name": "spengler",
"deleted": false,
"color": "9f69e7",
"real_name": "spengler",
"tz": "America/Los_Angeles",
"tz_label": "Pacific Daylight Time",
"tz_offset": -25200,
"profile": {
"avatar_hash": "ge3b51ca72de",
"status_text": "Print is dead",
"status_emoji": ":books:",
"real_name": "Egon Spengler",
"display_name": "spengler",
"real_name_normalized": "Egon Spengler",
"display_name_normalized": "spengler",
"email": "spengler@ghostbusters.example.com",
"image_24": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
"image_32": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
"image_48": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
"image_72": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
"image_192": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
"image_512": "https://.../avatar/e3b51ca72dee4ef87916ae2b9240df50.jpg",
"team": "T012AB3C4"
},
"is_admin": true,
"is_owner": false,
"is_primary_owner": false,
"is_restricted": false,
"is_ultra_restricted": false,
"is_bot": false,
"updated": 1502138686,
"is_app_user": false,
"has_2fa": false
}
]
}

With dlt

You can use dlt to build a Slack to BigQuery pipeline in just a few seconds with a single command. Seriously, it is that simple. In preparation, let’s make sure to install what we need:

pip install dlt
pip install prefect

Then just run a simple init command:


dlt init slack bigquery

In the .dlt/secrets.toml file, enter your Slack and BigQuery credentials:

[sources.slack]
access_token="*****"

[destinations.bigquery]
location = "US"

[destination.bigquery.credentials]
project_id = "*****"
private_key = "*****"
client_email = "*****"

With a single command + adding some credentials, we now have the framework of a pipeline! Look at what has been generated, with a couple of small customizations:

Note that we are redacting some of the code in the preview for brevity, to follow along completely navigate to the repo.

# Pipeline to load Slack into BigQuery

from typing import List

import dlt
import pendulum

from slack import slack_source

def load_channels() -> None:
"""Execute a pipeline that will load a list of all the Slack channels in the workspace to BigQuery"""
# ...

def get_resources() -> List[str]:
"""Fetch a list of available dlt resources so we can fetch them one at a time"""
# ...

def load_channel_history(channel: str, start_date: Date) -> None:
"""Execute a pipeline that will load the given Slack channel incrementally beginning at the given start date."""
# ...

def get_users() -> None:
"""Execute a pipeline that will load Slack users list."""
# ...

if __name__ == "__main__":
channels = None
start_date = pendulum.now().subtract(days=1).date()

load_channels()

resources = get_resources()
for resource in resources:
if channels is not None and resource not in channels:
continue

load_channel_history(resource, start_date=start_date)

get_users()

What if it fails?

Great, we’ve got a pipeline that moves data from Slack to BigQuery, and we didn’t have to format any JSON - that alone is a win. However, there may be some issues. What if Slack rate limits you? What if BigQuery is down (😅)? What about a networking issue? What if the execution environment where this script lives isn’t working?

These questions are the difference between a pipeline and a resilient pipeline. They’re the difference between you getting sleep at night and you looking like a hero (or a dummy) to your stakeholders.

Adding Prefect

Prefect is a workflow orchestration tool for turning your pipelines into scheduled, repeatable, and resilient workflows. With Prefect you get scheduling, observability, and automations that can make sure your pipelines aren’t causing you stress in the middle of the night.

Make sure you’re logged in to Prefect Cloud by signing up and using the following command:

prefect cloud login

Luckily, Prefect is also incredibly Pythonic. Turning any pipeline into an observable, scheduled Prefect flow is as simple as adding decorators to your functions and serving it up. Here’s our dlt generated pipeline, scheduled daily:

from typing import List

import dlt
import pendulum
from prefect import flow, task
from slack import slack_source

@task
def load_channels() -> None:
...

@task
def get_resources() -> List[str]:
...

@task
def load_channel_history(channel: str, start_date: pendulum.Date) -> None:
...

@task
def get_users() -> None:
...

@flow
def slack_pipeline(
channels=None, start_date=pendulum.now().subtract(days=1).date()
) -> None:
load_channels()

resources = get_resources()
for resource in resources:
if channels is not None and resource not in channels:
continue

load_channel_history(resource, start_date=start_date)

get_users()

if __name__ == "__main__":
slack_pipeline.serve("slack_pipeline", cron="0 0 * * *")

We’ve added @task to our individual functions. These will be treated as individual units of work by Prefect when they are executed. We decorate our primary function (slack_pipeline) with @flow, which references our task functions. We will schedule and kick off flows, which in turn will execute tasks based on the decorators within them.

Finally, adding .serve to our if __name__ == "__main__": call means that a Prefect deployment will be automatically created and scheduled to run daily at noon. We can see our deployment and scheduled runs in the Prefect UI, and we’ll know when it ran or, more importantly, if they didn't. We can further extend our pipeline by:

Where to handle failure

There are many levels of failure, you could say, from "accidentally liking your ex's social media post from five years ago" to "trying to assemble IKEA furniture without instructions," up to "asking for the Wi-Fi password at a funeral." So which ones should we handle where, and what are some quick solutions?

With dlt, your pipelines are resilient at the API level. From schema changes to network issues or memory overflow, there is automated resiliency and recovery that is specific to working with the pesky APIs of your tools.

With Prefect, your pipelines become resilient at the function level. If your workflows never run, break and fail, or break and never end, Prefect will be your backstop - notifying you and taking the appropriate action in case of failure.

Building resilient pipelines faster with dlt + Prefect

Getting into production is hard. First you need to build your pipeline, and then you need to make it resilient. With this tutorial, we’ve shown you how to quickly build pipelines with dlt and then turn that pipeline into a resilient, repeatable workflow with Prefect.

Prefect makes complex workflows simpler, not harder. Try Prefect Cloud for free for yourself, download our open source package, join our Slack community, or talk to one of our engineers to learn more.

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.