dltHub

Data Lineage using dlt and dbt.

  • Zaeem Athar,
    Jr. Data Engineer

Why is data lineage important?

Data lineage is an important tool in a data engineer's arsenal. It shows the journey of data from its source to its destination. It captures every stop along the way and can help identify issues in data pipelines by offering a bird's-eye view of the data.

As data engineers, data lineage enables us to trace and troubleshoot the data points we offer to our stakeholders. It is also an important tool for meeting privacy regulations. Moreover, it can help us evaluate how changes upstream in a pipeline affect the data downstream. There are many types of data lineage; the most commonly used are the following:

  • Table lineage shows the raw data sources that are used to form a new table. It tracks the flow of data, showing how data moves forward through various processes and transformations.
  • Row lineage reveals the data flow at a more granular level. It tracks individual rows of data as they move through the stages of a data processing pipeline. It is a subset of table lineage that focuses on the journey of individual records rather than the entire dataset.
  • Column lineage tracks and documents the flow and transformation of individual columns or fields across the tables and views in the data.

Project Overview

In this demo, we showcase how you can leverage the dlt pipeline load_info to create table, row, and column lineage for your data. The code for the demo is available on GitHub.

The dlt load_info encapsulates useful information pertaining to the loaded data. It contains the pipeline, dataset name, destination information, and list of loaded packages among other elements. Within the load_info packages, you will find a list of all tables and columns created at the destination during loading of the data. It can be used to display all the schema changes that occur during data ingestion and implement data lineage.
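
To make the nesting concrete, here is a minimal sketch in plain Python. It does not call dlt; it assumes each load package can be represented as a dictionary with a load_id and a list of tables, each holding a list of columns. That shape is an assumption inferred from the load_info__tables__columns table used later in this post, and the helper name iter_columns and the example values are hypothetical.

```python
from typing import Any, Dict, Iterator, List


def iter_columns(packages: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per (load_id, table, column) combination."""
    for package in packages:
        for table in package.get("tables", []):
            for column in table.get("columns", []):
                yield {
                    "load_id": package.get("load_id"),
                    "table_name": table.get("name"),
                    "column_name": column.get("name"),
                    "data_type": column.get("data_type"),
                }


# Hypothetical package, shaped according to the assumption described above.
example_packages = [
    {
        "load_id": "1001",
        "tables": [
            {
                "name": "sales_info",
                "columns": [
                    {"name": "invoice_id", "data_type": "text"},
                    {"name": "total", "data_type": "double"},
                ],
            }
        ],
    }
]

columns = list(iter_columns(example_packages))
for record in columns:
    print(record)
```

Flattening the packages this way gives one record per column and load, which is the granularity we need for column lineage.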

We will work with the example of a skate shop that runs an online shop using Shopify, in addition to its physical stores. The data from both sources is extracted using dlt and loaded into BigQuery.

In order to run analytics workloads, we will create a transformed fact_sales table using dbt and the extracted raw data. The fact_sales table can be used to answer all the sales related queries for the business.

The load_info produced by dlt for both pipelines is also populated into BigQuery. We will use this information to create a Dashboard in Metabase that shows the data lineage for the fact_sales table.

Implementing Data Lineage

To get started, install dlt and dbt:

pip install dlt
pip install dbt-bigquery

As we will be ingesting data into BigQuery, we first need to create service account credentials for BigQuery. You can find more info on setting up a service account in the dlt docs.

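We use two CSV files as our data sources for this demo: data/supermarket_sales.csv for the physical stores and data/orders_export_1.csv for the Shopify orders.

dlt also provides a verified Shopify source that extracts data directly from the Shopify API.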

Step 1: Initialize a dlt pipeline

To get started, we initialize a dlt pipeline and select BigQuery as our destination by running the following command:

dlt init data_lineage bigquery

This will create default scaffolding to build our pipeline. Install the dependencies by running the following command:

pip install -r requirements.txt

Loading the data

As a first step, we will load the sales data from the online and physical store of the skate shop into BigQuery. In addition to the sales data, we will also ingest the dlt load_info into BigQuery. This will help us track changes in our pipeline.

Step 2: Adding the dlt pipeline code

In the data_lineage.py file, remove the default code and add the following:

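import dlt
import pandas as pd

# Paths to the CSV exports for the physical stores and the Shopify shop
FILEPATH = "data/supermarket_sales.csv"
FILEPATH_SHOPIFY = "data/orders_export_1.csv"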

class Data_Pipeline:
    def __init__(self, pipeline_name, destination, dataset_name):
        self.pipeline_name = pipeline_name
        self.destination = destination
        self.dataset_name = dataset_name

    def run_pipeline(self, data, table_name, write_disposition):
        # Configure the pipeline with your destination details
        pipeline = dlt.pipeline(
            pipeline_name=self.pipeline_name,
            destination=self.destination,
            dataset_name=self.dataset_name
        )
        # Run the pipeline with the provided data
        load_info = pipeline.run(
            data,
            table_name=table_name,
            write_disposition=write_disposition
        )

        # Pretty print the information on data that was loaded
        print(load_info)
        return load_info

Any changes in the underlying data are captured by the dlt load_info. To showcase this, we will filter the data to remove the Branch and Tags columns from the Store and Shopify data, respectively, and run the pipeline. Later, we will add the columns back and rerun the pipeline. The newly added columns will be recorded in the load_info packages.

We will add the load_info back to BigQuery to use in our Dashboard. The Dashboard will provide an overview of data lineage for our ingested data.

if __name__ == "__main__":

    data_store = pd.read_csv(FILEPATH)
    data_shopify = pd.read_csv(FILEPATH_SHOPIFY)

    # Drop the Branch and Tags columns for the first run; they are added back on the rerun.
    select_c_data_store = data_store.loc[
        :, data_store.columns.difference(['Branch'])
    ]
    select_c_data_shopify = data_shopify.loc[
        :, data_shopify.columns.difference(['Tags'])
    ]

    pipeline_store = Data_Pipeline(
        pipeline_name='pipeline_store',
        destination='bigquery',
        dataset_name='sales_store'
    )
    pipeline_shopify = Data_Pipeline(
        pipeline_name='pipeline_shopify',
        destination='bigquery',
        dataset_name='sales_shopify'
    )

    load_a = pipeline_store.run_pipeline(
        data=select_c_data_store,
        table_name='sales_info',
        write_disposition='replace'
    )
    load_b = pipeline_shopify.run_pipeline(
        data=select_c_data_shopify,
        table_name='sales_info',
        write_disposition='replace'
    )

    pipeline_store.run_pipeline(
        data=load_a.load_packages,
        table_name="load_info",
        write_disposition="append"
    )
    pipeline_shopify.run_pipeline(
        data=load_b.load_packages,
        table_name='load_info',
        write_disposition="append"
    )

Step 3: Run the dlt pipeline

To run the pipeline, execute the following command:

python data_lineage.py

This will load the data into BigQuery. We now need to remove the column filters from the code and rerun the pipeline. This will add the filtered columns to the tables in BigQuery. The change will be captured by dlt.
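
As a rough illustration of what a schema change between the two runs amounts to, the sketch below compares the column lists of a first and a second run in plain Python. It does not use dlt; the helper added_columns and the column names are hypothetical and only mirror the idea of Branch being filtered out and then restored.

```python
from typing import List


def added_columns(previous: List[str], current: List[str]) -> List[str]:
    """Return columns present in the current run but not in the previous one."""
    seen = set(previous)
    return [column for column in current if column not in seen]


# Hypothetical column lists: the first run has Branch filtered out.
first_run = ["invoice_id", "city", "total"]
second_run = ["invoice_id", "branch", "city", "total"]

print(added_columns(first_run, second_run))  # prints ['branch']
```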

Data Transformation and Lineage

Now that both the Shopify and Store data are available in BigQuery, we will use dbt to transform the data.

Step 4: Initialize a dbt project and define a model

To get started, initialize a dbt project in the root directory:

dbt init sales_dbt

Next, we define the dbt models in sales_dbt/models. The first model is fact_sales.sql. The skate shop has two data sources: the online Shopify source and the physical Store source. We need to combine the data from both sources to create a unified reporting feed. The fact_sales table will be our unified source.

Code for fact_sales.sql:

{{ config(materialized='table') }}

select
  invoice_id,
  city,
  unit_price,
  quantity,
  total,
  date,
  payment,
  info._dlt_id,
  info._dlt_load_id,
  loads.schema_name,
  loads.inserted_at
from {{source('store', 'sales_info')}} as info
left join {{source('store', '_dlt_loads')}} as loads
on  info._dlt_load_id = loads.load_id

union all

select
  name as invoice_id,
  billing_city,
  lineitem_price,
  lineitem_quantity,
  total,
  created_at,
  payment_method,
  info._dlt_id,
  info._dlt_load_id,
  loads.schema_name,
  loads.inserted_at
from {{source('shopify', 'sales_info')}} as info
left join {{source('shopify', '_dlt_loads')}} as loads
on  info._dlt_load_id = loads.load_id
where financial_status = 'paid'

In the query, we join the sales information for each source with its _dlt_loads table, which dlt populates on every pipeline run. This will help us keep track of the number of rows added with each pipeline run. The schema_name identifies the source that populated the table and helps establish the table lineage, while the _dlt_load_id identifies the pipeline run that populated each row and helps establish row-level lineage. The two sources are then combined into the fact_sales table with a union all.
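
To see the join and union at work without touching BigQuery, here is a small sketch that runs a trimmed-down version of the query against an in-memory SQLite database using Python's standard library. The table names, invoice values, load ids, and schema names below are hypothetical stand-ins, and only a few of the columns are kept.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical miniature versions of the sales_info and _dlt_loads tables.
conn.executescript(
    """
    create table store_sales (invoice_id text, total real, _dlt_load_id text);
    create table store_loads (load_id text, schema_name text);
    create table shopify_sales (
        name text, total real, financial_status text, _dlt_load_id text
    );
    create table shopify_loads (load_id text, schema_name text);

    insert into store_sales values
        ('750-67-8428', 548.97, '1001'),
        ('226-31-3081', 80.22, '1001');
    insert into store_loads values ('1001', 'pipeline_store');
    insert into shopify_sales values
        ('#1001', 25.0, 'paid', '2001'),
        ('#1002', 40.0, 'pending', '2001');
    insert into shopify_loads values ('2001', 'pipeline_shopify');
    """
)

# Same structure as fact_sales.sql: join each source to its loads, then union.
query = """
    select invoice_id, total, info._dlt_load_id, loads.schema_name
    from store_sales as info
    left join store_loads as loads on info._dlt_load_id = loads.load_id
    union all
    select name as invoice_id, total, info._dlt_load_id, loads.schema_name
    from shopify_sales as info
    left join shopify_loads as loads on info._dlt_load_id = loads.load_id
    where financial_status = 'paid'
"""

fact_sales = conn.execute(query).fetchall()
for row in fact_sales:
    print(row)
```

Each resulting row carries both the schema_name (which source it came from) and the _dlt_load_id (which run loaded it), and the unpaid Shopify order is dropped by the where clause.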

Next, we define the schema_change.sql model to capture the changes in the table schema using the following query:

{{ config(materialized='table') }}

select *
from {{source('store', 'load_info__tables__columns')}}

union all

select *
from {{source('shopify', 'load_info__tables__columns')}}

In the query, we combine the load_info for both sources by doing a union over the sources. The resulting schema_change table contains records of the column changes that occur on each pipeline run. This will help us track the column lineage and will be used to create our Data Lineage Dashboard.

Step 5: Run the dbt package

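In data_lineage.py, add the following code to run the dbt package using dlt.

# Imports needed by the dbt runner (these can also go at the top of the file)
import os
from dlt.common.runners import Venv

pipeline_transform = dlt.pipeline(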
    pipeline_name='pipeline_transform',
    destination='bigquery',
    dataset_name='sales_transform'
)

venv = Venv.restore_current()
here = os.path.dirname(os.path.realpath(__file__))

dbt = dlt.dbt.package(
    pipeline_transform,
    os.path.join(here, "sales_dbt/"),
    venv=venv
)

models = dbt.run_all()

for m in models:
    print(
        f"Model {m.model_name} materialized in {m.time} - "
        f"Status {m.status} and message {m.message}"
    )

Next, run the pipeline using the following command:

python data_lineage.py

Once the pipeline has run, a new dataset called sales_transform will be created in BigQuery. It will contain the fact_sales and schema_change tables that we defined in the dbt package.

Step 6: Visualising the lineage in Metabase

To access the BigQuery data in Metabase, we need to connect BigQuery to Metabase. Follow the Metabase docs to connect BigQuery to Metabase.

Once BigQuery is connected with Metabase, use the SQL Editor to create the first table. The Data Load Overview table gives an overview of the dlt pipelines that populated the fact_sales table. It shows the pipeline names and the number of rows loaded into the fact_sales table by each pipeline.


This can be used to track the rows loaded by each pipeline. An upper and lower threshold can be set, and when our pipelines add rows above or below the threshold, that can act as our canary in the coal mine.
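
The same check can be sketched outside Metabase. The snippet below counts rows per pipeline and load in plain Python and flags any load whose row count falls outside a chosen range. The helper names, the thresholds, and the example rows are hypothetical; in practice the rows would come from the fact_sales table.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

LoadKey = Tuple[str, str]  # (schema_name, _dlt_load_id)


def rows_per_load(rows: Iterable[Dict[str, str]]) -> Counter:
    """Count rows for each (schema_name, _dlt_load_id) pair."""
    return Counter((row["schema_name"], row["_dlt_load_id"]) for row in rows)


def out_of_bounds(counts: Dict[LoadKey, int], lower: int, upper: int) -> Dict[LoadKey, int]:
    """Return the loads whose row count is below lower or above upper."""
    return {key: n for key, n in counts.items() if n < lower or n > upper}


# Hypothetical rows standing in for the fact_sales table.
example_rows = [
    {"schema_name": "pipeline_store", "_dlt_load_id": "1001"},
    {"schema_name": "pipeline_store", "_dlt_load_id": "1001"},
    {"schema_name": "pipeline_store", "_dlt_load_id": "1001"},
    {"schema_name": "pipeline_shopify", "_dlt_load_id": "2001"},
]

counts = rows_per_load(example_rows)
alerts = out_of_bounds(counts, lower=2, upper=1000)
print(alerts)  # prints {('pipeline_shopify', '2001'): 1}
```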

Next, we will visualize the fact_sales and schema_change tables and add the dlt_load_id as a filter on both. The resulting Data Lineage Dashboard will give us an overview of the table, row, and column-level lineage for our data.

When we filter by the dlt_load_id, the dashboard shows only the selected pipeline run. In the Fact Sales table, the column schema_name identifies the raw sources that populated the table (Table lineage). The table also shows only the rows that were added for the pipeline run (Row Lineage). Lastly, the Updated Columns table reveals the columns that were added for the filtered pipeline run (Column Lineage).
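
The filtering logic behind the dashboard can be approximated in a few lines of plain Python. This is only a sketch under stated assumptions: the helper lineage_for_load is hypothetical, and so is the premise that both the fact rows and the column-change records carry a load_id key that identifies the pipeline run. The example values are made up.

```python
from typing import Any, Dict, List


def lineage_for_load(
    load_id: str,
    fact_rows: List[Dict[str, Any]],
    column_changes: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Collect table, row, and column lineage for a single pipeline run."""
    rows = [row for row in fact_rows if row["load_id"] == load_id]
    return {
        # Table lineage: which sources fed the rows of this run
        "tables": sorted({row["schema_name"] for row in rows}),
        # Row lineage: the rows added by this run
        "rows": rows,
        # Column lineage: the columns recorded as added in this run
        "columns": [c["name"] for c in column_changes if c["load_id"] == load_id],
    }


# Hypothetical records standing in for the two dashboard tables.
fact_rows = [
    {"invoice_id": "#1001", "schema_name": "pipeline_shopify", "load_id": "2001"},
    {"invoice_id": "#1003", "schema_name": "pipeline_shopify", "load_id": "2002"},
]
column_changes = [
    {"name": "name", "load_id": "2001"},
    {"name": "tags", "load_id": "2002"},
]

result = lineage_for_load("2002", fact_rows, column_changes)
print(result["tables"], len(result["rows"]), result["columns"])
```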

When we ran the pipeline initially, we filtered out the Tags column; later, we reintroduced it and ran the pipeline again. The Updated Columns table shows that the Tags column was added with the new pipeline run.

Conclusion

Data lineage provides an overview of the data journey from the source to the destination. It is an important tool that can help troubleshoot a pipeline. The dlt load_info offers an alternative way to visualize data lineage by tracking changes in the underlying data.

Although dlt currently does not support data flow diagrams, it tracks changes in the data schema that can be used to create dashboards that provide an overview of table, row, and column lineage for the loaded data.