10 posts tagged with "etl"

· 9 min read
Adrian Brudaru

What is the REST API Source toolkit?

tl;dr: You are probably familiar with REST APIs.

  • Our new REST API Source is a short, declarative, configuration-driven way of creating sources.
  • Our new REST API Client is a collection of Python helpers used by the above source, which you can also use as a standalone, config-free, imperative high-level abstraction for building pipelines.

Want to skip to docs? Links at the bottom of the post.

Why REST configuration pipeline? Obviously, we need one!

But of course! Why repeatedly write all this code for requests and loading, when we could write it once and re-use it with different APIs with different configs?

Once you have built a few pipelines from REST APIs, you can recognise that, instead of writing code, we could write configuration.

We can call such an obvious next step in ETL tools a “focal point” of “convergent evolution”.

And if you’ve been in a few larger, more mature companies, you will have seen a variety of home-grown solutions that look similar. You might also have seen such solutions as commercial products or offerings.

But ours will be better…

So far we have seen many REST API configurators and products — they suffer from predictable flaws:

  • Local homebrewed flavors are local for a reason: they aren’t suitable for a broad audience. And if you ask the users/beneficiaries of these frameworks, they will sometimes argue that they aren’t suitable for anyone at all.
  • Commercial products are yet another data product that doesn’t plug into your stack, brings black boxes and removes autonomy, so they simply aren’t an acceptable solution in many cases.

So how can dlt do better?

Because it can keep the best of both worlds: the autonomy of a library, the quality of a commercial product.

As you will see further on, we created not just a standalone “configuration-based source builder”; we also expose the REST API client it uses, so you can use it directly in code.

Hey community, you made us do it!

The push for this is coming from you, the community. While we had considered the concept before, there were many things dlt needed before creating a new way to build pipelines. A declarative extractor, after all, would not make dlt easier to adopt, because a declarative approach requires more upfront knowledge.

Credits:

  • So, thank you Alex Butler for building a first version of this and donating it to us back in August ‘23: https://github.com/dlt-hub/dlt-init-openapi/pull/2.
  • And thank you Francesco Mucio and Willi Müller for re-opening the topic, and creating video tutorials.
  • And last but not least, thank you to dlt team’s Anton Burnashev (also known for gspread library) for building it out!

The outcome? Two Python-only interfaces, one declarative, one imperative.

  • dlt’s REST API Source is a Python dictionary-first declarative source builder that offers enhanced flexibility, supports passing callables, validates configs natively via Python dictionaries, and composes directly in your scripts. It lets you generate sources dynamically at runtime, which allows straightforward manual or automated workflows for adapting sources to changes.
  • dlt’s REST API Client is the low-level abstraction that powers the REST API Source. You can use it in your imperative code for more automation and brevity, if you do not wish to use the higher level declarative interface.

Useful for those who frequently build new pipelines

If you are on a team with 2-3 pipelines that never change much, you likely won’t see much benefit from our latest tool. What we observe from early feedback is that a declarative extractor is great at enabling easier work at scale. We heard excitement about the REST API Source from:

  • companies with many pipelines that frequently create new pipelines,
  • data platform teams,
  • freelancers and agencies,
  • folks who want to generate pipelines with LLMs and need a simple interface.

How to use the REST API Source?

Since this is a declarative interface, we can’t make things up as we go along, and instead need to understand what we want to do upfront and declare that.

In some cases, we might not have the information upfront, so we will show you how to get that info during your development workflow.

Depending on how you learn better, you can either watch the videos that our community members made, or follow the walkthrough below.

Video walkthroughs

In these videos, you will learn at a leisurely pace how to use the new interface. Playlist link.

Workflow walkthrough: Step by step

If you prefer to do things at your own pace, try the workflow walkthrough, which will show you the workflow of using the declarative interface.

In the example below, we will show how to create an API integration with 2 endpoints. One of these is a child resource, using the data from the parent endpoint to make a new request.

Configuration Checklist: Before getting started

In the following, we will use the GitHub API as an example.

We will also provide links to examples from this Google Colab tutorial.

  1. Collect your API URL and endpoints, Colab example:

    • A URL is the base of the request, for example: https://api.github.com/.
    • An endpoint is the path of an individual resource such as:
      • /repos/{OWNER}/{REPO}/issues;
      • or /repos/{OWNER}/{REPO}/issues/{issue_number}/comments which would require the issue number from the above endpoint;
      • or /users/{username}/starred etc.
  2. Identify the authentication methods, Colab example:

  3. Identify any dependent request patterns, such as first getting a list of ids and then using each id to request details. For GitHub, we might do the following, or any other dependent requests. Colab example:

    1. Get all repos of an org https://api.github.com/orgs/{org}/repos.
    2. Then get all contributors https://api.github.com/repos/{owner}/{repo}/contributors.
  4. How does pagination work? Is there any? Do we know the exact pattern? Colab example.

    • On GitHub, we have consistent pagination between endpoints that looks like this link_header = response.headers.get('Link', None).
  5. Identify the necessary information for incremental loading, Colab example:

    • Will any endpoints be loaded incrementally?
    • What columns will you use for incremental extraction and loading?
    • GitHub example: We can extract new issues by requesting issues after a particular time: https://api.github.com/repos/{repo_owner}/{repo_name}/issues?since={since}.
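
As a rough illustration of the incremental idea in point 5, here is a minimal plain-Python sketch of how a `since` cursor could advance between runs. The helper names `next_since` and `issues_url` are hypothetical and are not part of dlt; the sample records are made up. In the declarative source, the `incremental` parameter type handles this bookkeeping for you.

```python
from typing import Any, Dict, Iterable


def next_since(
    records: Iterable[Dict[str, Any]],
    previous: str,
    cursor_field: str = "updated_at",
) -> str:
    """Return the largest cursor value seen, to be used as `since` on the next run."""
    latest = previous
    for record in records:
        value = record.get(cursor_field)
        # ISO 8601 UTC timestamps in one consistent format compare correctly as strings.
        if value is not None and value > latest:
            latest = value
    return latest


def issues_url(repo_owner: str, repo_name: str, since: str) -> str:
    """Build the issues URL pattern shown in the checklist."""
    return f"https://api.github.com/repos/{repo_owner}/{repo_name}/issues?since={since}"


# Made-up sample page of issues, standing in for an API response.
page = [
    {"id": 1, "updated_at": "2024-01-26T09:00:00Z"},
    {"id": 2, "updated_at": "2024-02-03T17:30:00Z"},
]
since = next_since(page, previous="2024-01-25T11:21:28Z")
url = issues_url("dlt-hub", "dlt", since)
print(url)
```

The design choice worth noting is that the cursor only ever moves forward, so a page without newer records leaves the stored value unchanged.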

Configuration Checklist: Checking responses during development

  1. Data path:
  2. Unless you had full documentation at point 4 (which we did), you will likely still need to figure out some details of how pagination works.
    1. To do that, we suggest using curl or a second python script to do a request and inspect the response. This gives you flexibility to try anything. Colab example.
    2. Or you could print the source as above - but if there is metadata in headers etc, you might miss it.
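
When inspecting a response by hand, the pagination metadata on GitHub lives in the `Link` header mentioned in the checklist. Below is a small sketch of pulling the `next` URL out of such a header using only the standard library. The function name `parse_link_header` and the sample header value are assumptions for illustration; in a real check, you would pass in the headers of the response you fetched with curl or a script.

```python
import re
from typing import Dict, Optional

# Matches one `<url>; rel="name"` entry of a Link header.
_LINK_PART = re.compile(r'<([^>]*)>\s*;\s*rel="([^"]*)"')


def parse_link_header(link_header: Optional[str]) -> Dict[str, str]:
    """Map each rel value (next, last, ...) to its URL; empty dict if no header."""
    if not link_header:
        return {}
    return {rel: url for url, rel in _LINK_PART.findall(link_header)}


# Made-up header in the shape GitHub uses for paginated endpoints.
sample_headers = {
    "Link": '<https://api.github.com/repositories/1/issues?page=2>; rel="next", '
            '<https://api.github.com/repositories/1/issues?page=5>; rel="last"'
}
links = parse_link_header(sample_headers.get("Link", None))
print(links.get("next"))
```

If the `next` entry is missing, you are on the last page, which is the stop condition a paginator needs.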

Applying the configuration

Here’s what a configured example could look like:

  1. Base URL and endpoints.
  2. Authentication.
  3. Pagination.
  4. Incremental configuration.
  5. Dependent resource (child) configuration.

If you are using a narrow screen, scroll the snippet below to look for the numbers designating each component (n).

# This source has 2 resources:
# - issues: Parent resource, retrieves issues incl. issue number
# - issue_comments: Child resource which needs the issue number from parent.

import os
from rest_api import RESTAPIConfig

github_config: RESTAPIConfig = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",  #(1)
        # Optional auth for improving rate limits
        # "auth": {  #(2)
        #     "token": os.environ.get('GITHUB_TOKEN'),
        # },
    },
    # The paginator is autodetected, but we can pass it explicitly  #(3)
    # "paginator": {
    #     "type": "header_link",
    #     "next_url_path": "paging.link",
    # }
    # We can declare generic settings in one place
    # Our data is stateful so we load it incrementally by merging on id
    "resource_defaults": {
        "primary_key": "id",  #(4)
        "write_disposition": "merge",  #(4)
        # these are request params specific to GitHub
        "endpoint": {
            "params": {
                "per_page": 10,
            },
        },
    },
    "resources": [
        # This is the first resource - issues
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",  #(1)
                "params": {
                    "sort": "updated",
                    "direction": "desc",
                    "state": "open",
                    "since": {
                        "type": "incremental",  #(4)
                        "cursor_path": "updated_at",  #(4)
                        "initial_value": "2024-01-25T11:21:28Z",  #(4)
                    },
                }
            },
        },
        # Configuration for fetching comments on issues  #(5)
        # This is a child resource - as in, it needs something from another
        {
            "name": "issue_comments",
            "endpoint": {
                "path": "issues/{issue_number}/comments",  #(1)
                # For child resources, you can use values from the parent resource for params.
                "params": {
                    "issue_number": {
                        # Use type "resolve" to define a child endpoint which should be resolved
                        "type": "resolve",
                        # Parent endpoint
                        "resource": "issues",
                        # The specific field in the issues resource to use for resolution
                        "field": "number",
                    }
                },
            },
            # A list of fields, from the parent resource, which will be included in the child resource output.
            "include_from_parent": ["id"],
        },
    ],
}
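
To make the parent/child relationship (5) more tangible, here is a plain-Python sketch of what resolving a child endpoint amounts to: for every parent row, fill the path placeholder from the chosen field and carry over the fields listed in `include_from_parent`. This is an illustration only, not dlt's implementation. The function `resolve_child_requests`, the sample rows, and the `_issues_id` naming of the carried field are assumptions made for this example.

```python
from typing import Any, Dict, Iterable, Iterator, List


def resolve_child_requests(
    path_template: str,
    param_name: str,
    parent_name: str,
    parent_rows: Iterable[Dict[str, Any]],
    field: str,
    include_from_parent: List[str],
) -> Iterator[Dict[str, Any]]:
    """Yield one child request description per parent row."""
    placeholder = "{" + param_name + "}"
    for row in parent_rows:
        yield {
            # Fill the path placeholder with the value of the parent field.
            "path": path_template.replace(placeholder, str(row[field])),
            # Fields copied from the parent, prefixed with the parent name (assumed naming).
            "extra_fields": {
                f"_{parent_name}_{key}": row[key] for key in include_from_parent
            },
        }


# Made-up parent rows standing in for the output of the "issues" resource.
issues = [{"id": 9001, "number": 101}, {"id": 9002, "number": 102}]
requests_to_make = list(
    resolve_child_requests(
        path_template="issues/{issue_number}/comments",
        param_name="issue_number",
        parent_name="issues",
        parent_rows=issues,
        field="number",
        include_from_parent=["id"],
    )
)
for request in requests_to_make:
    print(request)
```

Each parent row produces exactly one child request, which is why a child resource can only run after its parent has yielded data.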

And that’s a wrap — what else should you know?

  • As we mentioned, there’s also a REST Client - an imperative way to use the same abstractions, for example, the auto-paginator - check out this runnable snippet:

    from dlt.sources.helpers.rest_client import RESTClient

    # Initialize the RESTClient with the Pokémon API base URL
    client = RESTClient(base_url="https://pokeapi.co/api/v2")

    # Using the paginate method to automatically handle pagination
    for page in client.paginate("/pokemon"):
        print(page)
  • We are going to generate a bunch of sources from OpenAPI specs — stay tuned for an update in a couple of weeks!

Next steps

· 3 min read
Adrian Brudaru

About Yummy.eu

Yummy is a lean-ops meal-kit company that streamlines the entire food preparation process for customers in emerging markets by providing personalized recipes, nutritional guidance, and even shopping services. Their innovative approach ensures a hassle-free, nutritionally optimized meal experience, making daily cooking convenient and enjoyable.

Yummy is a food box business. At the intersection of gastronomy and logistics, this market is very competitive. To make it in this market, Yummy needs to be fast and informed in their operations.

Pipelines are not yet a commodity.

At Yummy, efficiency and timeliness are paramount. Initially, Martin, Yummy’s CTO, chose to purchase data pipelining tools for their operational and analytical needs, aiming to maximize time efficiency. However, the real-world performance of these purchased solutions did not meet expectations, which led to a reassessment of their approach.

What’s important: Velocity, Reliability, Speed, time. Money is secondary.

Martin was initially satisfied with the ease of setup provided by the SaaS services.

The tipping point came when an update to Yummy’s database introduced a new log table, leading to unexpectedly high fees due to the vendor’s default settings that automatically replicated new tables fully on every refresh. This situation highlighted the need for greater control over data management processes and prompted a shift towards more transparent and cost-effective solutions.

10x faster, 182x cheaper with dlt + async + modal

Motivated to find a solution that balanced cost with performance, Martin explored using dlt, a tool known for its simplicity in building data pipelines. By combining dlt with asynchronous operations and using Modal for managed execution, the improvements were substantial:

  • Data processing speed increased tenfold.
  • Cost reduced by 182 times compared to the traditional SaaS tool.
  • The new system supports extracting data once and writing to multiple destinations without additional costs.

For a peek into how Martin implemented this solution, please see Martin's async Postgres source on GitHub.

Taking back control with open source has never been easier

Taking control of your data stack is more accessible than ever with the broad array of open-source tools available. SQL copy pipelines, often seen as a basic utility in data management, do not generally differ significantly between platforms. They perform similar transformations and schema management, making them a commodity available at minimal cost.

SQL to SQL copy pipelines are widespread, yet many service providers charge exorbitant fees for these simple tasks. In contrast, these pipelines can often be set up and run at a fraction of the cost—sometimes just the price of a few coffees.

At dltHub, we advocate for leveraging straightforward, freely available resources to regain control over your data processes and budget effectively.

Setting up a SQL pipeline can take just a few minutes with the right tools. Explore these resources to enhance your data operations:

For additional support or to connect with fellow data professionals, join our community.

· 4 min read
Adrian Brudaru

Statistical Data and Metadata eXchange (SDMX) is an international standard used extensively by global organizations, government agencies, and financial institutions to facilitate the efficient exchange, sharing, and processing of statistical data.

Utilizing SDMX enables seamless integration and access to a broad spectrum of statistical datasets covering economics, finance, population demographics, health, and education, among others.

These capabilities make it invaluable for creating robust, data-driven solutions that rely on accurate and comprehensive data sources.

Why SDMX?

SDMX not only standardizes data formats across disparate systems but also simplifies the access to data provided by institutions such as Eurostat, the ECB (European Central Bank), the IMF (International Monetary Fund), and many national statistics offices.

This standardization allows data engineers and scientists to focus more on analyzing data rather than spending time on data cleaning and preparation.

Installation and Basic Usage

To start integrating SDMX data sources into your Python applications, install the sdmx library using pip (it is published on PyPI as sdmx1):

pip install sdmx1

Here's an example of how to fetch data from multiple SDMX sources, illustrating the diversity of data flows and the ease of access:

from sdmx_source import sdmx_source

source = sdmx_source([
    {"data_source": "ESTAT", "dataflow": "PRC_PPP_IND", "key": {"freq": "A", "na_item": "PLI_EU28", "ppp_cat": "A0101", "geo": ["EE", "FI"]}, "table_name": "food_price_index"},
    {"data_source": "ESTAT", "dataflow": "sts_inpr_m", "key": "M.PROD.B-D+C+D.CA.I15+I10.EE"},
    {"data_source": "ECB", "dataflow": "EXR", "key": {"FREQ": "A", "CURRENCY": "USD"}},
])
print(list(source))

This configuration retrieves data from:

  • Eurostat (ESTAT) for the Purchasing Power Parity (PPP) and Price Level Indices, providing insights into economic factors across different regions.
  • Eurostat's short-term statistics (sts_inpr_m) on industrial production, which is crucial for economic analysis.
  • European Central Bank (ECB) for exchange rates, essential for financial and trade-related analyses.
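
The example above passes the `key` either as a dictionary or as a dot-separated string. To show how the two forms relate, here is a small sketch that builds the string form from a dictionary: dimensions are joined with `.` in the dataflow's dimension order, and multiple values for one dimension are joined with `+`. The helper `build_sdmx_key` is hypothetical, and the dimension names and order used for `sts_inpr_m` are assumptions for illustration; check the dataflow's structure definition for the real ones.

```python
from typing import Dict, List, Sequence, Union

Selection = Dict[str, Union[str, Sequence[str]]]


def build_sdmx_key(dimension_order: List[str], selection: Selection) -> str:
    """Build a dot-separated SDMX key; unspecified dimensions stay empty (wildcard)."""
    parts = []
    for dimension in dimension_order:
        value = selection.get(dimension, "")
        if isinstance(value, str):
            parts.append(value)
        else:
            # Several values for one dimension are combined with "+".
            parts.append("+".join(value))
    return ".".join(parts)


# Assumed dimension order for the sts_inpr_m dataflow.
order = ["freq", "indic_bt", "nace_r2", "s_adj", "unit", "geo"]
key = build_sdmx_key(
    order,
    {
        "freq": "M",
        "indic_bt": "PROD",
        "nace_r2": ["B-D", "C", "D"],
        "s_adj": "CA",
        "unit": ["I15", "I10"],
        "geo": "EE",
    },
)
print(key)
```

The dictionary form is easier to read and review, while the string form is compact and matches what you would type into an SDMX REST URL.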

Loading the data with dlt, leveraging best practices

After retrieving data using the sdmx library, the next challenge is effectively integrating this data into databases. The dlt library excels in this area by offering a robust solution for data loading that adheres to best practices in several key ways:

  • Automated schema management -> dlt infers types and evolves schema as needed. It automatically handles nested structures too. You can customise this behavior, or turn the schema into a data contract.
  • Declarative configuration -> You can easily switch between write dispositions (append/replace/merge) or destinations.
  • Scalability -> dlt is designed to handle large volumes of data efficiently, making it suitable for enterprise-level applications and high-volume data streams. This scalability ensures that as your data needs grow, your data processing pipeline can grow with them without requiring significant redesign or resource allocation.

Martin Salo, CTO at Yummy, a food logistics company, uses dlt to efficiently manage complex data flows from SDMX sources. By leveraging dlt, Martin ensures that his data pipelines are not only easy to build, robust and error-resistant but also optimized for performance and scalability.

View Martin Salo's implementation

Martin Salo's implementation of the sdmx_source package effectively simplifies the retrieval of statistical data from diverse SDMX data sources using the Python sdmx library. The design is user-friendly, allowing both simple and complex data queries, and integrates the results directly into pandas DataFrames for immediate analysis.

This implementation enhances data accessibility and prepares it for analytical applications, with built-in logging and error handling to improve reliability.

Conclusion

Integrating sdmx and dlt into your data pipelines significantly enhances data management practices, ensuring operations are robust, scalable, and efficient. These tools provide essential capabilities for data professionals looking to seamlessly integrate complex statistical data into their workflows, enabling more effective data-driven decision-making.

By engaging with the data engineering community and sharing strategies and insights on effective data integration, data engineers can continue to refine their practices and achieve better outcomes in their projects.

Join the conversation and share your insights in our Slack community.

· 8 min read
Adrian Brudaru

The versatility that enables "one way to rule them all"... requires a devtool

A unified approach to ETL processes centers around standardization without compromising flexibility. To achieve this, we need to be able to build and run custom code, but also have helpers that let us standardise and simplify our work.

In the data space, we have a few custom code options, some of which are portable. But achieving universality and portability takes more than just a code standard.

So what do we expect from such a tool?

  • It should be created for our developers.
  • It should be easily pluggable into existing tools and workflows.
  • It should perform across a variety of hardware and environments.

Data teams don't speak Object Oriented Programming (OOP)

Connectors are nice, but when they don't exist or break, what do we do? We need to be able to build and maintain those connectors simply, the same way we work with the rest of our scripts.

The data person has a very mixed spectrum of activities and responsibilities, and programming is often a minor one. Thus, across a data team, while some members can read or even speak OOP, the team will not be able to do so without sacrificing other capabilities.

This means that in order to cater to a data team as a dev team, we need to acknowledge that a different abstraction is needed.

Goodbye OOP, hello @decorators!

Data teams often navigate complex systems and workflows that prioritize functional clarity over object-oriented programming (OOP) principles. They require tools that simplify process definition, enabling quick, readable, and maintainable data transformation and movement. Decorators serve this purpose well, providing a straightforward way to extend functionality without the overhead of class hierarchies and inheritance.

Decorators in Python allow data teams to annotate functions with metadata and operational characteristics, effectively wrapping additional behavior around core logic. This approach aligns with the procedural mindset commonly found in data workflows, where the emphasis is on the transformation steps and data flow rather than the objects that encapsulate them.

By leveraging decorators, data engineers can focus on defining what each part of the ETL process does—extract, transform, load—without delving into the complexities of OOP. This simplification makes the code more accessible to professionals who may not be OOP experts but are deeply involved in the practicalities of data handling and analysis.
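
As a minimal sketch of the idea, the toy decorator below attaches a table name and a write disposition to a plain generator function and registers it, with no classes or inheritance involved. It is a hypothetical stand-in written for this post's argument, not dlt's actual decorator; the names `resource` and `REGISTRY` and the sample rows are assumptions.

```python
import functools
from typing import Any, Callable, Dict, Iterator

# Toy registry of decorated extractors, keyed by table name.
REGISTRY: Dict[str, Callable[..., Iterator[Dict[str, Any]]]] = {}


def resource(name: str, write_disposition: str = "append") -> Callable:
    """Annotate a generator function with loading metadata and register it."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Iterator[Dict[str, Any]]:
            return func(*args, **kwargs)

        # Metadata lives on the function itself, not in a class hierarchy.
        wrapper.table_name = name
        wrapper.write_disposition = write_disposition
        REGISTRY[name] = wrapper
        return wrapper
    return decorator


@resource(name="users", write_disposition="merge")
def users() -> Iterator[Dict[str, Any]]:
    # The core logic stays a plain generator that yields rows.
    yield {"id": 1, "name": "Ada"}
    yield {"id": 2, "name": "Grace"}


for table, extractor in REGISTRY.items():
    print(table, extractor.write_disposition, list(extractor()))
```

The function body reads like any other script, while the decorator line carries the operational details, which is the separation this section argues for.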

The ability to run embedded is more than just scalability

Most traditional ETL frameworks are architected with the assumption of relatively abundant computational resources. This makes sense given the resource-intensive nature of ETL tasks when dealing with massive datasets.

However, this assumption often overlooks the potential for running these processes on smaller, more constrained infrastructures, such as directly embedded within an orchestrator or on edge devices.

The perspective that ETL processes necessarily require large-scale infrastructure is ripe for challenge. In fact, there is a compelling argument to be made for the efficiency and simplicity of executing ETL tasks, particularly web requests for data integration, on smaller systems. This approach can offer significant cost savings and agility, especially when dealing with less intensive data loads or when seeking to maintain a smaller digital footprint.

Small infrastructure ETL runs can be particularly efficient in situations where real-time data processing is not required, or where data volumes are modest. By utilizing the orchestrator's inherent scheduling and management capabilities, one can execute ETL jobs in a leaner, more cost-effective manner. This can be an excellent fit for organizations that have variable data processing needs, where the infrastructure can scale down to match lower demands, thereby avoiding the costs associated with maintaining larger, underutilized systems.

Running on small workers is easier than spinning up infra

Running ETL processes directly on an orchestrator can simplify architecture by reducing the number of moving parts and dependencies. It allows data teams to quickly integrate new data sources and destinations with minimal overhead. This methodology promotes a more agile and responsive data architecture, enabling businesses to adapt more swiftly to changing data requirements.

It's important to recognize that this lean approach won't be suitable for all scenarios, particularly where data volumes are large or where the complexity of transformations requires the robust computational capabilities of larger systems. Nevertheless, for a significant subset of ETL tasks, particularly those involving straightforward data integrations via web requests, running on smaller infrastructures presents an appealing alternative that is both cost-effective and simplifies the overall data processing landscape.

Dealing with spiky loads is easier on highly parallel infras like serverless functions

Serverless functions are particularly adept at managing spiky data loads due to their highly parallel and elastic nature. These platforms automatically scale up to handle bursts of data requests and scale down immediately after processing, ensuring that resources are utilized only when necessary. This dynamic scaling not only improves resource efficiency but also reduces costs, as billing is based on actual usage rather than reserved capacity.

The stateless design of serverless functions allows them to process multiple, independent tasks concurrently. This capability is crucial for handling simultaneous data streams during peak times, facilitating rapid data processing that aligns with sudden increases in load. Each function operates in isolation, mitigating the risk of one process impacting another, which enhances overall system reliability and performance.

Moreover, serverless architectures eliminate the need for ongoing server management and capacity planning. Data engineers can focus solely on the development of ETL logic without concerning themselves with underlying infrastructure issues. This shift away from operational overhead to pure development accelerates deployment cycles and fosters innovation.

Some examples of embedded portability with dlt

Dagster's embedded ETL now supports dlt - enabling devs to do what they love - build.

The "Stop Reinventing Orchestration: Embedded ELT in the Orchestrator" blog post by Pedram from Dagster Labs, introduces the concept of Embedded ELT within an orchestration framework, highlighting the transition in data engineering from bulky, complex systems towards more streamlined, embedded solutions that simplify data ingestion and management. This evolution is seen in the move away from heavy tools like Airbyte or Meltano towards utilizing lightweight, performant libraries which integrate seamlessly into existing orchestration platforms, reducing deployment complexity and operational overhead. This approach leverages the inherent capabilities of orchestration systems to handle concerns typical to data ingestion, such as state management, error handling, and observability, thereby enhancing efficiency and developer experience.

dlt was built for just such a scenario and we are happy to be adopted into it. Besides adding connectors, dlt adds a simple way to build custom pipelines.

Read more about it in the Dagster blog post on dlt.

Dagworks' dlt + duckdb + ibis + Hamilton demo

The DAGWorks Substack post introduces a highly portable pipeline built entirely from libraries, leveraging a blend of open-source Python libraries: dlt, Ibis, and Hamilton. This integration exemplifies the trend towards modular, decentralized data systems, where each component specializes in a segment of the data handling process: dlt for extraction and loading, Ibis for transformation, and Hamilton for orchestrating complex data flows. These technologies are not just tools but represent a paradigm shift in data engineering, promoting agility, scalability, and cost-efficiency in deploying serverless microservices.

The post not only highlights the technical prowess of combining these libraries to solve practical problems like message retention and thread summarization on Slack but also delves into the meta aspects of such integrations. It reflects on the broader implications of adopting a lightweight stack that can operate within diverse infrastructures, from cloud environments to embedded systems, underscoring the shift towards interoperability and backend agnosticism in data engineering practices. This approach illustrates a shift in the data landscape, moving from monolithic systems to flexible, adaptive solutions that can meet specific organizational needs without heavy dependencies or extensive infrastructure.

Read more about it in the DAGWorks blog post on dlt.

Closing thoughts

The concepts discussed here—portability, simplicity, and scalability—are central to modern data engineering practices. They reflect a shift towards tools that not only perform well but also integrate seamlessly across different environments, from high-powered servers to minimal infrastructures like edge devices. This shift emphasizes the importance of adaptability in tools used by data teams, catering to a broad spectrum of deployment scenarios without sacrificing performance.

In this landscape, dlt exemplifies the type of tool that embodies these principles. It's not just about being another platform; it's about providing a framework that supports the diverse needs of developers and engineers. dlt's design allows it to be embedded directly within various architectures, enabling teams to implement robust data processes with minimal overhead. This approach reduces complexity and fosters an environment where innovation is not hindered by the constraints of traditional data platforms.

We invite the community to engage with these concepts through dlt, contributing to its evolution and refinement. By participating in this collaborative effort, you can help ensure that the tool remains at the forefront of data engineering technology, providing effective solutions that address the real-world challenges of data management and integration.

Join the conversation and share your insights in our Slack community or contribute directly to the growing list of projects using us. Your expertise can drive the continuous improvement of dlt, shaping it into a tool that not only meets current demands but also anticipates future needs in the data engineering field.

· 8 min read
Adrian Brudaru

The concept of simplicity and automation in a programming language is not new. The Perl scripting language had the motto "Perl makes easy things easy and hard things possible".

The reason for this motto was the difficulty of working with C, which requires more manual handling of resources and also a compilation step.

Perl scripts could be written and executed rapidly, making it ideal for tasks that needed quick development cycles. This ease of use and ability to handle complex tasks without cumbersome syntax made Perl incredibly popular in its heyday.

Perl was introduced as a scripting language that emphasized getting things done. It was created as a practical extraction and reporting tool, which quickly found its place in system administration, web development, and network programming.

History repeats, Python is a language for humans

Python took the philosophy of making programming more accessible and human-friendly even further. Guido van Rossum created Python with the goal of removing the drudgery from coding, choosing to prioritize readability and simplicity. This design philosophy makes Python an intuitive language not just for seasoned developers but for beginners as well. Its syntax is clean and expressive, allowing developers to write fewer lines of code for tasks that would require more in Perl or other languages. Python's extensive standard library, along with its powerful data structures, contribute to its ability to handle complex applications with ease.

Python's widespread adoption across various domains, from web development to data science and machine learning, is largely attributed to its accessibility.

Its simple syntax resembles natural language, which lowers the barrier to entry for programming. Compared to Perl, Python offers an even more organized and readable approach to coding, making it an ideal teaching language that prepares new developers for future challenges in software development.

And just like Perl, it's used for data extraction and visualisation - but now it's done by normie humans, not sysadmins or devs.

dlt makes easy things fast, and hard things accessible

Following the principles of Perl and Python, dlt aimed to simplify the data engineering process. dlt focuses on making the extraction and loading of data as straightforward as possible.

dlt makes easy things fast

Starting from a simple abstraction like pipeline.run(data, table_name="table"), where data can be any iterable such as a generator or dataframe, dlt enables robust loading. Here is what the above function does, so you don't have to.

  • It will (optionally) unpack nested lists into separate tables with generated join keys, and flatten nested dictionaries into a main row.
  • If given a generator, it will consume it via microbatching, buffering to disk or external drives, never running out of memory (customisable).
  • It will create "extract packages" of extracted data so if the downstream steps fail, it can resume/retry later.
  • It will normalise the data into a shape that naturally fits the database (customisable).
  • It will create "load packages" of normalised data so if the downstream steps fail, it can retry later.
  • It infers and loads with the correct data types, for example from ISO timestamp strings (configurable).
  • It can accept different types of write dispositions declaratively such as 'append', 'merge' and 'replace'.
  • It will evolve the schema if we load a second time something with new columns, and it can alert the schema changes.
  • It will even create type variant columns if data types change (and alert if desired).
  • Or you can stop the schema from evolving and use the inferred schema or a modified one as a data contract.
  • It will report load packages associated with new columns, enabling you to pass down column-level lineage.
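
To make the first bullet more concrete, here is a simplified, standard-library-only sketch of what unpacking nested data can look like. It is an illustration of the idea, not dlt's implementation: the function name `normalize_row`, the `__` separator, and the `_id` / `_parent_id` join keys are all assumptions made for this example.

```python
from itertools import count

_ids = count(1)  # hypothetical generator of join keys


def normalize_row(row, table, tables, parent_id=None):
    """Flatten nested dicts into the row; move nested lists to child tables."""
    row_id = next(_ids)
    flat = {"_id": row_id}
    if parent_id is not None:
        flat["_parent_id"] = parent_id

    def flatten(prefix, value):
        if isinstance(value, dict):
            # Nested dictionaries become prefixed columns of the same row
            for key, inner in value.items():
                flatten(f"{prefix}__{key}" if prefix else key, inner)
        elif isinstance(value, list):
            # Nested lists become rows of a child table that point to the parent
            child_table = f"{table}__{prefix}"
            for item in value:
                item_row = item if isinstance(item, dict) else {"value": item}
                normalize_row(item_row, child_table, tables, parent_id=row_id)
        else:
            flat[prefix] = value

    flatten("", row)
    tables.setdefault(table, []).append(flat)
    return tables


tables = normalize_row(
    {"id": 1, "address": {"city": "Berlin"}, "tags": ["a", "b"]},
    "users",
    {},
)
```

In this sketch the `users` table receives a single flat row with an `address__city` column, while `users__tags` receives one row per list item, each carrying the parent's key.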

That's a lot of development and maintenance pain solved only at its simplest. You could say, the dlt loader doesn't break, as long as it encounters common data types. If an obscure type is in your data, it would need to be added to dlt or converted beforehand.

From robust loading to robust extraction

Building on the simple loading abstraction, dlt is more than a tool for simple things.

The next step in dlt usage is to leverage it for extraction. dlt offers the concepts of 'source' and 'resource'. A resource is the equivalent of a single data source, while a source is the group we put resources in to bundle them for usage.

For example, an API extractor from a single API with multiple endpoints would be built as a source with multiple resources.

Resources enable you to easily configure how the data in that resource is loaded. You can create a resource by decorating a function with the '@dlt.resource' decorator, or you can generate them dynamically.

Examples of dynamic resources

  • If we have an API with multiple endpoints, we can put the endpoints in a list and iterate over it to generate resources.
  • If we have an endpoint that gives us datapoints with different schemas, we could split them by a column in the data.
  • Similarly, if we have a webhook that listens to multiple types of events, it can dispatch each event type to its own table based on the data.
  • Or, if we want to shard a data stream into day-shards, we could append a date suffix in the resource name dynamically.
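
The webhook case in the list above can be sketched without any dlt-specific API. The snippet below only shows the routing logic, that is, picking a table name from a field in the data. The `type` field, the `route_events` helper, and the sample events are hypothetical.

```python
from collections import defaultdict


def route_events(events, type_field="type", default_table="unknown"):
    """Group events into per-type tables based on a field in the data."""
    tables = defaultdict(list)
    for event in events:
        # The table name is derived from the event itself, not fixed in code
        table_name = str(event.get(type_field, default_table)).lower()
        tables[table_name].append(event)
    return dict(tables)


events = [
    {"type": "push", "repo": "demo"},
    {"type": "issue", "repo": "demo"},
    {"type": "push", "repo": "other"},
]
routed = route_events(events)
```

Each key of `routed` would then correspond to one dynamically named table.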

Once we group resources into a source, we can run them together (or we can still run the resources independently).

Examples of reasons to group resources into sources:

  • We want to run (load) them together on the same schedule
  • We want to configure them together or keep their schemas together
  • They represent a single API and we want to publish them in a coherent, easy to use way.

So what are the efforts you spare when using dlt here?

  • A source can function similar to a class, but simpler, encouraging code reuse and simplicity.
  • Resources offer more granular configuration options.
  • Resources can also be transformers, passing data between them in a microbatched way, enabling patterns like enrichments or list/detail endpoints.
  • Source schemas can be configured with various options, such as pushing down top-level columns into nested structures.
  • dlt's requests replacement has built in retries for non-permanent error codes. This safeguards the progress of long extraction jobs that could otherwise break over and over (if retried as a whole) due to network or source api issues.
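
The retry behaviour described in the last bullet can be illustrated with a small, generic sketch. It does not use dlt's client; the `call_with_retries` helper, the set of status codes treated as non-permanent, and the backoff values are assumptions chosen for the example.

```python
import time

RETRYABLE_STATUS = {429, 500, 502, 503, 504}  # assumed "non-permanent" codes


def call_with_retries(send, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns a non-retryable status or attempts run out.

    send is any zero-argument callable returning (status_code, body).
    """
    for attempt in range(1, max_attempts + 1):
        status, body = send()
        if status not in RETRYABLE_STATUS or attempt == max_attempts:
            return status, body
        sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff


# Usage with a fake endpoint that fails twice before succeeding
responses = iter([(503, ""), (429, ""), (200, "ok")])
delays = []
result = call_with_retries(lambda: next(responses), sleep=delays.append)
```

Because only the failing request is retried, a long extraction job keeps the progress it has already made instead of starting over.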

What else does dlt bring to the table?

Beyond the ease of data extraction and loading, dlt introduces several advanced features that further simplify data engineering tasks:

Asynchronous operations: dlt harnesses the power of asynchronous programming to manage I/O-bound and network operations efficiently. This means faster data processing, better resource utilization, and more responsive applications, especially when dealing with high volumes of data or remote data sources.

Flexible destinations and reverse ETL: dlt isn't just about pulling data in; it's about sending it where it needs to go. Whether it's a SQL database, a data lake, a cloud-based storage solution, or a custom reverse ETL destination, dlt provides the flexibility to integrate with various destinations.

Optional T in ETL: With dlt, transformations are not an afterthought but a core feature. You can define transformations as part of your data pipelines, ensuring that the data is not just moved but refined, enriched, and shaped to fit your analytical needs. This capability allows for more sophisticated data modeling and preparation tasks to be streamlined within your ELT processes.

Data quality and observability: dlt places a strong emphasis on data quality and observability. It includes features for schema evolution tracking, data type validation, error handling, and data contracts, which are critical for maintaining the integrity of your data ecosystem. Observability tools integrated within dlt help monitor the health and performance of your pipelines, providing insights into data flows, bottlenecks, and potential issues before they escalate.

Community and ecosystem: One of the most significant advantages of dlt is its growing community and ecosystem. Similar to Python, dlt benefits from contributions that extend its capabilities, including connectors, plugins, and integrations. This collaborative environment ensures that dlt remains at the forefront of data engineering innovation, adapting to new challenges and opportunities.

In essence, dlt is not just a tool but a comprehensive one-stop shop that addresses the end-to-end needs of modern data ingestion. By combining the simplicity of Python with the robustness of enterprise-grade tools, dlt democratizes data engineering, making it accessible to a broader audience. Whether you're a data scientist, analyst, or engineer, dlt empowers you to focus on what matters most: deriving insights and value from your data.

Conclusion

As Perl and Python have made programming more accessible, dlt is set to transform data engineering by making sophisticated data operations accessible to all. This marks a significant shift towards the democratization of technology, enabling more individuals to contribute to and benefit from the digital landscape. dlt isn't just about making easy things fast and hard things accessible; it's about preparing a future where data engineering becomes an integral part of every data professional's toolkit.

· 9 min read
Aman Gupta
info

TL;DR: This article compares deploying dbt-core standalone and using dlt-dbt runner on Google Cloud Functions. The comparison covers various aspects, along with a step-by-step deployment guide.

dbt or “data build tool” has become a standard for transforming data in analytical environments. Most data pipelines nowadays start with ingestion and finish with running a dbt package.

dlt or “data load tool” is an open-source Python library for easily creating data ingestion pipelines. And of course, after ingesting the data, we want to transform it into an analytical model. For this reason, dlt offers a dbt runner that’s able to just run a dbt model on top of where dlt loaded the data, without setting up any additional things like dbt credentials.

Using dbt in Google Cloud functions

To use dbt in cloud functions, we employed two methods:

  1. dbt-core on GCP cloud functions.
  2. dlt-dbt runner on GCP cloud functions.

Let’s discuss these methods one by one.

1. Deploying dbt-core on Google Cloud functions

Let's dive into running dbt-core on cloud functions.

You should use this option for scenarios where you have already collected and housed your data in a data warehouse, and you need further transformations or modeling of the data. This is a good option if you have used dbt before and want to leverage the power of dbt-core. If you are new to dbt, please refer to the dbt documentation.

Let’s start with setting up the following directory structure:

dbt_setup
|-- main.py
|-- requirements.txt
|-- profiles.yml
|-- dbt_project.yml
|-- dbt_transform
    |-- models
    |   |-- model1.sql
    |   |-- model2.sql
    |   |-- sources.yml
    |-- (other dbt related contents, if required)

You can set up the contents of the dbt_transform folder by initializing a new dbt project; for details, refer to the documentation.

note

We recommend setting up and testing dbt-core locally before using it in cloud functions.

To run dbt-core on GCP cloud functions:

  1. Once you've tested the dbt-core package locally, update the profiles.yml before migrating the folder to the cloud function as follows:

    dbt_gcp: # project name
      target: dev # environment
      outputs:
        dev:
          type: bigquery
          method: oauth
          project: please_set_me_up! # your GCP project name
          dataset: please_set_me_up! # your project dataset name
          threads: 4
          impersonate_service_account: please_set_me_up! # GCP service account

    This service account should have bigquery read and write permissions.

  2. Next, modify the main.py as follows:

    import os
    import subprocess
    import logging

    # Configure logging
    logging.basicConfig(level=logging.INFO)

    def run_dbt(request):
        try:
            # Set your dbt profiles directory (assuming it's in /workspace)
            os.environ['DBT_PROFILES_DIR'] = '/workspace/dbt_transform'

            # Change into the dbt project directory
            dbt_project_dir = '/workspace/dbt_transform'
            os.chdir(dbt_project_dir)

            # Log the current working directory and list files
            logging.info(f"Current working directory: {os.getcwd()}")
            logging.info(f"Files in the current directory: {os.listdir('.')}")

            # Run dbt command (e.g., dbt run)
            result = subprocess.run(
                ['dbt', 'run'],
                capture_output=True,
                text=True
            )

            # Return dbt output
            return result.stdout

        except Exception as e:
            logging.error(f"Error running dbt: {str(e)}")
            return f"Error running dbt: {str(e)}"
  3. Next, list runtime-installable modules in requirements.txt:

    dbt-core
    dbt-bigquery
  4. Finally, you can deploy the function using gcloud CLI as:

    gcloud functions deploy YOUR_FUNCTION_NAME \
    --gen2 \
    --region=YOUR_REGION \
    --runtime=python310 \
    --source=YOUR_SOURCE_LOCATION \
    --entry-point=YOUR_CODE_ENTRYPOINT \
    TRIGGER_FLAGS

    You also have the option to deploy the function via the GCP Cloud Functions GUI.

2. Deploying function using dlt-dbt runner

The second option is running dbt using data load tool (dlt).

I work at dltHub and often create dlt pipelines. These often need dbt for modeling the data, making the dlt-dbt combination highly effective. To use this combination on cloud functions, we used the dlt-dbt runner developed at dltHub.

The main reason I use this runner is because I load data with dlt and can re-use dlt’s connection to the warehouse to run my dbt package, saving me the time and code complexity I’d need to set up and run dbt standalone.

To integrate dlt and dbt in cloud functions, use the dlt-dbt runner; here’s how:

  1. Let's start by creating the following directory structure:

    dbt_setup
    |-- main.py
    |-- requirements.txt
    |-- dbt_project.yml
    |-- dbt_transform
        |-- models
        |   |-- model1.sql
        |   |-- model2.sql
        |   |-- sources.yml
        |-- (other dbt related contents, if required)

    You can set up dbt by initializing a new project; for details, refer to the documentation.

    note

    With the dlt-dbt runner configuration, setting up a profiles.yml is unnecessary. dlt shares its credentials with dbt, and on Google Cloud Functions it automatically retrieves service account credentials if none are provided.

  2. Next, configure the dbt_project.yml and set the model directory, for example:

    model-paths: ["dbt_transform/models"]
  3. Next, configure the main.py as follows:

    import dlt
    import logging
    from flask import jsonify
    from dlt.common.runtime.slack import send_slack_message
    from dlt.common import json

    def run_pipeline(request):
        """
        Set up and execute a data processing pipeline, returning its status
        and model information.

        This function initializes a dlt pipeline with pre-defined settings,
        runs the pipeline with a sample dataset, and then applies dbt
        transformations. It compiles and returns the information about
        each dbt model's execution.

        Args:
            request: The Flask request object. Not used in this function.

        Returns:
            Flask Response: A JSON response with the pipeline's status
            and dbt model information.
        """
        try:
            # Sample data to be processed
            data = [{"name": "Alice Smith", "id": 1, "country": "Germany"},
                    {"name": "Carlos Ruiz", "id": 2, "country": "Romania"},
                    {"name": "Sunita Gupta", "id": 3, "country": "India"}]

            # Initialize a dlt pipeline with specified settings
            pipeline = dlt.pipeline(
                pipeline_name="user_data_pipeline",
                destination="bigquery",
                dataset_name="dlt_dbt_test"
            )

            # Run the pipeline with the sample data
            pipeline.run(data, table_name="sample_data")

            # Apply dbt transformations and collect model information
            models = transform_data(pipeline)
            model_info = [
                {
                    "model_name": m.model_name,
                    "time": m.time,
                    "status": m.status,
                    "message": m.message
                }
                for m in models
            ]

            # Convert the model information to a string
            model_info_str = json.dumps(model_info)

            # Send the model information to Slack
            send_slack_message(
                pipeline.runtime_config.slack_incoming_hook,
                model_info_str
            )

            # Return a success response with model information
            return jsonify({"status": "success", "model_info": model_info})
        except Exception as e:
            # Log and return an error response in case of any exceptions
            logging.error(f"Error in running pipeline: {e}", exc_info=True)

            return jsonify({"status": "error", "error": str(e)}), 500

    def transform_data(pipeline):
        """
        Execute dbt models for data transformation within a dlt pipeline.

        This function packages and runs all dbt models associated with the
        pipeline, applying defined transformations to the data.

        Args:
            pipeline (dlt.Pipeline): The pipeline object for which dbt
            transformations are run.

        Returns:
            list: A list of dbt model run information, indicating the
            outcome of each model.

        Raises:
            Exception: If there is an error in running the dbt models.
        """
        try:
            # Initialize dbt with the given pipeline and virtual environment
            dbt = dlt.dbt.package(
                pipeline,
                "/workspace/dbt_transform",
                venv=dlt.dbt.get_venv(pipeline)
            )
            logging.info("Running dbt models...")
            # Run all dbt models and return their run information
            return dbt.run_all()
        except Exception as e:
            # Log and re-raise any errors encountered during dbt model
            # execution
            logging.error(f"Error in running dbt models: {e}", exc_info=True)
            raise

    # Main execution block
    if __name__ == "__main__":
        # Execute the pipeline function.
        run_pipeline(None)
  4. The send_slack_message function is used to send messages to Slack, triggered by both success and error events. For setup instructions, please refer to the official documentation.

    RUNTIME__SLACK_INCOMING_HOOK is set up as an environment variable for the code above.

  5. Next, list runtime-installable modules in requirements.txt:

    dlt[bigquery]
    dbt-core
    dbt-bigquery
  6. Finally, you can deploy the function using gcloud CLI as:

    gcloud functions deploy YOUR_FUNCTION_NAME \
    --gen2 \
    --region=YOUR_REGION \
    --runtime=python310 \
    --source=YOUR_SOURCE_LOCATION \
    --entry-point=YOUR_CODE_ENTRYPOINT \
    TRIGGER_FLAGS

The merit of this method is that it can be used to load and transform data simultaneously. Using dlt for data loading and dbt for modeling makes using dlt-dbt a killer combination for data engineers and scientists, and my preferred choice. This method is especially effective for batched data and event-driven pipelines with small to medium workloads. For larger data loads nearing timeout limits, consider separating dlt and dbt into different cloud functions.

For more information on using the dlt-dbt runner, please refer to the official documentation.

Deployment considerations: How do cloud functions compare to GitHub Actions?

At dlthub we already natively support deploying to GitHub Actions, enabling you to have a serverless setup with a 1-command deployment.

GitHub Actions is an orchestrator that most would not find suitable for a data warehouse setup, but it certainly could do the job for a minimalistic setup. GitHub Actions provides 2000 free minutes per month, so if our pipelines run for 66 minutes per day, we fit in the free tier. If our pipelines took another 1h per day, we would need to pay ~15 USD/month for the smallest machine (2 vCPUs). You can see how that would become expensive if we wanted to run it continuously or had multiple pipelines always on in parallel.
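
The free-tier arithmetic above can be checked in a few lines. The per-minute price is an assumption (the rate for a 2-core Linux runner that matches the ~15 USD/month figure); check the current GitHub pricing page before relying on it.

```python
FREE_MINUTES_PER_MONTH = 2000
DAYS_PER_MONTH = 30
PRICE_PER_MINUTE_USD = 0.008  # assumption: 2-core Linux runner rate

# Daily budget that still fits into the free tier
free_minutes_per_day = FREE_MINUTES_PER_MONTH // DAYS_PER_MONTH

# Cost of one additional hour of runtime per day, billed per minute
extra_minutes_per_month = 60 * DAYS_PER_MONTH
extra_cost_usd = extra_minutes_per_month * PRICE_PER_MINUTE_USD

print(free_minutes_per_day)      # 66
print(round(extra_cost_usd, 2))  # 14.4
```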

Cloud functions are serverless, lightweight computing solutions that can handle small computational workloads and are cost-effective. dbt doesn't need a powerful machine because it uses the computing power of the data warehouse to perform the transformations. This makes running dbt-core on cloud functions a good choice. The free tier would suffice for about 1.5h per day of running a 1 vCPU and 2 GB RAM machine, and if we wanted an additional 1h per day for this hardware, it would cost us around 3-5 USD/month.

When deploying dbt-core on cloud functions, there are certain constraints to keep in mind. For instance, there is a 9-minute time-out limit for all 1st Gen functions. For 2nd Gen functions, there is a 9-minute limit for event-driven functions and a 60-minute limit for HTTP functions. Since dbt works on the processing power of the data warehouse it's operating on, 60 minutes is sufficient for most cases with small to medium workloads. However, it is important to remember the 9-minute cap when using event-driven functions.

Conclusion

When creating lightweight pipelines, using the two tools together on one cloud function makes a lot of sense, simplifying the setup process and the handover between loading and transformation.

However, for more resource-intensive pipelines, we might want to improve resource utilisation by separating the dlt loading from the dbt running because while dbt’s run speed is determined by the database, dlt can utilize the cloud function’s hardware resources.

When it comes to setting up just a dbt package to run on cloud functions, it comes down to personal preference: I prefer dlt as it simplifies credential management. It automatically shares credentials with dbt, making setup easier. On Google Cloud Functions, dlt also retrieves service account credentials automatically when none are provided. I also used dlt’s Slack reporting function, which sends success and error notifications from your runs directly to your Slack channel, helping me manage and monitor my runs.

· 5 min read
Rahul Joshi
info

TL;DR: While most companies continue to build their businesses on top of SAP, when it comes to analytics, they prefer to take advantage of the price and elastic compute of modern cloud infrastructure. As a consequence, we get several dlt users asking for a simple and low-cost way to migrate from SAP to cloud data warehouses like Snowflake. In this blog, I show how you can build a custom SAP connector with dlt and use it to load SAP HANA tables into Snowflake.

In case you haven’t figured it out already, we at dltHub love creating blogs and demos. It’s fun, creative, and gives us a chance to play around with many new tools. We are able to do this mostly because, like any other modern tooling, dlt just fits in the modern ecosystem. Not only does dlt have existing integrations (to, for example, GCP, AWS, dbt, airflow etc.) that can simply be “plugged in”, but it is also very simple to customize it to integrate with almost any other modern tool (such as Metabase, Holistics, Dagster, Prefect etc.).

But what about enterprise systems like SAP? They are, after all, the most ubiquitous tooling out there: according to SAP data, 99 out of 100 largest companies are SAP customers. A huge part of the reason for this is that their ERP system is still the gold standard in terms of effectiveness and reliability. However, when it comes to OLAP workloads like analytics, machine learning, predictive modelling etc., many companies prefer the convenience and cost savings of modern cloud solutions like GCP, AWS, Azure, etc.

So, wouldn’t it be nice to be able to integrate SAP into the modern ecosystem?

Unfortunately, this is not that simple. SAP does not integrate easily with non-SAP systems, and migrating data out from SAP is complicated and/or costly. This often means that ERP data stays separate from analytics data.

Creating a dlt integration

Our users have been asking for SAP HANA data, hence I decided to create a custom dlt integration to SAP’s in-memory data warehouse: SAP HANA. Given its SQL backend and Python API, I figured dlt should also have no problem connecting to it.

I then used this pipeline to load SAP HANA tables into Snowflake, since Snowflake is cloud agnostic and can be run in different environments (such as AWS, GCP, Azure, or any combination of the three). This is how I did it:

Step 1: I created an instance in SAP HANA cloud.

(I used this helpful tutorial to navigate SAP HANA.)

SAP instance

Step 2: I inserted some sample data.
SAP insert data

Step 3: With tables created in SAP HANA, I was now ready to create a dlt pipeline to extract it into Snowflake:

Since SAP HANA has a SQL backend, I decided to extract the data using dlt’s SQL source

  1. I first created a dlt pipeline

    dlt init sql_database snowflake

  2. I then passed the connection string for my HANA instance inside the loading function in sql_database_pipeline.py. (Optional: I also specified the tables that I wanted to load in sql_database().with_resources("v_city", "v_hotel", "room") )

  3. Before running the pipeline I installed all necessary requirements using

    pip install -r requirements.txt

    The dependencies inside requirements.txt are for the general SQL source. To extract data specifically from HANA, I also installed the packages hdbcli and sqlalchemy-hana.
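
As an illustration of the connection string mentioned in step 2, here is a minimal sketch of assembling a SQLAlchemy-style URL for HANA. The `hana://` scheme is the one used by the sqlalchemy-hana dialect; the helper name, host, user, password, and port below are placeholders and assumptions, not values from this demo.

```python
from urllib.parse import quote_plus


def hana_connection_string(user, password, host, port=443):
    """Build a SQLAlchemy-style URL; special characters in credentials are escaped."""
    return f"hana://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}"


# Placeholder credentials and host, for illustration only
conn = hana_connection_string("DBADMIN", "p@ss/word", "example.hanacloud.ondemand.com")
print(conn)  # hana://DBADMIN:p%40ss%2Fword@example.hanacloud.ondemand.com:443
```

Escaping the credentials matters because characters such as `@` or `/` in a password would otherwise break URL parsing.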

Step 4: I finally ran the pipeline using python sql_database_pipeline.py. This loaded the tables into Snowflake.

Data in Snowflake

Takeaway

The dlt SAP HANA connector constructed in this demo works like any other dlt connector, and is able to successfully load data from SAP HANA into data warehouses like Snowflake.

Furthermore, the demo only used a toy example, but the SQL source is a production-ready source with incremental loading, merges, data contracts etc., which means that this pipeline could also be configured for production use-cases.

Finally, the dlt-SAP integration has bigger consequences: it allows you to add other tools like dbt, airflow etc. easily into an SAP workflow, since all of these tools integrate well with dlt.

Next steps

This was just a first step into exploring what’s possible. Creating a custom dlt connector worked pretty well for SAP HANA, and there are several possible next steps, such as converting this to a verified source, or building other SAP connectors.

  1. Creating a verified source for SAP HANA: This should be pretty straight-forward since it would require a small modification of the existing SQL source.
  2. Creating a dlt connector for SAP S/4 HANA: S/4 HANA is SAP’s ERP software that runs on the HANA database. The use case would be to load ERP tables from S/4 HANA into other data warehouses like Snowflake. Depending on the requirements, there are two ways to go about it:
    1. Low volume data: This would again be straightforward. SAP offers REST API endpoints to access ERP tables, and dlt is designed to be able to load data from any such endpoint.
    2. High volume data: dlt can also be configured for the use case of migrating large volumes of data with fast incremental or merge syncs. But this would require some additional steps, such as configuring the pipeline to access HANA backend directly from Python hdbcli.

· 8 min read
Aman Gupta

💡 This article explores methods for monitoring transactional events, allowing immediate action and data capture that might be lost otherwise. We focus on Github, Slack, and Hubspot, demonstrating techniques applicable to low-volume transactional events (under 500k/month) within the free tier. For clickstream tracking or higher volumes, we recommend more scalable solutions.

There’s more than one way to sync data. Pulling data after it has been collected from APIs is a classic way, but some types of data are better transmitted as an event at the time of happening. Our approach is event-triggered and can include actions like:

Application | Action
Slack | Sending messages in Slack
GitHub | Commit, comment, or PR actions
HubSpot | Object creation or meeting specific criteria

These actions initiate a webhook that sends a POST request to trigger a dlt pipeline for event ingestion. The data is then loaded into BigQuery.

This setup enables real-time alerts or event storage for later use. For example, let’s say you want to alert every time something happens - you’d want to be able to capture an event being sent to you and act on it. Or, in some cases, you store it for later use. This guide covers a use case for deploying and setting up webhooks.

Why do we use webhooks?

Whenever we want to receive an event from an external source, we need a “recipient address” to which they can send the data. The simplest way to solve this is to use a URL as the address and accept the data as a payload.

Why cloud functions?

The key reasons for using cloud functions include:

  1. To have a URL up and accept the data payload, we would need some service or API to always be up and ready to listen for the data.

  2. Creating our own application for this would be cumbersome and expensive. It makes sense to use some serverless service for low volumes of events.

  3. On AWS, you would use API gateway + lambda to handle incoming events, but for GCP users, the option is more straightforward: Google Cloud functions come with an HTTP trigger, which enables you to create a URL and accept a payload.

  4. The pricing for cloud functions is unbeatable for low volumes: For ingesting an event with a minor function, assuming processing time to be a few seconds, we could invoke a few hundred thousand calls every month for free. For more pricing details, see the GCP pricing page for cloud functions.

Let's dive into the deployment of webhooks and app setup, focusing next on triggers from GitHub, Slack, and HubSpot for use cases discussed above.

1. GitHub Webhook

This GitHub webhook is triggered upon specified events such as pull requests (PRs), commits, or comments. It relays relevant data to BigQuery. Set up the GitHub webhook by creating the cloud function URL and configuring it in the GitHub repository settings.

1.1 Initialize GitHub webhook deployment

To set up the webhook, start by creating a cloud function. Follow these brief steps, and for an in-depth guide, please refer to the detailed documentation.

  1. Log into GCP and activate the Cloud Functions API.

  2. Click 'Create Function' in Cloud Functions, and select your region and environment setup.

  3. Choose HTTP as the trigger, enable 'Allow unauthenticated invocations', save, and click 'Next'.

  4. Set the environment to Python 3.10 and prepare to insert code into main.py:

    import dlt

    def github_webhook(request):
        # Extract relevant data from the request payload
        data = request.get_json()

        # Wrap the single event in a list so dlt can iterate over it
        event = [data]

        pipeline = dlt.pipeline(
            pipeline_name='platform_to_bigquery',
            destination='bigquery',
            dataset_name='github_data',
        )

        pipeline.run(event, table_name='webhook')  # table_name can be customized
        return 'Event received and processed successfully.'
  5. Name the function entry point "github_webhook" and list required modules in requirements.txt.

    # requirements.txt
    dlt[bigquery]
  6. Post-deployment, a webhook URL is generated, typically in the following format:

    https://{region}-{project-id}.cloudfunctions.net/{cloud-function-name}

Once the cloud function is configured, it provides a URL for GitHub webhooks to send POST requests, funneling data directly into BigQuery.

1.2 Configure the repository webhook in GitHub

Set up a GitHub repository webhook to trigger the cloud function on specified events by following these steps:

  1. Log into GitHub and go to your repository.
  2. Click "Settings" > "Webhooks" > "Add webhook."
  3. Enter the cloud function URL in "Payload URL."
  4. Set "Content type" to application/json (the function reads the payload with request.get_json()) and select events to trigger the webhook, or select "Just send me everything."
  5. Click "Add webhook."

With these steps complete, any chosen events in the repository will push data to BigQuery, ready for analysis.

2. Slack Webhook

This Slack webhook fires when a user sends a message in a channel where the Slack app is installed. To set it up, create a cloud function as shown below and obtain its URL, then configure the message events in the Slack app settings.

2.1 Initialize Slack webhook deployment

Set up the webhook by creating a cloud function, using the same steps as for the GitHub webhook.

  1. Here’s what main.py looks like:

    import dlt
    from flask import jsonify

    def slack_webhook(request):
        # Handles webhook POST requests
        if request.method == 'POST':
            data = request.get_json()

            # Responds to Slack's verification challenge
            if 'challenge' in data:
                return jsonify({'challenge': data['challenge']})

            # Processes a message event
            if 'event' in data and 'channel' in data['event']:
                message_data = process_webhook_event(data['event'])

                # Configures and initiates a dlt pipeline
                pipeline = dlt.pipeline(
                    pipeline_name='platform_to_bigquery',
                    destination='bigquery',
                    dataset_name='slack_data',
                )

                # Runs the pipeline with the processed event data
                pipeline.run([message_data], table_name='webhook')
                return 'Event processed.'
            else:
                return 'Event type not supported', 400
        else:
            return 'Only POST requests are accepted', 405

    def process_webhook_event(event_data):
        # Formats the event data for the dlt pipeline
        message_data = {
            'channel': event_data.get('channel'),
            'user': event_data.get('user'),
            'text': event_data.get('text'),
            'ts': event_data.get('ts'),
            # Potentially add more fields according to event_data structure
        }
        return message_data
  2. Name the entry point "slack_webhook" and include the necessary modules in requirements.txt, the same as the GitHub webhook setup.

  3. Once the cloud function is configured, you get a URL for Slack events to send POST requests, funneling data directly into BigQuery.

2.2 Set up and configure a Slack app

Create and install a Slack app in your workspace to link channel messages from Slack to BigQuery as follows:

  1. Go to "Manage apps" in workspace settings; click "Build" and "Create New App".
  2. Choose "from scratch", name the app, select the workspace, and create the app.
  3. Under "Features", select "Event Subscriptions", enable it, and input the Cloud Function URL.
  4. Add message.channels under "Subscribe to bot events".
  5. Save and add the app to the desired channel.

With these steps complete, any message sent on the channel will push data to BigQuery, ready for analysis.

3. Hubspot webhook

A Hubspot webhook can be configured within an automation workflow, applicable to contacts, companies, deals, tickets, quotes, conversations, feedback submissions, goals and invoices. It triggers upon specific conditions or data filters. To establish it, create a cloud function, retrieve its URL, and input this in Hubspot's automation workflow settings for message events.

3.1 Initialize Hubspot webhook deployment

Set up the webhook by creating a cloud function, using the same steps as for the GitHub webhook.

  1. Here’s what main.py looks like:

    import dlt
    from flask import jsonify

    def hubspot_webhook(request):
        # Endpoint for handling webhook POST requests from Hubspot
        if request.method == 'POST':
            # Get JSON data from the POST request
            data = request.get_json()

            # Initialize and configure the dlt pipeline
            pipeline = dlt.pipeline(
                pipeline_name="hubspot",
                destination='bigquery',  # Destination service for the data
                dataset_name='hubspot_webhooks_dataset',  # BigQuery dataset name
            )

            # Execute the pipeline with the incoming data
            pipeline.run([data], table_name='hubspot_contact_events')

            # Return a success response
            return jsonify(message='HubSpot event processed.'), 200
        else:
            # Return an error response for non-POST requests
            return jsonify(error='Only POST requests are accepted'), 405

  2. Name the entry point "hubspot_webhook" (it must match the function name in main.py) and include the necessary modules in requirements.txt, the same as the GitHub webhook setup.

  3. Once the cloud function is configured, you get a URL for Hubspot to send POST requests, funneling data directly into BigQuery.

3.2 Configure a Hubspot automation workflow

To activate a Hubspot workflow with your webhook:

  1. Go to Hubspot: "Automation" > "Workflows" > "Create workflow".
  2. Start from scratch; choose "Company-based" for this example.
  3. Set "Object created" as the trigger.
  4. Add the "Send a webhook" action, use the "POST" method, and input your webhook URL.
  5. Select the company properties to include, test, and save.

This triggers the webhook upon new company creation, sending data to BigQuery via dlt.

In conclusion

Setting up a webhook is straightforward.

Using dlt with schema evolution, we can accept the events without worrying about their schema. However, for events with custom schemas or vulnerable to bad data quality or abuse, consider using dlt’s data contracts.
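The idea behind a contract can be sketched in plain Python. This is only an illustration of the concept, not dlt's data contract API: the `EXPECTED_FIELDS` mapping and the `enforce_contract` helper are hypothetical names, and the fields are borrowed from the Slack example above.

```python
# Hypothetical contract: the only fields (and types) we accept from the webhook.
EXPECTED_FIELDS = {"channel": str, "user": str, "text": str, "ts": str}


def enforce_contract(event: dict) -> dict:
    """Return the event unchanged if it matches the contract, else raise ValueError."""
    unexpected = set(event) - set(EXPECTED_FIELDS)
    if unexpected:
        raise ValueError(f"unexpected fields: {sorted(unexpected)}")
    for field, expected_type in EXPECTED_FIELDS.items():
        value = event.get(field)
        # Missing fields are tolerated here; wrong types are not.
        if value is not None and not isinstance(value, expected_type):
            raise ValueError(f"{field} should be {expected_type.__name__}")
    return event


# A well-formed event passes through untouched.
good = enforce_contract({"channel": "C1", "user": "U1", "text": "hi", "ts": "1.0"})

# An event carrying a field we never agreed on is rejected before loading.
try:
    enforce_contract({"channel": "C1", "is_admin": True})
    rejected = False
except ValueError:
    rejected = True
```

Rejecting early like this keeps abusive or malformed payloads out of the destination, at the cost of having to update the contract whenever the sender legitimately adds a field.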

· 9 min read
Adrian Brudaru

In a recent article, Anna Geller, product manager at Kestra, highlighted why data ingestion will never be solved. In her article, she described the many obstacles around data ingestion, and detailed how various companies and open-source tools approached this problem.

I’m Adrian, data builder. Before starting dlthub, I was building data warehouses and teams for startups and corporations. Since I was such a power-builder, I have been looking for many years into how this space could be solved.

The conviction on which we started dlt is that, to solve the data ingestion problem, we need to identify the motivated problem solver and turbocharge them with the right tooling.

The current state of data ingestion: dependent on vendors or engineers.

When building a data pipeline, we can start from scratch, or we can look for existing solutions.

How can we build an ingestion pipeline?

  • SaaS tools: We could use ready-made pipelines or use building blocks to configure a new API call.
  • SDKs: We could ask a software developer to build a Singer or Airbyte source. Or we could learn object-oriented programming and the SDKs and become the software developer - but the latter is an unreasonable pathway for most.
  • Custom pipelines: We could ask a data engineer to build custom pipelines. Unfortunately, everyone is building from scratch, so we usually end up reinventing the flat tire. Pipelines often break and have a high maintenance effort, bottlenecking the amount that can be built and maintained per data engineer.

Besides the persona-tool fit, the current tooling forces a major trade-off between simplicity and flexibility. For example, SaaS tools or SaaS SDKs offer “building blocks” and leave little room for customization. On the other hand, custom pipelines enable one to do anything they could want but come with a high burden of code, complexity, and maintenance. And classic SDKs are simply too difficult for the majority of data people.

So how can we solve ingestion?

Ask first: who should solve ingestion? Afterwards, we can look into the right tools.

The builder persona should be invested in solving the problem, not in preserving it.

UI first? We already established that people dependent on a UI with building blocks are non-builders - they use what exists. They are part of the demand, not part of the solution.

SDK first? Further, having a community of software engineers for which the only reason to maintain pipelines is financial incentives also doesn’t work. For example, Singer has a large community of agencies that will help - for a price. But the open-source sources are not maintained, PRs are not accepted, etc. It’s just another indirect vendor community for whom the problem is desired.

The reasonable approach is to offer something to a person who wants to use the data but also has some capability to do something about it, and willingness to make an effort. So the problem has to be solved in code, and it logically follows that if we want the data person to use this without friction, it has to be Python.

So the existing tools are a dead end: What do custom pipeline builders do?

Unfortunately, the industry has very little standardization, but we can note some patterns.

df.to_sql() was a great first step

For the Python-first users, pandas df.to_sql() automated loading dataframes to SQL without having to worry about database-specific commands or APIs.

Unfortunately, this way of loading is limited and not very robust. There is no support for merge/upsert loading or for advanced configuration like performance hints. The automatic typing might sometimes also lead to issues over time with incremental loading.

Additionally, putting the data into a dataframe means loading it into memory, leading to limitations. So a data engineer considering how to create a boilerplate loading solution would not end up relying on this method because it would offer too little while taking away fine-grain control.

So while this method works well for quick and dirty work, it doesn’t work so well in production. And for a data engineer, this method adds little while taking away a lot. The good news: we can all use it; The bad news: it’s not engineering-ready.

Inserting JSON directly is a common antipattern. However, many developers use it because it solves a real problem.

Inserting JSON “as is” is a common antipattern in data loading. We do it because it’s a quick fix for compatibility issues between untyped semi-structured data and strongly typed databases. This enables us to just feed raw data to the analyst who can sort through it and clean it and curate it, which in turn enables the data team to not get bottlenecked at the data engineer.

So, inserting JSON is not all bad. It solves some real problems, but it has some unpleasant side effects:

  • Without an explicit schema, you do not know if there are schema changes in the data.
  • Without an explicit schema, you don’t know if your JSON extract path is unique. Many applications output inconsistent types, for example, a dictionary for a single record or a list of dicts for multiple records, causing JSON path inconsistencies.
  • Without an explicit schema, data discovery and exploration are harder, requiring more effort.
  • Reading a JSON record in a database usually scans the entire record, multiplying cost or degrading performance significantly.
  • Without types, you might incorrectly guess and suffer from frequent maintenance or incorrect parsing.
  • Dashboarding tools usually cannot handle nested data - but they often have options to model tabular data.
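The "inconsistent types" problem from the list above can be sketched as follows. The two payloads and the `as_list` helper are made up for illustration; they only show why a single JSON path stops being reliable when a field is sometimes a dict and sometimes a list.

```python
import json

# Two hypothetical API responses: one record arrives as a dict, several as a list.
single = json.loads('{"orders": {"id": 1, "total": 10}}')
multiple = json.loads('{"orders": [{"id": 2, "total": 5}, {"id": 3, "total": 7}]}')


def as_list(value):
    """Wrap a lone dict in a list so downstream code sees a single shape."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


# Without as_list, doc["orders"]["id"] works for `single` but fails for `multiple`.
order_ids = [
    order["id"]
    for doc in (single, multiple)
    for order in as_list(doc["orders"])
]
print(order_ids)
```

In a database, the same inconsistency means the extract path that works for one row silently returns nothing for another, which is hard to notice without an explicit schema.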

Boilerplate code vs one-offs

Companies that have the capacity will generally create some kind of common, boilerplate methods that enable their team to re-use the same glue code. This has major advantages but also disadvantages: building something like this in-house is hard, and the result is often a major cause of frustration for the users. What we usually see implemented is a solution to a problem, but it is usually too immature to be a nice technology and far from being a good product that people can use.

One-offs have their advantage: they are easy to create and can generally take a shortened path to loading data. However, as soon as you have more of them, you will want to have a single point of maintenance as above.

The solution: A pipeline-building dev tool for the Python layman

Let’s let Drake recap for us:

what would drake do

So what does our desired solution look like?

  • Usable by any Python user in any Python environment, like df.to_sql()
  • Automate difficult things: Normalize JSON into relational tables automatically. Alert on schema changes or contract violations. Add robustness, scaling.
  • Keep code low: Declarative hints are better than imperative spaghetti.
  • Enable fine-grained control: Builders should be enabled to control finer aspects such as performance, cost, compliance.
  • Community: Builders should be enabled to share content that they create

We formulated our product principles and went from there.
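To make "normalize JSON into relational tables" concrete, here is a minimal sketch of the idea. It is not dlt's implementation: the `normalize` function, the `__` separator and the `_parent_id` column are assumptions chosen for illustration, and it only flattens one level of nested objects.

```python
def normalize(record: dict, table: str, tables: dict, parent_id=None) -> None:
    """Split one nested record into flat rows, collected per table name."""
    row = {}
    if parent_id is not None:
        row["_parent_id"] = parent_id
    children = []
    for key, value in record.items():
        if isinstance(value, dict):
            # Nested objects are flattened into prefixed columns (one level only).
            for sub_key, sub_value in value.items():
                row[f"{key}__{sub_key}"] = sub_value
        elif isinstance(value, list):
            # Lists become rows in a child table linked back to the parent.
            children.append((f"{table}__{key}", value))
        else:
            row[key] = value
    tables.setdefault(table, []).append(row)
    for child_table, items in children:
        for item in items:
            # Scalars in a list are wrapped so every child row is a dict.
            child = item if isinstance(item, dict) else {"value": item}
            normalize(child, child_table, tables, parent_id=record.get("id"))


tables = {}
normalize(
    {
        "id": 1,
        "name": "John",
        "address": {"city": "Berlin"},
        "orders": [{"sku": "a"}, {"sku": "b"}],
    },
    "users",
    tables,
)
for name, rows in tables.items():
    print(name, rows)
```

The nested document ends up as a `users` table and a `users__orders` child table, which is the shape that SQL and dashboarding tools can work with directly.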

And how far did we get?

  • dlt is usable by any Python user and has a very shallow learning curve.
  • dlt runs where Python runs: Cloud functions, notebooks, etc.
  • Automate difficult things: dlt’s schema automations and extraction helpers do 80% of the pipeline work.
  • Keep code low: by automating a large chunk and offering declarative configuration, dlt keeps code as short as it can be.
  • Fine-grained control: Engineers with advanced requirements can easily fulfill them by using building blocks or custom code.
  • Community: We have a sharing mechanism (add a source to dlt’s sources) but it’s too complex for the target audience. There is a trade-off between the quality of code and strictness of requirements which we will continue exploring. We are also considering how LLMs can be used to assist with code quality and pipeline generation in the future.

What about automating the builder further?

LLMs are changing the world. They are particularly well suited to language tasks. Here, a library shines over any other tool - simple code like you would write with dlt can automatically be written by GPT.

The same cannot be said for SDK code or UI tools: because they use abstractions like classes or configurations, they deviate much further from natural language, significantly increasing the complexity of using LLMs to generate for them.

LLMs aside, technology is advancing faster than our ability to build better interfaces - and a UI builder has been an obsolete choice for years. With the advent of self-documenting APIs following the OpenAPI standard, there is no more need for a human to use a UI to compose building blocks - the entire code can be generated even without LLM assistance (demo of how we do it). An LLM could then possibly improve it from there. And if the APIs do not follow the standard, the building blocks of a UI builder are even less useful, while an LLM could read the docs and brute-force solutions.

So, will data ingestion ever be a fully solved problem? Yes, by you and us together.

In summary, data ingestion is a complex challenge that has seen various attempts at solutions, from SDKs to custom pipelines. The landscape is marked by trade-offs, with existing tools often lacking the perfect balance between simplicity and flexibility.

dlt, as a pipeline-building dev tool designed for Python users, aims to bridge this gap by offering an approachable, yet powerful solution. It enables users to automate complex tasks, keep their code concise, and maintain fine-grained control over their data pipelines. The community aspect is also a crucial part of the dlt vision, allowing builders to share their content and insights.

The journey toward solving data ingestion challenges is not just possible; it's promising, and it's one that data professionals together with dlt are uniquely equipped to undertake.

· 6 min read
Adrian Brudaru
info

PSSSST! You do ELT, right? not ETL? asking for a friend...

ETL vs ELT? A vendor driven story.

One of the earliest tools for "ETL" was Pentaho Kettle. Kettle stands for "Kettle Extraction Transformation Transport Load Environment" and signifies that it transforms the data before loading it. It was usually used to load data which was later transformed in SQL via "SQL scripts", while still in the tool, or via database triggers or views outside of the tool.

Indeed, the tool creators imagined some folks would write Java to transform before loading, but the vast majority of data users just wanted to use SQL.

Sounds familiar? This is not so different to today's "ELT", is it?

Why did we call it ELT?

The people

Well, first of all SQL is much more accessible and very powerful for transforming tables, columns and rows - where programming handles single values. So before purpose built tooling existed, data people were already doing the transform in SQL - it just made sense.

The "EL" vendors

In the decade following Pentaho, SaaS solutions started offering pipelines that load data into your database, removing the option for you to tinker with it before loading. For this reason, they would call it "ELT".

The db vendors

The concept also resonated with MPP DBs (massively parallel processing), such as Snowflake, Redshift, BigQuery, which were more than happy to encourage doing all the compute on their side.

The "T in ELT" vendors

Another puzzle piece was dbt, a tool purpose-built for SQL transforms. So if there's a question of ETL or ELT, dbt can only answer ELT. In dbt's world view, data starts dirty in your warehouse, where you "rename, cast, join, enrich" - a true ELT. To make the drudgery of data cleaning in SQL easier, dbt offers some python support to enable generating some of the typing and renaming SQL. They also offer a little bit of python support for scalar operations in some db vendor systems.

What do we really do?

Most of us do a little bit of both - we extract with python, and the next steps are loading, cleaning and curation. In some cases, cleaning and curation are optional. For example, when we load a report from another platform we will probably not need to clean or curate anything.

Where do we clean data?

Data cleaning usually refers to normalising the data into correct types, usable names, etc. Doing this in SQL results in writing a lot of manual code that needs to be maintained. On the other hand, structuring data in python isn't easy either. It's just less technically difficult, but when metadata is missing, it becomes guesswork.

So, technically the easier place to clean data is in python, but likely the majority will do it in SQL as they are more practiced in SQL.
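As a rough sketch of what cleaning in python can look like, the snippet below renames keys to snake_case and guesses types from string values. The `to_snake_case` and `coerce` helpers and the sample record are hypothetical, and the type guessing is exactly the kind of guesswork mentioned above: without metadata, a value like "42" could just as well be an identifier that should stay text.

```python
import re
from datetime import datetime


def to_snake_case(name: str) -> str:
    """Turn names like 'Created At' or 'userId' into 'created_at' / 'user_id'."""
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # split camelCase
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name)  # replace spaces and symbols
    return name.strip("_").lower()


def coerce(value):
    """Guess a type for a string value: int, float, ISO date, else leave as text."""
    if not isinstance(value, str):
        return value
    for parse in (int, float, datetime.fromisoformat):
        try:
            return parse(value)
        except ValueError:
            continue
    return value


raw = {"userId": "42", "Created At": "2023-10-01", "Total $": "19.90", "Name": "Jane"}
clean = {to_snake_case(key): coerce(value) for key, value in raw.items()}
print(clean)
```

Doing the same in SQL would mean one hand-written cast and alias per column, per table, which is the maintenance burden described above.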

Where do we transform data?

When it comes to working with tables, SQL is still the better place to be. Joins and aggregations are the core operations that will happen here and they would be much harder to handle scalably in python.
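For contrast, here is a small join and aggregation expressed in SQL. SQLite (via python's built-in `sqlite3` module) is used only as a stand-in for a warehouse so the example runs anywhere; the `users` and `orders` tables are made up for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the warehouse.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (user_id INTEGER, total REAL);
    INSERT INTO users VALUES (1, 'John'), (2, 'Jane');
    INSERT INTO orders VALUES (1, 10.0), (1, 5.0), (2, 7.5);
    """
)

# Join and aggregate in one declarative statement; the engine plans the execution.
revenue_per_user = conn.execute(
    """
    SELECT u.name, SUM(o.total) AS revenue
    FROM users AS u
    JOIN orders AS o ON o.user_id = u.id
    GROUP BY u.name
    ORDER BY u.name
    """
).fetchall()
print(revenue_per_user)
```

The same logic in plain python would need explicit loops or dictionaries for the join and the grouping, and would have to hold the data in memory.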

dlt puts the small t back in EtlT, let's see how.

So, python is still superior at a few operations:

  • Typing, renaming, normalising, unpacking
  • complex scalar operations

While we will leave the aggregations and joins to the big T, SQL.

Normalisation, typing, unpacking

dlt does this well out of the box. Automatic typing, renaming, flattening, and ddl deployment are all handled by the schema inference and evolution engine. This engine is configurable in both how it works and what it does. You can read more here: Normaliser, schema settings

Here is a usage example (it's built into the pipeline):


import dlt

# Json, dataframes, iterables, all good
# the data will be auto typed and normalised
data = [{'id': 1, 'name': 'John'}]

# open connection
pipe = dlt.pipeline(destination='bigquery',
                    dataset_name='raw_data')

# self-explanatory declarative interface
job_status = pipe.run(data,
                      write_disposition="merge",
                      primary_key="id",
                      table_name="users")

# optionally load schema and metadata
pipe.run([job_status],
         write_disposition="append",
         table_name="loading_status")

Scalar operations

Sometimes we need to edit a column's value in some very specific way for which SQL doesn't quite cut it. Sometimes, we have data we need to pseudonymise before loading for regulatory reasons.

Because dlt is a library, you can easily change how the data stream is produced or ingested. Besides your own customisations, dlt also supports injecting your transform code inside the event stream, see an example here.

Here is a code example of pseudonymisation, a common case where data needs to be transformed before loading:

import dlt
import hashlib

@dlt.source
def dummy_source(prefix: str = None):
    @dlt.resource
    def dummy_data():
        for _ in range(3):
            yield {'id': _, 'name': f'Jane Washington {_}'}
    return dummy_data(),

def pseudonymize_name(doc):
    '''
    Pseudonymisation is a deterministic type of PII-obscuring.
    Its role is to allow identifying users by their hash,
    without revealing the underlying info.
    '''
    # add a constant salt so the same name always produces the same hash
    salt = 'WI@N57%zZrmk#88c'
    salted_string = doc['name'] + salt
    sh = hashlib.sha256()
    sh.update(salted_string.encode())
    hashed_string = sh.digest().hex()
    doc['name'] = hashed_string
    return doc


# 1. Create an instance of the source so you can edit it.
data_source = dummy_source()
# 2. Modify this source instance's resource
data_resource = data_source.dummy_data().add_map(pseudonymize_name)
# 3. Inspect your result
for row in data_resource:
    print(row)
#{'id': 0, 'name': '96259edb2b28b48bebce8278c550e99fbdc4a3fac8189e6b90f183ecff01c442'}
#{'id': 1, 'name': '92d3972b625cbd21f28782fb5c89552ce1aa09281892a2ab32aee8feeb3544a1'}
#{'id': 2, 'name': '443679926a7cff506a3b5d5d094dc7734861352b9e0791af5d39db5a7356d11a'}

pipeline = dlt.pipeline(pipeline_name='example', destination='bigquery', dataset_name='normalized_data')
load_info = pipeline.run(data_resource)

The big T

Finally, once you have clean data loaded, you will probably prefer to use SQL and one of the standard tools. dlt offers a dbt runner to get you started easily with your transformation package.

import dlt

pipeline = dlt.pipeline(
    pipeline_name='pipedrive',
    destination='bigquery',
    dataset_name='pipedrive_dbt'
)

# make or restore venv for dbt, using latest dbt version
venv = dlt.dbt.get_venv(pipeline)

# get runner, optionally pass the venv
dbt = dlt.dbt.package(
    pipeline,
    "pipedrive/dbt_pipedrive/pipedrive",  # or use public git "https://github.com/dbt-labs/jaffle_shop.git"
    venv=venv
)

# run the models and collect any info
# If running fails, the error will be raised with full stack trace
models = dbt.run_all()

# optionally log dbt status
pipeline.run([models],
             write_disposition="append",
             table_name="_models_log")

In conclusion

ETL vs ELT was never really a debate. With some exceptions almost everyone transforms the data in SQL - but what they call this process depends on who's telling the story.

While it's easier to do most of the transformation in SQL, the tedious part is completely automatable in python, and the dirty data doesn't need manual normalisation. With dlt, you can do ETL or ELT, or even better, both, as EtLT.

Or, if you're feeling funny, you can add duckdb in the middle and go full EtLTLT where you have an additional T step in the middle for the kinds of operations that could be done locally. And afterwards you could load to operational systems to add one more L to the name :)

Fundamentally, we all agree it's all ETL, with the flavors simply designating specific sub-types.

Start using dlt today

What are you waiting for?

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a GitHub account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!
