Skip to main content
Version: 1.4.0 (latest)

Add incremental configuration to SQL resources

Incremental loading is the act of loading only new or changed data and not old records that have already been loaded. For example, a bank loads only the latest transactions, or a company updates its database with new or modified user information. In this article, we’ll discuss a few incremental loading strategies.

info

Processing data incrementally, or in batches, enhances efficiency, reduces costs, lowers latency, improves scalability, and optimizes resource utilization.

Incremental loading strategies

In this guide, we will discuss various incremental loading methods using dlt, specifically:

S.No.StrategyDescription
1.Full load (replace)It completely overwrites the existing data with the new/updated dataset.
2.Append new records based on Incremental IDAppends only new records to the table based on an incremental ID.
3.Append new records based on date ("created_at")Appends only new records to the table based on a date field.
4.Merge (Update/Insert) records based on timestamp ("last_modified_at") and IDMerges records based on a composite ID key and a timestamp field. Updates existing records and inserts new ones as necessary.

Code examples

1. Full load (replace)

A full load strategy completely overwrites the existing data with the new dataset. This is useful when you want to refresh the entire table with the latest data.

note

This strategy technically does not load only new data but instead reloads all data: old and new.

Here’s a walkthrough:

  1. The initial table, named "contact," in the SQL source looks like this:

    idnamecreated_at
    1Alice2024-07-01
    2Bob2024-07-02
  2. The Python code illustrates the process of loading data from an SQL source into BigQuery using the dlt pipeline. Please note the write_disposition = "replace" used below.

    def load_full_table_resource() -> None:
    """Load a full table, replacing existing data."""
    pipeline = dlt.pipeline(
    pipeline_name="mysql_database",
    destination='bigquery',
    dataset_name="dlt_contacts"
    )

    # Load the full table "contact"
    source = sql_database().with_resources("contact")

    # Run the pipeline
    info = pipeline.run(source, write_disposition="replace")

    # Print the info
    print(info)

    load_full_table_resource()
  3. After running the dlt pipeline, the data loaded into the BigQuery "contact" table looks like:

    Rowidnamecreated_at_dlt_load_id_dlt_id
    11Alice2024-07-011721878309.021546tgyMM73iMz0cQg
    22Bob2024-07-021721878309.02154688P0bD796pXo/Q
  4. Next, the "contact" table in the SQL source is updated—two new rows are added, and the row with id = 2 is removed. The updated data source ("contact" table) now presents itself as follows:

    idnamecreated_at
    1Alice2024-07-01
    3Charlie2024-07-03
    4Dave2024-07-04
  5. The "contact" table created in BigQuery after running the pipeline again:

    Rowidnamecreated_at_dlt_load_id_dlt_id
    11Alice2024-07-011721878309.021546S5ye6fMhYECZA
    23Charlie2024-07-031721878309.021546eT0zheRx9ONWuQ
    34Dave2024-07-041721878309.021546gtflF8BdL2NO/Q

What happened?

After running the pipeline, the original data in the "contact" table (Alice and Bob) is completely replaced with the new updated table with data “Charlie” and “Dave” added and “Bob” removed. This strategy is useful for scenarios where the entire dataset needs to be refreshed or replaced with the latest information.

2. Append new records based on incremental ID

This strategy appends only new records to the table based on an incremental ID. It is useful for scenarios where each new record has a unique, incrementing identifier.

Here’s a walkthrough:

  1. The initial table, named "contact," in the SQL source looks like this:

    idnamecreated_at
    1Alice2024-07-01
    2Bob2024-07-02
  2. The Python code demonstrates loading data from an SQL source into BigQuery using an incremental variable, id. This variable tracks new or updated records in the dlt pipeline. Please note the write_disposition = "append" used below.

    def load_incremental_id_table_resource() -> None:
    """Load a table incrementally based on an ID."""
    pipeline = dlt.pipeline(
    pipeline_name="mysql_database",
    destination='bigquery',
    dataset_name="dlt_contacts",
    )

    # Load table "contact" incrementally based on ID
    source = sql_database().with_resources("contact")
    source.contact.apply_hints(incremental=dlt.sources.incremental("id"))

    # Run the pipeline with append write disposition
    info = pipeline.run(source, write_disposition="append")

    # Print the info
    print(info)
  3. After running the dlt pipeline, the data loaded into the BigQuery "contact" table looks like:

    Rowidnamecreated_at_dlt_load_id_dlt_id
    11Alice2024-07-011721878309.021546YQfmAu8xysqWmA
    22Bob2024-07-021721878309.021546Vcb5KKah/RpmQw
  4. Next, the "contact" table in the SQL source is updated—two new rows are added, and the row with id = 2 is removed. The updated data source now presents itself as follows:

    idnamecreated_at
    1Alice2024-07-01
    3Charlie2024-07-03
    4Dave2024-07-04
  5. The "contact" table created in BigQuery after running the pipeline again:

    Rowidnamecreated_at_dlt_load_id_dlt_id
    11Alice2024-07-011721878309.021546OW9ZyAzkXg4D4w
    22Bob2024-07-021721878309.021546skVYZ/ppQuztUg
    33Charlie2024-07-031721878309.021546y+T4Q2JDnR33jg
    44Dave2024-07-041721878309.021546MAXrGhNNADXAiQ

What happened?

In this scenario, the pipeline appends new records (Charlie and Dave) to the existing data (Alice and Bob) without affecting the pre-existing entries. This strategy is ideal when only new data needs to be added, preserving the historical data.

