
3 posts tagged with "data pipeline observability"


· 7 min read
Adrian Brudaru & Violetta Mishechkina

At dltHub, we have been pioneering the future of data pipeline generation, making complex processes simple and scalable. We have been building dlt not only for humans, but also for LLMs.

Pipeline generation on a simple level is already possible directly in ChatGPT chats - just ask for it. But doing it at scale, correctly, and producing comprehensive, good quality pipelines is a much more complex endeavour.

Our early exploration with code generation

As LLMs became available at the end of 2023, we were already uniquely positioned to be part of the wave. Because dlt is a library, an LLM can use it to build pipelines without the complexities of traditional ETL tools.

From the start, this raised the question: what are the different levels of pipeline quality? For example, how does a user-written code snippet, which formerly had value, compare to LLM snippets that can be generated en masse? What does a perfect pipeline look like now, and what can LLMs do?

We were only able to answer some of these questions, but we had some concrete outcomes that we carry into the future.

In June ‘23 we added a GPT-4 docs helper that generates snippets

  • try it on our docs; it's widely used as a code troubleshooter

We created an OpenAPI-based pipeline generator


Running into early limits of LLM automation: A manual last mile is needed

Ideally, we would love to point a tool at an API or its documentation and just have the pipeline generated.

However, the OpenAPI spec does not contain all the information needed to generate a complete pipeline. There are many challenges to overcome and gaps that need to be handled manually.

While LLM automation can handle the bulk of the work, some customisation remains manual, which informs our ongoing efforts towards full automation.

Why revisit code generation at dlt now?

Growth drives a need for faster onboarding

The dlt community has been growing steadily in recent months. In February alone, we saw 25% growth on Slack and even more in usage.

New users bring a lot of questions, and some of them join our onboarding program, where we speed-run them through any obstacles and learn along the way how to make things smoother on the dlt product side.

Onboarding usually means building a pipeline POC fast

During onboarding, most companies want to understand if dlt fits their use cases. For these purposes, building a POC pipeline is pretty typical.

This is where code generation can prove invaluable: reducing build time from 2-3 days to half a day lowers the workload for both users and our team. 💡 To join our onboarding program, fill in this form to request a call.

Case Study: How our solution engineer Violetta used our PoC to generate a production-grade Chargebee pipeline within hours

In a recent case, one of our users wanted to try dlt with a source we did not list in our public sources - Chargebee.

Since the Chargebee API uses the OpenAPI standard, we used the OpenAPI PoC dlt pipeline code generator that we built last year.

Starting resources

POC for getting started, human for the last mile.

Onward, let’s look at how our new colleague Violetta, ML Engineer, used this PoC to generate PoCs for our users.

Violetta shares her experience:

So the first thing I found extremely attractive — the code generator actually created a very simple and clean structure to begin with.

I was able to understand what was happening in each part of the code. What unfortunately differs from one API to another is the authentication method and pagination. This needed some tuning. Also, there were other minor inconveniences that I needed to handle.

There were no great challenges. The most tedious part was probably manually changing the pagination in different sources and renaming each table.

1) Authentication

The provided authentication was a bit off. The generated code assumed the use of a username and password, but what was actually required was an empty username plus the api_key as the password. So the super easy fix was changing

def to_http_params(self) -> CredentialsHttpParams:
    cred = f"{self.api_key}:{self.password}" if self.password else f"{self.username}"
    encoded = b64encode(f"{cred}".encode()).decode()
    return dict(cookies={}, headers={"Authorization": "Basic " + encoded}, params={})

to

def to_http_params(self) -> CredentialsHttpParams:
    encoded = b64encode(f"{self.api_key}".encode()).decode()
    return dict(cookies={}, headers={"Authorization": "Basic " + encoded}, params={})

Also, I was pleasantly surprised that the generator had several different authentication methods built in, and I could easily replace BasicAuth with BearerAuth or OAuth2, for example.
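
For illustration, a bearer-token variant follows the same shape; this is just a sketch mirroring the snippet above, not the generator's actual BearerAuth class:

from typing import Any, Dict

class BearerTokenCredentials:
    # hypothetical credentials class, mirroring the generated BasicAuth snippet above
    def __init__(self, api_key: str) -> None:
        self.api_key = api_key

    def to_http_params(self) -> Dict[str, Any]:
        # bearer auth sends the token as-is, no base64 encoding needed
        return dict(cookies={}, headers={"Authorization": f"Bearer {self.api_key}"}, params={})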

2) Pagination

For the code generator, it’s hard to guess the pagination method from the OpenAPI specification, so the generated code has no pagination 😞. So I had to replace the line

def f():
    yield _build_response(requests.request(**kwargs))

with yielding from a six-line get_pages function:

def get_pages(kwargs: Dict[str, Any], data_json_path):
    has_more = True
    while has_more:
        response = _build_response(requests.request(**kwargs))
        yield extract_nested_data(response.parsed, data_json_path)
        kwargs["params"]["offset"] = response.parsed.get("next_offset", None)
        has_more = kwargs["params"]["offset"] is not None

The downside — I had to do it for each resource.

3) Too many files

The code wouldn’t run because it wasn’t able to find some models. I found a commented-out line in the generator script:

# self._build_models()

I regenerated the code with the line uncommented and understood why it had been commented out: the generator created 224 .py files under the models directory. It turned out I needed only two of them, the models used in the API code. So I just removed the other 222 garbage files and forgot about them.

4) Naming

The only problem I was left with was naming. The generated table names were like ListEventsResponse200ListItem or ListInvoicesForACustomerResponse200ListItem. I had to go and change them to something more appropriate, like events and invoices.
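
In dlt, a readable table name can be pinned directly on the resource. A minimal, hypothetical sketch (the actual generated resource functions look different):

import dlt

@dlt.resource(table_name="events", write_disposition="append")
def list_events():
    # stand-in for the paginated Chargebee calls shown above
    yield from [{"id": "ev_1", "event_type": "subscription_created"}]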

The result

Result: https://github.com/dlt-hub/chargebee-source

I did a walk-through with our user, and some additional context started to appear: for example, which endpoints needed to be loaded with the replace write disposition and which would require specifying merge keys. So in the end this source would still need some testing and some fine-tuning from the user.

I think the silver lining here is how to start. I don’t know how much time I would’ve spent on this source if I had started from scratch. Probably, for the first couple of hours, I would have been trying to decide where the authentication code should go, or going through the docs searching for information on how to use dlt configs. I would certainly have needed to go through all the API endpoints in the documentation to find the ones I needed. There are a lot of different things that could be difficult, especially if you’re doing it for the first time.

In the end, if I had done it from scratch, I would’ve got cleaner code but spent a couple of days. With the generator, even with fine-tuning, I spent about half a day. And the structure was already there, so it was overall easier to work with and I didn’t have to consider everything upfront.
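
As an illustration of that fine-tuning, here is a minimal, hypothetical sketch of pinning a write disposition and key on a dlt resource; the real Chargebee endpoints and their keys would need to be confirmed by the user:

import dlt

@dlt.resource(table_name="invoices", write_disposition="merge", primary_key="id")
def invoices():
    # stand-in for the paginated Chargebee calls; re-loading the same id updates the row
    yield from [{"id": "inv_1", "status": "paid"}]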

We are currently working on making full generation a reality.

· 2 min read
Adrian Brudaru

Intro

Here at dltHub, we work on the Python library for data ingestion. So when I heard that Airbyte was building a library, I was intrigued and decided to investigate.

What is PyAirbyte?

PyAirbyte is an interesting initiative from Airbyte, similar to the one Meltano undertook 3 years ago. It provides a convenient way to download and install Airbyte sources and run them locally, storing the data in a cache dataset. Users can then read the data from this cache.

The Python wrapper around an Airbyte source is quite nice and feels close to Alto. The whole process of cloning or pip-installing the repository, spawning a separate process to run the Airbyte connector, and reading the data via a UNIX pipe is hidden behind a Pythonic interface.
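
To give a feel for it, the basic flow looks roughly like this; a sketch based on the PyAirbyte quickstart at the time of writing, with the source-faker connector and its config purely illustrative:

import airbyte as ab

# install (if needed) and configure a source connector
source = ab.get_source(
    "source-faker",
    config={"count": 100},
    install_if_missing=True,
)
source.check()

# select streams and read them into the local cache (DuckDB by default)
source.select_all_streams()
result = source.read()

# inspect what landed in the cache
for name, records in result.streams.items():
    print(f"Stream {name}: {len(list(records))} records")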

Note that this library is not an Airbyte replacement - the loaders of Airbyte and the library are very different. The library's loader uses pandas.to_sql and SQLAlchemy, and it is not a replacement for the Airbyte destinations available in open-source Airbyte.

Questions I had, answered

  • Can I run Airbyte sources with PyAirbyte? A subset of them.
  • Can I use PyAirbyte to run a demo pipeline in a Colab notebook? Yes.
  • Would my Colab demo have a schema compatible with Airbyte? No.
  • Is PyAirbyte a replacement for Airbyte? No.
  • Can I use PyAirbyte to develop or test Airbyte sources during development? No.
  • Can I develop pipelines with PyAirbyte? No.

In conclusion

PyAirbyte is a neat little addition to the toolkit for those of us who enjoy tinkering with data in more casual or exploratory settings. It's an interesting initiative from Airbyte that will enable new usage patterns.

Want to discuss?

Join our Slack community to take part in the conversation.

· 4 min read
Adrian Brudaru

In large organisations, there are often many data teams that serve different departments. These data teams usually cannot agree on where to run their infrastructure, and everyone ends up doing something different. For example:

  • 40 generated GCP projects with various services used on each
  • Native AWS services under no particular orchestrator
  • That on-prem machine that’s the only gateway to some strange corporate data
  • and of course that SaaS orchestrator from the marketing team
  • together with the event tracking lambdas from product
  • don’t forget the notebooks someone scheduled

So, what’s going on? Where is the data flowing? What data is it?

The case at hand

At dltHub, we are data people, and use data in our daily work.

One of our sources is our community Slack, which we use in two ways:

  1. We are on the Slack free tier, where messages expire quickly. We refer to them in our GitHub issues and plan to use the technical answers for training our GPT helper. For these purposes, we archive the conversations daily. We run this pipeline on GitHub Actions (docs), which is a serverless runner that does not have a short time limit like cloud functions.
  2. We measure the growth rate of the dlt community - for this, it helps to understand when people join Slack. Because we are on the free tier, we cannot request this information from the API, but we can capture the event via a webhook and load it with a small pipeline, sketched below. This runs serverless on cloud functions, set up as in this documentation.
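
For point 2, the webhook handler itself only needs a few lines of dlt. A hypothetical sketch of such a cloud function, with the function name, destination and table name made up for illustration:

import dlt

def slack_member_joined(request):
    # Google Cloud Functions passes a Flask request; the Slack event arrives as JSON
    event = request.get_json(silent=True) or {}
    pipeline = dlt.pipeline(
        pipeline_name="slack_member_events",
        destination="bigquery",
        dataset_name="slack_data",
    )
    # load the single event; dlt infers the schema and appends to the table
    load_info = pipeline.run([event], table_name="member_joined_events")
    return str(load_info)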

So already we have two different serverless run environments, each with its own “run reporting”.

Not fun to manage. So how do we achieve a single pane of glass?

Alerts are better than monitoring

Since “checking” things can be tedious, we would rather forget about it and just be notified. For this, we can use Slack to send messages. Docs here.

Here’s a gist of how to use it

from dlt.common.runtime.slack import send_slack_message

def run_pipeline_and_notify(pipeline, data):
    try:
        load_info = pipeline.run(data)
        return load_info
    except Exception as e:
        # post the failure to Slack, then re-raise so the runner still marks the job as failed
        send_slack_message(
            pipeline.runtime_config.slack_incoming_hook,
            f"Pipeline {pipeline.pipeline_name} failed! \n Error: {str(e)}")
        raise
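
Hypothetical usage, assuming a Slack incoming hook is configured for the dlt runtime (for example via the RUNTIME__SLACK_INCOMING_HOOK setting) and with the pipeline name, destination and data made up for illustration:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="slack_archive",
    destination="duckdb",
    dataset_name="slack_data",
)
# any failure during the run is posted to Slack before being re-raised
run_pipeline_and_notify(pipeline, [{"channel": "general", "text": "hello"}])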

Monitoring load metrics is cheaper than scanning entire data sets

As for monitoring, we could always run some queries to count the number of loaded rows ad hoc, but this would scan a lot of data and become costly at larger volumes.

A better way is to leverage the runtime metrics collected by the pipeline, such as row counts. You can find docs on how to do that here.
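
A minimal sketch of what that looks like; the attribute names may differ between dlt versions, so check the linked docs, and the pipeline and data here are illustrative:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="slack_archive",
    destination="duckdb",
    dataset_name="slack_data",
)
pipeline.run([{"user": "U123", "text": "hello"}], table_name="messages")

# row counts collected while normalizing the load package - no warehouse scan needed
row_counts = pipeline.last_trace.last_normalize_info.row_counts
for table, count in row_counts.items():
    print(f"{table}: {count} rows")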

If we care, governance is doable too

Now, not everything needs to be governed. But for the Slack pipelines we want to tag which columns contain personally identifiable information, so we can delete that information and stay compliant.

One simple way to stay compliant is to annotate your raw data schema and use views for the transformed data, so if you delete the data at source, it’s gone everywhere.

If you are materialising your transformed tables, you would need column-level lineage in the transform layer to facilitate the documentation and deletion of the data. Here’s a write-up of how to capture that info. There are also other ways to grab a schema and annotate it; read more here.
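
As a sketch of what annotating the raw data schema could look like in dlt: here x-pii is a custom column annotation rather than a built-in dlt hint, the table and column names are illustrative, and whether such properties survive in the exported schema should be verified against the schema docs linked above.

import dlt

@dlt.resource(
    table_name="slack_members",
    columns={
        "email": {"data_type": "text", "x-pii": True},
        "real_name": {"data_type": "text", "x-pii": True},
    },
)
def slack_members():
    # stand-in for the Slack member data loaded by the webhook pipeline
    yield from [{"email": "jane@example.com", "real_name": "Jane Doe", "team": "T123"}]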

In conclusion

There are many reasons why you’d end up running pipelines in different places, from organisational disagreements to skillset differences or simply technical restrictions.

Having a single pane of glass is not just beneficial but essential for operational coherence.

While solutions exist for different parts of this problem, the data collection still needs to be standardised and supported across different locations.

By using a tool like dlt, standardisation is introduced with ingestion, enabling cross-orchestrator observability and monitoring.

Want to discuss?

Join our Slack community to take part in the conversation.
