dlt+

This page is for dlt+, which requires a license. Join our early access program for a trial license.

MS SQL replication

dlt+ provides a complete solution for syncing MS SQL Server tables using Change Tracking, a mechanism similar to CDC. By leveraging SQL Server's native Change Tracking feature, you can efficiently load incremental data changes, including inserts, updates, and deletes, into your destination.

Prerequisites

Before you begin, ensure that Change Tracking is enabled on both your database and the tables you wish to track, as it is a feature that must be explicitly activated.

Enable Change Tracking on the database

Run the following SQL command to enable Change Tracking on your database:

ALTER DATABASE [YourDatabaseName]
SET CHANGE_TRACKING = ON
(CHANGE_RETENTION = 7 DAYS, AUTO_CLEANUP = ON);
  • [YourDatabaseName]: Replace with the name of your database.
  • CHANGE_RETENTION: Specifies how long Change Tracking information is retained. In this example, it’s set to 7 days.
  • AUTO_CLEANUP: When set to ON, Change Tracking information older than the retention period is automatically removed.

Enable Change Tracking on the table

For each table you want to track, execute:

ALTER TABLE [YourSchemaName].[YourTableName]
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = ON);
  • [YourSchemaName].[YourTableName]: Replace with your schema and table names.
  • TRACK_COLUMNS_UPDATED: When set to ON, allows you to see which columns were updated in a row. Set to OFF if you don’t need this level of detail.
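
To double-check what is already enabled, you can query SQL Server's change tracking catalog views. Below is a small sketch using SQLAlchemy (the same library the pipeline snippets on this page use); the connection URL is a placeholder in the same format as shown later:

from sqlalchemy import create_engine, text

# Placeholder URL; use the same format as in the pipeline setup below
connection_url = "mssql+pyodbc://username:password@your_server:port/YourDatabaseName?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
engine = create_engine(connection_url)

with engine.connect() as conn:
    # Databases with Change Tracking enabled
    print(conn.execute(text("SELECT * FROM sys.change_tracking_databases")).fetchall())
    # Tables with Change Tracking enabled in the current database
    print(conn.execute(text("SELECT OBJECT_NAME(object_id) FROM sys.change_tracking_tables")).fetchall())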

Set up dlt+ and drivers

Setting up the pipeline

The process involves two main steps:

  1. Initial full load: Use the sql_table function to perform a full backfill of your table data.
  2. Incremental loading: Use the create_change_tracking_table function to load incremental changes using SQL Server's Change Tracking.

This approach ensures that you have a complete dataset from the initial load and efficiently keep it updated with subsequent changes.

Initial full load

Get the Change Tracking version before you execute the initial load to make sure you do not miss any updates that may happen during it. This may result in "replaying" a few changes that happen during the load, but this will not have any impact on the destination data due to the merge write disposition.

from dlt_plus.sources.mssql import get_current_change_tracking_version
from sqlalchemy import create_engine

connection_url = "mssql+pyodbc://username:password@your_server:port/YourDatabaseName?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"

engine = create_engine(connection_url)

tracking_version = get_current_change_tracking_version(engine)

To fully avoid any duplication, you may completely lock the table during the initial load.
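
One way to do this, sketched below with plain SQLAlchemy and T-SQL locking hints (this is not a dlt+ feature), is to hold a shared table lock while you capture the tracking version and run the backfill; writers are blocked until the transaction commits, while reads, including the backfill itself, still work:

from sqlalchemy import text
from dlt_plus.sources.mssql import get_current_change_tracking_version

with engine.begin() as conn:
    # Sketch only: TABLOCK + HOLDLOCK keeps a shared table-level lock until
    # the transaction commits, blocking writers while reads still succeed
    conn.execute(
        text("SELECT TOP (1) 1 FROM [YourSchemaName].[YourTableName] WITH (TABLOCK, HOLDLOCK)")
    )
    # Capture the tracking version and run the initial full load (built as
    # shown below) while the lock is held; the lock is released on commit
    tracking_version = get_current_change_tracking_version(engine)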

Now you can use the sql_table resource to perform the initial backfill:

import dlt
from dlt.sources.sql_database import sql_table

# Initial full load
initial_resource = sql_table(
    credentials=engine,
    schema=schema_name,
    table=table_name,
    reflection_level="full",
    write_disposition="merge",
)

pipeline = dlt.pipeline(
    pipeline_name='sql_server_sync_pipeline',
    destination='your_destination',
    dataset_name='destination_dataset',
)

# Run the pipeline for initial load
pipeline.run(initial_resource)

Next, configure the incremental resource for the first run with the create_change_tracking_table function and run it once:

from dlt_plus.sources.mssql import create_change_tracking_table

# Optional: Configure engine isolation level
# use it if you create an Engine implicitly
def configure_engine_isolation_level(engine):
    return engine.execution_options(isolation_level="SERIALIZABLE")

incremental_resource = create_change_tracking_table(
    credentials=engine,
    table=table_name,
    schema=schema_name,
    initial_tracking_version=tracking_version,
    engine_adapter_callback=configure_engine_isolation_level,
)

pipeline.run(incremental_resource)

When running for the first time, it is necessary to pass the tracking_version in the initial_tracking_version argument. This will initialize incremental loading and keep the updated tracking version in the dlt state. In subsequent runs, you do not need to provide the initial value anymore.

Incremental loading

After the initial load, you can run the create_change_tracking_table resource on a schedule to load only the changes since the last tracking version using SQL Server’s CHANGETABLE function. You do not need to pass initial_tracking_version anymore, since this is automatically stored in the dlt state.

from dlt_plus.sources.mssql import create_change_tracking_table

incremental_resource = create_change_tracking_table(
    credentials=engine,
    table=table_name,
    schema=schema_name,
)
pipeline.run(incremental_resource)
note

The write_disposition is by default set to merge, which handles upserts based on primary keys. This determines the behavior when new data is loaded, especially regarding duplicates, updates, and deletes.
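
If merge is not the behavior you want, you can override the disposition on the resource before running the pipeline. As a hypothetical sketch using dlt's apply_hints method, keeping every change as a new row instead of merging would look like this:

# Hypothetical override: append changes as an audit log instead of merging them
incremental_resource.apply_hints(write_disposition="append")
pipeline.run(incremental_resource)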

Full code example

import dlt

from sqlalchemy import create_engine

from dlt.sources.sql_database import sql_table
from dlt_plus.sources.mssql import (
    create_change_tracking_table,
    get_current_change_tracking_version,
)


def single_table_initial_load(connection_url: str, schema_name: str, table_name: str) -> None:
    """Performs an initial full load and sets up tracking version and incremental loads"""
    # Create a new pipeline
    pipeline = dlt.pipeline(
        pipeline_name=f"{schema_name}_{table_name}_sync",
        destination="duckdb",
        dataset_name=schema_name,
    )

    # Explicit database connection
    engine = create_engine(connection_url, isolation_level="SNAPSHOT")

    # Initial full load
    initial_resource = sql_table(
        credentials=engine,
        schema=schema_name,
        table=table_name,
        reflection_level="full",
        write_disposition="merge",
    )

    # Get the current tracking version before you run the pipeline to make sure
    # you do not miss any records
    tracking_version = get_current_change_tracking_version(engine)
    print(f"will track from: {tracking_version}")  # noqa

    # Run the pipeline for the initial load
    # NOTE: we always drop data and state from the destination on initial load
    print(pipeline.run(initial_resource, refresh="drop_resources"))  # noqa

    # Incremental loading resource
    incremental_resource = create_change_tracking_table(
        credentials=engine,
        table=table_name,
        schema=schema_name,
        initial_tracking_version=tracking_version,
    )

    # Run the pipeline for incremental load
    print(pipeline.run(incremental_resource))  # noqa


def single_table_incremental_load(connection_url: str, schema_name: str, table_name: str) -> None:
    """Continues loading incrementally"""
    # Make sure you use the same pipeline and dataset names in order to continue incremental
    # loading.
    pipeline = dlt.pipeline(
        pipeline_name=f"{schema_name}_{table_name}_sync",
        destination="duckdb",
        dataset_name=schema_name,
    )

    engine = create_engine(connection_url, isolation_level="SNAPSHOT")
    # We do not need to pass the tracking version anymore
    incremental_resource = create_change_tracking_table(
        credentials=engine,
        table=table_name,
        schema=schema_name,
    )
    print(pipeline.run(incremental_resource))  # noqa


if __name__ == "__main__":
    # Change Tracking already enabled here
    test_db = "my_database83ed099d2d98a3ccfa4beae006eea44c"
    # A test run with a local mssql instance
    connection_url = (
        f"mssql+pyodbc://sa:Strong%21Passw0rd@localhost:1433/{test_db}"
        "?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
    )
    single_table_initial_load(
        connection_url,
        "my_dlt_source",
        "app_user",
    )
    single_table_incremental_load(
        connection_url,
        "my_dlt_source",
        "app_user",
    )

Understanding the change tracking query

The incremental loading process uses a SQL query that joins the CHANGETABLE function with the source table to fetch the latest changes. Here’s a simplified version of the query:

SELECT
    [Columns],
    ct.SYS_CHANGE_VERSION AS _dlt_sys_change_version,
    CASE WHEN ct.SYS_CHANGE_OPERATION = 'D' THEN 'DELETE' ELSE NULL END AS _dlt_deleted
FROM
    CHANGETABLE(CHANGES [YourSchemaName].[YourTableName], @last_version) AS ct
    LEFT JOIN [YourSchemaName].[YourTableName] AS t
        ON ct.[PrimaryKey] = t.[PrimaryKey]
WHERE
    ct.SYS_CHANGE_VERSION > @last_version
ORDER BY
    ct.SYS_CHANGE_VERSION ASC
  • CHANGETABLE: Retrieves changes for the specified table since the last tracking version.
  • Join with source table: The join retrieves the current data for the changed rows.
  • SYS_CHANGE_VERSION: Used to track and order changes.
  • _dlt_deleted: Indicates if a row was deleted.
note

Since the query joins with the production table, there may be implications for locking and performance. Ensure your database can handle the additional load, and consider isolation levels if necessary.
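
One option, which the full code example above uses, is to read under SNAPSHOT isolation so the change tracking query neither blocks nor gets blocked by concurrent writers (this assumes snapshot isolation is allowed on the database):

from sqlalchemy import create_engine

# Requires ALLOW_SNAPSHOT_ISOLATION to be enabled on the database
engine = create_engine(connection_url, isolation_level="SNAPSHOT")

Alternatively, pass an engine_adapter_callback to create_change_tracking_table as shown in the setup section.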

Full refresh

danger

Doing a full refresh will drop the destination table, i.e., delete data from the destination, and reset the state holding the tracking version.

You can trigger a full refresh by performing a full load again and passing drop_resources to the run method (as described in the pipeline configuration):

pipeline.run(initial_resource, refresh="drop_resources")

Handling deletes

There is an optional parameter that can be passed to create_change_tracking_table for configuring how to handle deletes:

from dlt_plus.sources.mssql import create_change_tracking_table

incremental_resource = create_change_tracking_table(
    credentials=engine,
    table=table_name,
    schema=schema_name,
    hard_delete=True,
)
pipeline.run(incremental_resource)

Hard deletes

By default, hard_delete is set to True, meaning hard deletes are performed, i.e., rows deleted in the source will be permanently removed from the destination.

When a record is deleted, the replicated change contains NULLs even for non-nullable columns. To avoid additional tables that hold deleted rows and extra merge steps, dlt emits placeholder values for those columns; they are stored in the staging dataset only.

Soft deletes

If hard_delete is set to False, soft deletes are performed, i.e., rows deleted in the source will be marked as deleted but not physically removed from the destination.

In this case, the destination schema must accept NULLs for the replicated columns, so make sure you pass the remove_nullability_adapter adapter to the sql_table resource:

from dlt_plus.sources.mssql import remove_nullability_adapter

table = sql_table(
    # other arguments (credentials, schema, table, etc.) as shown above
    table_adapter_callback=remove_nullability_adapter,
)
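
Putting the pieces together, a soft delete setup could look roughly like the sketch below. It reuses names from the earlier snippets (engine, pipeline, schema_name, table_name, tracking_version) and only adds the adapter and hard_delete=False:

from dlt.sources.sql_database import sql_table
from dlt_plus.sources.mssql import create_change_tracking_table, remove_nullability_adapter

# Initial full load with nullability constraints removed, so soft-deleted rows
# (which arrive with NULLs in non-nullable columns) fit the destination schema
initial_resource = sql_table(
    credentials=engine,
    schema=schema_name,
    table=table_name,
    reflection_level="full",
    write_disposition="merge",
    table_adapter_callback=remove_nullability_adapter,
)
pipeline.run(initial_resource, refresh="drop_resources")

# Incremental loads that mark deleted rows instead of removing them
incremental_resource = create_change_tracking_table(
    credentials=engine,
    table=table_name,
    schema=schema_name,
    initial_tracking_version=tracking_version,
    hard_delete=False,
)
pipeline.run(incremental_resource)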