Append new records based on timestamp ("created_at")

This strategy appends only new records to the table based on a date/timestamp field. It is useful for scenarios where records are created with a timestamp, and you want to load only those records created after a certain date.

Here’s a walkthrough:

  1. The initial dataset, named "contact," in the SQL source looks like this:

    idnamecreated_at
    1Alice2024-07-01 00:00:00
    2Bob2024-07-02 00:00:00
  2. The Python code illustrates the process of loading data from an SQL source into BigQuery using the dlt pipeline. Please note the write_disposition = "append", with created_at being used as the incremental parameter.

    def load_incremental_timestamp_table_resource() -> None:
    """Load a table incrementally based on created_at timestamp."""
    pipeline = dlt.pipeline(
    pipeline_name="mysql_databasecdc",
    destination='bigquery',
    dataset_name="dlt_contacts",
    )

    # Load table "contact", incrementally starting at a given timestamp
    source = sql_database().with_resources("contact")
    source.contact.apply_hints(incremental=dlt.sources.incremental(
    "created_at", initial_value=datetime.datetime(2024, 4, 1, 0, 0, 0)))

    # Run the pipeline
    info = pipeline.run(source, write_disposition="append")

    # Print the info
    print(info)

    load_incremental_timestamp_table_resource()
  3. After running the dlt pipeline, the data loaded into the BigQuery "contact" table looks like:

    Rowidnamecreated_at_dlt_load_id_dlt_id
    11Alice2024-07-01 00:00:00 UTC1721878309.0215465H8ca6C89umxHA
    22Bob2024-07-02 00:00:00 UTC1721878309.021546M61j4aOSqs4k2w
  4. Next, the "contact" table in the SQL source is updated—two new rows are added, and the row with id = 2 is removed. The updated data source now presents itself as follows:

    idnamecreated_at
    1Alice2024-07-01 00:00:00
    3Charlie2024-07-03 00:00:00
    4Dave2024-07-04 00:00:00
  5. The "contact" table created in BigQuery after running the pipeline again:

    Rowidnamecreated_at_dlt_load_id_dlt_id
    11Alice2024-07-01 00:00:00 UTC1721878309.021546Petj6R+B/63sWA
    22Bob2024-07-02 00:00:00 UTC1721878309.0215463Rr3VmY+av+Amw
    33Charlie2024-07-03 00:00:00 UTC1721878309.021546L/MnhG19xeMrvQ
    44Dave2024-07-04 00:00:00 UTC1721878309.021546W6ZdfvTzfRXlsA

What happened?

The pipeline adds new records (Charlie and Dave) that have a created_at timestamp after the specified initial value while retaining the existing data (Alice and Bob). This approach is useful for loading data incrementally based on when it was created.

4. Merge (update/insert) records based on timestamp ("last_modified_at") and ID

This strategy merges records based on a composite key of ID and a timestamp field. It updates existing records and inserts new ones as necessary.

Here’s a walkthrough:

  1. The initial dataset, named ‘contact’, in the SQL source looks like this:

    idnamelast_modified_at
    1Alice2024-07-01 00:00:00
    2Bob2024-07-02 00:00:00
  2. The Python code illustrates the process of loading data from an SQL source into BigQuery using the dlt pipeline. Please note the write_disposition = "merge", with last_modified_at being used as the incremental parameter.

    def load_merge_table_resource() -> None:
    """Merge (update/insert) records based on last_modified_at timestamp and ID."""
    pipeline = dlt.pipeline(
    pipeline_name="mysql_database",
    destination='bigquery',
    dataset_name="dlt_contacts",
    )

    # Merge records, 'contact' table, based on ID and last_modified_at timestamp
    source = sql_database().with_resources("contact")
    source.contact.apply_hints(incremental=dlt.sources.incremental(
    "last_modified_at", initial_value=datetime.datetime(2024, 4, 1, 0, 0, 0)),
    primary_key="id")

    # Run the pipeline
    info = pipeline.run(source, write_disposition="merge")

    # Print the info
    print(info)

    load_merge_table_resource()
  3. After running the dlt pipeline, the data loaded into BigQuery ‘contact’ table looks like:

    Rowidnamelast_modified_at_dlt_load_id_dlt_id
    11Alice2024-07-01 00:00:00 UTC1721878309.021546ObbVlxcly3VknQ
    22Bob2024-07-02 00:00:00 UTC1721878309.021546Vrlkus/haaKlEg
  4. Next, the "contact" table in the SQL source is updated— “Alice” is updated to “Alice Updated”, and a new row “Hank” is added:

    idnamelast_modified_at
    1Alice Updated2024-07-08 00:00:00
    3Hank2024-07-08 00:00:00
  5. The "contact" table created in BigQuery after running the pipeline again:

    Rowidnamelast_modified_at_dlt_load_id_dlt_id
    12Bob2024-07-02 00:00:00 UTC1721878309.021546Cm+AcDZLqXSDHQ
    21Alice Updated2024-07-08 00:00:00 UTC1721878309.021546OeMLIPw7rwFG7g
    33Hank2024-07-08 00:00:00 UTC1721878309.021546Ttp6AI2JxqffpA

What happened?

The pipeline updates the record for Alice with the new data, including the updated last_modified_at timestamp, and adds a new record for Hank. This method is beneficial when you need to ensure that records are both updated and inserted based on a specific timestamp and ID.

The examples provided explain how to use dlt to achieve different incremental loading scenarios, highlighting the changes before and after running each pipeline.

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.