# Postgres replication
Postgres is one of the most popular relational database management systems. This verified source uses Postgres replication functionality to efficiently capture changes in tables (a process often referred to as Change Data Capture or CDC). It relies on logical decoding and the standard built-in `pgoutput` output plugin.
Resources that can be loaded using this verified source are:
| Name | Description |
|---|---|
| `replication_resource` | Load published messages from a replication slot |
| `init_replication` | Initialize replication and optionally return snapshot resources for the initial data load |
The Postgres replication source currently does not support the `scd2` merge strategy.
## Setup guide

### Set up user
To set up a Postgres user for replication, follow these steps:
1. Create a user with the `LOGIN` and `REPLICATION` attributes:

   ```sql
   CREATE ROLE replication_user WITH LOGIN REPLICATION;
   ```

2. Grant the `CREATE` privilege on the database:

   ```sql
   GRANT CREATE ON DATABASE dlt_data TO replication_user;
   ```

3. Grant ownership of the tables you want to replicate:

   ```sql
   ALTER TABLE your_table OWNER TO replication_user;
   ```
The minimum required privileges may differ depending on your replication configuration. For example, replicating entire schemas requires superuser privileges. Check the Sources and resources section for more detailed information.
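If you want to double-check the setup before wiring up the pipeline, you can inspect the server from Python. The sketch below is illustrative only: it assumes `psycopg2` is available and uses placeholder connection parameters together with the `replication_user` role created above. It checks that `wal_level` is `logical` (required for logical decoding) and that the role has the `LOGIN` and `REPLICATION` attributes.

```py
# Illustrative check only; psycopg2 and all connection parameters are assumptions.
import psycopg2

conn = psycopg2.connect(
    host="localhost", dbname="dlt_data", user="postgres", password="postgres"
)
with conn, conn.cursor() as cur:
    # Logical decoding requires wal_level = 'logical'
    cur.execute("SHOW wal_level;")
    print("wal_level:", cur.fetchone()[0])

    # The replication user needs the LOGIN and REPLICATION attributes
    cur.execute(
        "SELECT rolcanlogin, rolreplication FROM pg_roles WHERE rolname = %s;",
        ("replication_user",),
    )
    print("login, replication:", cur.fetchone())
conn.close()
```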
### Set up RDS
To set up a Postgres user on RDS, follow these steps:
1. Enable replication for your RDS Postgres instance via a Parameter Group.

2. `WITH LOGIN REPLICATION;` does not work on RDS; instead, run:

   ```sql
   GRANT rds_replication TO replication_user;
   ```

3. Use the following connection parameters to enforce SSL (see the sketch after this list for an environment-variable alternative):

   ```toml
   sources.pg_replication.credentials="postgresql://loader:password@host.rds.amazonaws.com:5432/dlt_data?sslmode=require&connect_timeout=300"
   ```
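If you deploy the pipeline (for example, as a scheduled job), the same connection string can be supplied through an environment variable instead of `secrets.toml`; dlt resolves `sources.pg_replication.credentials` from the `SOURCES__PG_REPLICATION__CREDENTIALS` variable. A minimal sketch with placeholder host and credentials:

```py
import os

# dlt reads this as sources.pg_replication.credentials (placeholder values)
os.environ["SOURCES__PG_REPLICATION__CREDENTIALS"] = (
    "postgresql://loader:password@host.rds.amazonaws.com:5432/dlt_data"
    "?sslmode=require&connect_timeout=300"
)
```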
### Initialize the verified source
To get started with your data pipeline, follow these steps:
1. Run the following command:

   ```sh
   dlt init pg_replication duckdb
   ```

   This command initializes pipeline examples with Postgres replication as the source and DuckDB as the destination.

2. If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred destination. For example:

   ```sh
   dlt init pg_replication bigquery
   ```

3. After running the command, a new directory will be created with the necessary files and configuration settings to get started.
For more information, read the guide on how to add a verified source.
### Add credentials
1. In the `.dlt` folder, there's a file called `secrets.toml`. It's where you store sensitive information securely, like access tokens. Keep this file safe.

   Here's what the `secrets.toml` looks like:

   ```toml
   [sources.pg_replication.credentials]
   drivername = "postgresql" # please set me up!
   database = "database" # please set me up!
   password = "password" # please set me up!
   username = "username" # please set me up!
   host = "host" # please set me up!
   port = 0 # please set me up!
   ```

2. Credentials can be set as shown above. Alternatively, you can provide credentials in the `secrets.toml` file as a single connection string:

   ```toml
   sources.pg_replication.credentials="postgresql://username:password@host:port/database"
   ```

3. Finally, follow the instructions in the Destinations section to add credentials for your chosen destination.
For more information, read the Configuration section.
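Credentials can also be passed explicitly in code rather than read from `secrets.toml`. The sketch below assumes the connection string is a placeholder and that the `pg_replication` module created by `dlt init` exposes `replication_resource` (as in the generated example pipeline):

```py
from dlt.sources.credentials import ConnectionStringCredentials

from pg_replication import replication_resource  # module created by `dlt init pg_replication <destination>`

# Placeholder connection string; both init_replication and replication_resource
# accept a `credentials` argument of this type.
credentials = ConnectionStringCredentials(
    "postgresql://username:password@host:5432/database"
)

changes = replication_resource("my_slot", "my_pub", credentials=credentials)
```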
### Run the pipeline
1. Ensure that you have installed all the necessary dependencies by running:

   ```sh
   pip install -r requirements.txt
   ```

2. After carrying out the necessary customization to your pipeline script, you can run the pipeline with the following command:

   ```sh
   python pg_replication_pipeline.py
   ```

3. Once the pipeline has finished running, you can verify that everything loaded correctly with:

   ```sh
   dlt pipeline <pipeline_name> show
   ```

   For example, the `pipeline_name` for the above pipeline example is `pg_replication_pipeline`, but you may also use any custom name instead.

For more information, read the guide on how to run a pipeline.
## Sources and resources

### Snapshot resources from `init_replication`

The `init_replication` function serves two main purposes:

- Sets up Postgres replication by creating the necessary replication slot and publication if they don't already exist.
- Optionally captures an initial snapshot when `persist_snapshots=True` and returns snapshot resources for loading existing data.
```py
def init_replication(
    slot_name: str = dlt.config.value,
    pub_name: str = dlt.config.value,
    schema_name: str = dlt.config.value,
    table_names: Optional[Union[str, Sequence[str]]] = dlt.config.value,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    publish: str = "insert, update, delete",
    persist_snapshots: bool = False,
    include_columns: Optional[Mapping[str, Sequence[str]]] = None,
    columns: Optional[Mapping[str, TTableSchemaColumns]] = None,
    reset: bool = False,
) -> Optional[Union[DltResource, List[DltResource]]]:
    ...
```
Depending on how you configure `init_replication`, the minimum required privileges for the Postgres user may differ:
| Configuration | Description | Minimum required privileges |
|---|---|---|
| `table_names=None` | Replicates the entire schema. The publication includes all current and future tables in the schema. | Superuser |
| `table_names=[...]`<br/>`reset=False`<br/>`persist_snapshots=False` | Replicates specific tables. Creates or updates an existing publication/slot without dropping. No snapshot tables are created. | REPLICATION attribute, CREATE on the database if the publication does not yet exist, Publication ownership if the publication already exists, Table ownership (for each table) |
| `table_names=[...]`<br/>`reset=False`<br/>`persist_snapshots=True` | Replicates specific tables. Creates or updates an existing publication/slot without dropping. Snapshot tables are created for the initial load. | REPLICATION attribute, CREATE on the database if the publication does not yet exist, Publication ownership if the publication already exists, Table ownership (for each table), CREATE privilege in the schema (for snapshot tables) |
| `table_names=[...]`<br/>`reset=True`<br/>`persist_snapshots=False` | Replicates specific tables. Drops existing publication/slot before recreating. No snapshot tables are created. | REPLICATION attribute, CREATE on the database, Table ownership (for each table), Slot/publication ownership if they already exist |
| `table_names=[...]`<br/>`reset=True`<br/>`persist_snapshots=True` | Replicates specific tables. Drops existing publication/slot before recreating. Snapshot tables are created for the initial load. | REPLICATION attribute, CREATE on the database, Table ownership (for each table), Slot/publication ownership if they already exist, CREATE privilege in the schema (for snapshot tables) |
For detailed information about all arguments, see the source code.
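For example, replicating an entire schema corresponds to the first row of the table above: omit `table_names` so the publication covers all current and future tables. The sketch below uses placeholder slot, publication, and schema names and must run as a superuser:

```py
from pg_replication import init_replication  # module created by `dlt init pg_replication <destination>`

# Placeholder names; omitting `table_names` makes the publication cover the whole schema.
init_replication(
    slot_name="schema_slot",
    pub_name="schema_pub",
    schema_name="my_schema",
)
```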
### Resource `replication_resource`
This resource yields data items for changes in one or more Postgres tables. It consumes messages from an existing replication slot and publication that must be set up beforehand (e.g., using init_replication).
```py
@dlt.resource(
    name=lambda args: args["slot_name"] + "_" + args["pub_name"],
)
def replication_resource(
    slot_name: str,
    pub_name: str,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    include_columns: Optional[Dict[str, Sequence[str]]] = None,
    columns: Optional[Dict[str, TTableSchemaColumns]] = None,
    target_batch_size: int = 1000,
    flush_slot: bool = True,
) -> Iterable[Union[TDataItem, DataItemWithMeta]]:
    ...
```
The minimum required privileges for using `replication_resource` are straightforward:

- REPLICATION attribute (required for logical replication connections)
- Slot ownership (required to consume messages from the replication slot)
- Read access to publication metadata (to query the `pg_publication` system catalog)
For detailed information about the arguments, refer to the source code.
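To illustrate these arguments, the sketch below restricts the replicated columns and lowers the batch size; the slot, publication, table, and column names are placeholders:

```py
import dlt

from pg_replication import replication_resource  # module created by `dlt init pg_replication <destination>`

changes = replication_resource(
    slot_name="my_slot",
    pub_name="my_pub",
    # Keep only these columns of the (placeholder) table `my_source_table`
    include_columns={"my_source_table": ["id", "value", "updated_at"]},
    # Yield data items in batches of up to 500 messages
    target_batch_size=500,
)

pipeline = dlt.pipeline(pipeline_name="pg_replication_pipeline", destination="duckdb")
pipeline.run(changes)
```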
## Customization
The pipeline examples include demos that simulate changes in a Postgres source to demonstrate replication. The simulation uses a simple pipeline defined as:
```py
# Simulation pipeline
sim_pl = dlt.pipeline(
    pipeline_name="simulation_pipeline",
    destination="postgres",
    dataset_name="source_dataset",
    dev_mode=True,
)
```
This pipeline is configured in the get_postgres_pipeline() function.
It’s meant for local testing, so you can freely modify it to simulate different replication scenarios.
In production, you don’t need a simulation pipeline. Replication runs against an actual Postgres database that changes independently.
The general workflow for setting up replication is:
1. Define the replication pipeline that will load the replicated data into your chosen destination:

   ```py
   repl_pl = dlt.pipeline(
       pipeline_name="pg_replication_pipeline",
       destination='duckdb',
       dataset_name="replicate_single_table",
       dev_mode=True,
   )
   ```

2. Initialize replication (if needed) with `init_replication`, and capture a snapshot of the source:

   ```py
   snapshot = init_replication(
       slot_name="my_slot",
       pub_name="my_pub",
       schema_name="my_schema",
       table_names="my_source_table",
       persist_snapshots=True,
       reset=True,
   )
   ```

3. Load the initial snapshot, so the destination contains all existing data before replication begins:

   ```py
   repl_pl.run(snapshot)
   ```

4. Apply ongoing changes by creating a `replication_resource` to capture updates and keep the destination in sync (this step is typically repeated on a schedule; see the sketch after this list):

   ```py
   # Create a resource that generates items for each change in the source table
   changes = replication_resource("my_slot", "my_pub")
   repl_pl.run(changes)
   ```
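In production, steps 1 and 2 run once, while step 4 is repeated on a schedule; because the replication slot and dlt state track what has already been consumed, each run should only pick up the changes accumulated since the previous run. A minimal sketch of such a recurring job, reusing the placeholder names from above:

```py
import dlt

from pg_replication import replication_resource  # module created by `dlt init pg_replication <destination>`


def run_replication() -> None:
    # Recreate the pipeline and resource on every scheduled invocation; the slot
    # and dlt state determine where to resume.
    pipeline = dlt.pipeline(
        pipeline_name="pg_replication_pipeline",
        destination="duckdb",
        dataset_name="replicate_single_table",
    )
    changes = replication_resource("my_slot", "my_pub")
    load_info = pipeline.run(changes)
    print(load_info)


if __name__ == "__main__":
    run_replication()
```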
## Alternative: Using `xmin` for Change Data Capture (CDC)

If logical replication doesn't fit your needs, you can use the built-in `xmin` system column of Postgres for change tracking with dlt's `sql_database` source instead of the `pg_replication` source.

To do this, define a `query_adapter_callback` that extracts the `xmin` value from the source table and filters based on an incremental cursor:
```py
import sqlalchemy as sa


def query_adapter_callback(query, table, incremental=None, _engine=None) -> sa.TextClause:
    """Generate a SQLAlchemy text clause for querying a table with optional incremental filtering."""
    select_clause = (
        f"SELECT {table.fullname}.*, xmin::text::bigint as xmin FROM {table.fullname}"
    )
    if incremental:
        where_clause = (
            f" WHERE {incremental.cursor_path}::text::bigint >= "
            f"({incremental.start_value}::int8)"
        )
        return sa.text(select_clause + where_clause)
    return sa.text(select_clause)
```
This approach enables you to track changes based on the `xmin` value instead of a manually defined column, which is especially useful in cases where mutation tracking is needed but a timestamp or serial column is not available.
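The callback can then be plugged into the `sql_database` source together with an incremental hint on the synthetic `xmin` column. The sketch below is one way to wire it up; the connection string, table name, and primary key are placeholders, and it assumes the `query_adapter_callback` defined above is in scope:

```py
import dlt
from dlt.sources.sql_database import sql_database

# Placeholder credentials and table name
source = sql_database(
    "postgresql://username:password@host:5432/database",
    table_names=["my_source_table"],
    query_adapter_callback=query_adapter_callback,
)

# Track changes with the synthetic `xmin` column added by the callback;
# `merge` with a (placeholder) primary key deduplicates re-loaded rows.
source.my_source_table.apply_hints(
    incremental=dlt.sources.incremental("xmin"),
    write_disposition="merge",
    primary_key="id",
)

pipeline = dlt.pipeline(pipeline_name="xmin_cdc", destination="duckdb")
pipeline.run(source)
```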