
How to run dlt with Airflow (Or any other Python thing)

  • Francesco Mucio,
    Untitled Data Company

Previously on “How to run dlt”:

dlt is a Python library that simplifies extracting and loading data from a source (such as a database, REST API, or file system) to a destination (like a database or file system).

So, how can you actually run dlt?

Airflow requires a dedicated article because, off the top of my head, you can run dlt (or any other Python code) in at least three different ways.

This piece was first published over on Substack by our consulting partners Untitled Data Company.

The past few weeks have been challenging, with colds and viruses making their way from my kindergartener to the rest of the family. I initially planned to include coding examples for every method to run dlt in Airflow, but that would have taken far too much time. Since I was already committed to continuing this series, I powered through to get it out there.

If you are here, you probably know Apache Airflow, and I don’t need to explain to you how to write a DAG or what an operator is (in case you need a quick review).

Before we dive into the various ways to run dlt, keep in mind that these methods can be used to run any Python code in Airflow. Here are your options:

  • PythonOperator
  • PythonVirtualenvOperator
  • KubernetesPodOperator
  • External Services

Using the PythonOperator

The PythonOperator allows you to execute arbitrary code with Airflow.

If you have a dlt pipeline defined in a method dlt_pipeline_run(), you can simply create a task like this:

from airflow.operators.python import PythonOperator

run_dlt = PythonOperator(
    task_id="run_the_dlt_pipeline",
    python_callable=dlt_pipeline_run,
)

Or, if you like decorators, you can just decorate the method like so:

from airflow.decorators import task

@task(task_id="run_dlt")
def dlt_pipeline_run(**kwargs):
    ...

run_dlt = dlt_pipeline_run()
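
For reference, the callable itself can stay very small. Here is a minimal sketch, assuming a made-up pipeline name, a duckdb destination, and toy in-memory data; any real dlt source would plug in the same way:

import dlt

def dlt_pipeline_run(**kwargs):
    # Define the pipeline: destination and dataset name are illustrative.
    # The duckdb destination needs the extra: pip install "dlt[duckdb]"
    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline",
        destination="duckdb",
        dataset_name="raw",
    )
    # Load a tiny in-memory list and print the load report.
    load_info = pipeline.run([{"id": 1}, {"id": 2}], table_name="example")
    print(load_info)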

If you try to run a dlt pipeline with a vanilla installation of Airflow, you will get an error because Airflow won’t recognize the module dlt.

On my local machine, I would just do a pip install dlt or, even better, uv add dlt (uv is pretty cool). Depending on your Airflow setup, there are different ways to install it. Covering them all would make this post a bit too long, so I put some additional info in this footnote1.

If you manage to get it running, congratulations, you just discovered the worst way to use Airflow!

How Airflow works

Airflow has multiple components: a webserver, a scheduler, and one or more workers. The webserver provides the user interface, the scheduler determines when to run your DAG and dispatches tasks to the workers, and the workers execute your DAG’s tasks.

When all these components run on the same server, they fight for CPU and RAM resources.

For this reason, it’s best practice to host these components on different machines, a setup that requires a bit more expertise and configuration. Luckily, there is an abstraction that makes it trivial to run multiple machines and have a proper setup for Airflow: Kubernetes (K8S from now on). More on this later.

Executing dlt

Going back to dlt: when using the PythonOperator, an Airflow worker executes our pipeline. However, this setup has three problems:

  1. Module Conflicts:
    The dlt library must be installed on every worker, which could lead to conflicts with other Python modules.
  2. Resource Contention:
    In a single-machine setup, your pipeline might consume resources needed by the scheduler or the webserver (this doesn’t apply if your Airflow is on K8S because each worker has its own pod/resources)
  3. Tight Coupling:
    The pipeline code is often intertwined with the scheduling code, making it more difficult to test and maintain the pipeline independently.

✅ It works

❌ You need to install dlt in Airflow
❌ If you have Airflow in a box, it can consume common resources
❌ Possible module conflicts
❌ Scheduling code and loading code in the same place = additional complication for testing

Using the PythonVirtualenvOperator

Recently, I had to run dlt using GCP Composer managed by another team. The team wasn’t very strong on infra topics, and I needed a quick solution to demonstrate the power of dlt.

If you read the title of this section, you know where this is going. The PythonVirtualenvOperator lets you run Python in an environment different from the default one - an environment where it is possible to install additional dependencies (which I needed) or avoid conflicts (not my case, but good too).

The code is very similar to the previous one:

from airflow.operators.python import PythonVirtualenvOperator

run_dlt = PythonVirtualenvOperator(
    task_id="run_the_dlt_pipeline",
    requirements=["dlt==1.8.0"],
    python_callable=dlt_pipeline_run,
)

or

from airflow.decorators import task

@task.virtualenv(task_id="run_dlt", requirements=["dlt==1.8.0"])
def dlt_pipeline_run(**kwargs):
    ...

run_dlt = dlt_pipeline_run()

When the task is scheduled, the worker sets up a new Python virtual environment, installs dlt (or whatever is in the requirements), and runs the pipeline without problems. Once the task completes, the virtual environment is thrown away.

A couple of things to add here:

  • If you need to install multiple modules, the virtual environment setup can take some time. Caching virtual environments saves time.
  • While this method avoids module conflicts, resource issues can still arise.
  • The callable you pass to the PythonVirtualenvOperator cannot call other user-defined methods. If you need to do that, you will have to wrap everything in a single callable, which is not always an option, or find another solution.

The final issue was quite frustrating since I prefer to keep my methods concise (ideally, a method should perform a single task). Stashing everything together was painful.
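
To make that last constraint concrete, here is a minimal self-contained callable, again a sketch with illustrative names and requirements: even the import of dlt happens inside the function, because dlt only exists in the freshly created virtual environment, not in the worker’s default one.

from airflow.decorators import task

# The duckdb extra is only needed because this sketch loads to duckdb.
@task.virtualenv(task_id="run_dlt", requirements=["dlt[duckdb]==1.8.0"])
def dlt_pipeline_run(**kwargs):
    # Imports live inside the callable: it is serialized and executed
    # with the virtual environment's Python, not the worker's.
    import dlt

    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline",
        destination="duckdb",
        dataset_name="raw",
    )
    pipeline.run([{"id": 1}], table_name="example")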

In summary:

✅ It works
✅ No module conflicts

❌ If you have Airflow in a box, it can consume common resources
❌ Only the selected callable will be passed to the Virtual Env
❌ Scheduling code and loading code in the same place = additional complication for testing

Using the KubernetesPodOperator

Finally, we have arrived at the part that I can wholeheartedly recommend. I love the KubernetesPodOperator because with it you are just telling Airflow: “ask K8S to spin up a pod for you to run this thing, you just relax”.

Here is how it actually works:

# Import path may differ slightly depending on your cncf.kubernetes provider version
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

run_dlt = KubernetesPodOperator(
    task_id="run_dlt_pipeline",
    name="dlt-pipeline",
    image="my-custom-dlt-image:latest",  # A Docker image pre-built with dlt and all dependencies
    cmds=["python", "dlt_pipeline.py"],
)

Airflow instantiates a new pod with the image my-custom-dlt-image:latest, which contains Python, dlt, and our pipeline. Then the command python dlt_pipeline.py is executed.
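
The script itself does not need to know anything about Airflow. A rough sketch of dlt_pipeline.py baked into the image (names, destination, and data are again illustrative) could be as simple as:

# dlt_pipeline.py - standalone script, no Airflow imports needed
import dlt

def dlt_pipeline_run():
    pipeline = dlt.pipeline(
        pipeline_name="my_pipeline",
        destination="duckdb",
        dataset_name="raw",
    )
    print(pipeline.run([{"id": 1}], table_name="example"))

if __name__ == "__main__":
    dlt_pipeline_run()

Because it is a plain script, you can run python dlt_pipeline.py on your laptop, inside the Docker image, or in the pod, which is exactly the decoupling discussed below.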

It is really that simple, but you need a few more things to feel truly comfortable with the KubernetesPodOperator. After all, as the old saying goes, “It takes a village to run on Kubernetes”.

Airflow on K8S - Ingredients

What you need to know:

  • Linux command line:
    When starting out, replacing the command with ls or env is a nice debugging trick.
  • Docker:
    If you know how to write and test a Dockerfile locally, you are almost there.
  • CI/CD:
    Pushing a Docker image to a registry, passing secrets, chaining CI/CD pipelines.

And what you need to have:

  • An Airflow instance running on K8S, or one with access to a K8S cluster.
  • A Docker container registry for storing your images.
  • A CI/CD tool—because doing these processes manually is just asking for trouble.

Usually all this works better when your organization already has an infrastructure team managing K8S and available to assist you.

I do not recommend running and managing a K8S cluster if you are new to Airflow, if you don’t know what you are doing, or if you are short on time. That said, K8S has been around for so long that most LLMs can help you troubleshoot problems.

If you are still here and you are not scared, you will probably have the experience of your life using the KubernetesPodOperator. You will be able to test each part of your pipeline independently, because there is a clear separation between the scheduling code (your DAG) and your execution code (the dlt pipeline in the docker image).

Of course, that is until you ask, “How do I run this locally?” Then…

In summary:

✅ Decouples scheduling from execution: easier testing, leaner DAGs
✅ No module conflicts
✅ No resource conflicts
✅ Cheapest way to run Airflow

❌ Requires many different skills in the team
❌ Someone has to maintain the K8S cluster for deployment
❌ Setting up a dev environment can be troublesome

Let me add a personal note after the summary. I love running Airflow on Kubernetes, though I've scared more than one person by saying so or suggesting they try it. When those who have successfully deployed Airflow on K8S meet, they immediately know they’re talking with someone who understands the good stuff. Or this is what I like to believe…

There is just so much satisfaction in having the scheduling logic separated from the business logic. Do I need to run another tool or process? Sure, let’s build a new container. Do I need to change the schedule or the task dependencies? Sure, here is the PR, my DAG is super lean, and I have already made the change.

Airflow essentially becomes an abstraction layer on top of Kubernetes for batch executions.

But underneath there is still K8S. I once built a couple of DAGs as a proof of concept for a client, and some runs failed without clear explanations. Although I didn’t have access to their Kubernetes cluster, I knew that some configurations in both Airflow and K8S were off. I just didn’t have the time to support their infra team in fixing them.

External Services

In case you don’t have K8S, but you still want to decouple Airflow scheduling from the execution environment, leveraging external compute services is a powerful strategy.

Whether you’re using AWS, GCP, Azure, or another cloud provider, many platforms offer managed services that run your containerized or serverless code without the need to maintain underlying infrastructure.

  • Serverless Functions:
    These services (such as AWS Lambda, GCP Cloud Functions, or Azure Functions) allow you to run lightweight code in response to events. They are typically best for short-lived tasks due to their runtime limits.
  • Managed Container Services:
    For tasks that exceed typical serverless function timeouts or require more resources, the answer is managed container services (like AWS Fargate, GCP Cloud Run, or Azure Container Instances).

Both Serverless Functions and Container Services can run Docker containers. This not only keeps your scheduling and execution code separated but also gives you the flexibility to migrate between services (e.g., from AWS Lambda to Fargate or to K8S) or even between clouds.

One thing to mention is that you will need to be able to debug and monitor these external services. While this can be challenging if you’re not a cloud expert, it also provides an opportunity to expand your toolset (tools that can be useful with other schedulers as well).

I am not going to write down examples for each of the services; I leave them as homework. You can find examples in the documentation of the Airflow operators for each cloud provider. Except for Azure, which has a very limited set of Airflow operators2.

In summary:

✅ Decouples scheduling from execution: easier testing, leaner DAGs
✅ No module conflicts
✅ No resource conflicts
✅ No K8S, but it can cost more.

❌ Requires additional cloud skills in the team
❌ Some cloud vendors have better support than others
❌ Multiple services to keep track of

Conclusion

And we are at the end of this article. I am sure that as soon as I click publish, a couple of additional methods will come to mind. If you are still wondering, my favorite method is the KubernetesPodOperator, because everything happens within the perimeter of a K8S cluster. However, I also recognize the challenges of using it and the advantages of the other approaches.

To make it a bit more compact, I summarized the different approaches in the table below. Feel free to share it or this article.

Table: comparison of the approaches. Francesco Mucio, Untitled Data Company. Created with Datawrapper.

Besides writing posts on this select * from substack, I also work as a data consultant through my own company. It was an excuse to work with people that I like and to get some more interesting projects. If you have a data challenge, if your delivery speed is too slow, or if you think you can do more with data, feel free to contact us.

It was a long post; if you got here, this is for you.

1 Ok, I cannot leave you like that, so:

  • if you are running Airflow with docker compose:
    • for development or quick checks you can add additional packages using the variable _PIP_ADDITIONAL_REQUIREMENTS in your docker-compose.yaml (see docs)
    • for production, the recommended approach is to create your own Airflow image with the needed pip packages
  • if you are running Airflow on K8S using the official Helm chart, again the recommended approach is to build your own Docker image for your Airflow workers that includes the dlt package.

2 Sometimes I feel like the Azure solutions for data are a big advertisement for Databricks.