Postgres
Install dlt with PostgreSQL
To install the dlt library with PostgreSQL dependencies, run:
pip install "dlt[postgres]"
Setup guide
1. Initialize a project with a pipeline that loads to Postgres by running:
dlt init chess postgres
2. Install the necessary dependencies for Postgres by running:
pip install -r requirements.txt
This will install dlt with the postgres extra, which contains the psycopg2 client.
3. After setting up a Postgres instance and psql or a query editor, create a new database by running:
CREATE DATABASE dlt_data;
Add the dlt_data database to .dlt/secrets.toml.
4. Create a new user by running:
CREATE USER loader WITH PASSWORD '<password>';
Add the loader username and its password to .dlt/secrets.toml.
5. Give the loader user owner permissions by running:
ALTER DATABASE dlt_data OWNER TO loader;
You can set more restrictive permissions instead (e.g., give the user access only to a specific schema).
6. Enter your credentials into .dlt/secrets.toml. It should now look like this:
[destination.postgres.credentials]
database = "dlt_data"
username = "loader"
password = "<password>" # replace with your password
host = "localhost" # or the IP address location of your database
port = 5432
connect_timeout = 15
You can also pass a database connection string similar to the one used by the psycopg2 library or SQLAlchemy. The credentials above look like this:
# Keep it at the top of your TOML file, before any section starts
destination.postgres.credentials="postgresql://loader:<password>@localhost/dlt_data?connect_timeout=15"
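To make the correspondence between the TOML fields and the connection-string form concrete, here is a minimal stdlib-only sketch. The helper name credentials_to_dsn and the sample password are hypothetical, and dlt parses credentials on its own; the sketch only illustrates the mapping. Because the connection string is a URL, characters such as @ or / in the username or password have to be percent-encoded.

```python
from urllib.parse import quote, urlencode

# Mirrors the [destination.postgres.credentials] table above.
# The password is a made-up example containing URL-reserved characters.
CREDENTIALS = {
    "database": "dlt_data",
    "username": "loader",
    "password": "p@ss/word",
    "host": "localhost",
    "port": 5432,
    "connect_timeout": 15,
}


def credentials_to_dsn(creds: dict) -> str:
    """Build a postgresql:// URL from credential fields (hypothetical helper)."""
    # Percent-encode user and password so '@', '/', ':' and similar
    # characters cannot be mistaken for URL delimiters.
    user = quote(creds["username"], safe="")
    password = quote(creds["password"], safe="")
    port = creds.get("port", 5432)
    dsn = f"postgresql://{user}:{password}@{creds['host']}:{port}/{creds['database']}"
    # Extra connection parameters go into the query string.
    if "connect_timeout" in creds:
        dsn += "?" + urlencode({"connect_timeout": creds["connect_timeout"]})
    return dsn


print(credentials_to_dsn(CREDENTIALS))
```

With the sample values, the password p@ss/word appears as p%40ss%2Fword in the resulting URL.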
To pass credentials directly, use an explicit instance of the destination:
import dlt

pipeline = dlt.pipeline(
    pipeline_name='chess',
    destination=dlt.destinations.postgres("postgresql://loader:<password>@localhost/dlt_data"),
    dataset_name='chess_data',
)
Write disposition
All write dispositions are supported.
If you set the replace strategy to staging-optimized, the destination tables will be dropped and replaced by the staging tables.
Data loading
dlt loads data using large INSERT VALUES statements by default. Loading is multithreaded (20 threads by default).
Data types
postgres supports various timestamp types, which can be configured using the column flags timezone and precision in the dlt.resource decorator or the pipeline.run method.
- Precision: specifies the number of decimal places for fractional seconds, from 0 to 6. It can be combined with the timezone flag.
- Timezone:
  - Setting timezone=False maps to TIMESTAMP WITHOUT TIME ZONE.
  - Setting timezone=True (or omitting the flag, which defaults to True) maps to TIMESTAMP WITH TIME ZONE.
Example precision and timezone: TIMESTAMP (3) WITHOUT TIME ZONE
import dlt

@dlt.resource(
    columns={"event_tstamp": {"data_type": "timestamp", "precision": 3, "timezone": False}},
    primary_key="event_id",
)
def events():
    yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"}]

pipeline = dlt.pipeline(destination="postgres")
pipeline.run(events())
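The precision and timezone rules listed under Data types can be summarized as a small lookup. The function below is a hypothetical helper written only to mirror those documented rules; it is not dlt's internal type mapper.

```python
from typing import Optional


def postgres_timestamp_type(precision: Optional[int] = None, timezone: bool = True) -> str:
    """Return the Postgres type for a timestamp hint (illustrative only)."""
    # Postgres allows 0 to 6 digits of fractional-second precision.
    if precision is not None and not 0 <= precision <= 6:
        raise ValueError("precision must be between 0 and 6")
    base = "TIMESTAMP" if precision is None else f"TIMESTAMP ({precision})"
    # timezone defaults to True, as described above.
    suffix = "WITH TIME ZONE" if timezone else "WITHOUT TIME ZONE"
    return f"{base} {suffix}"


print(postgres_timestamp_type(3, False))
print(postgres_timestamp_type())
```

For the event_tstamp column in the example, precision 3 with timezone=False yields TIMESTAMP (3) WITHOUT TIME ZONE.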
Fast loading with Arrow tables and CSV
You can use Arrow tables and CSV to quickly load tabular data. Select the CSV loader file format as shown below:
info = pipeline.run(arrow_table, loader_file_format="csv")
In the example above, arrow_table will be converted to CSV with pyarrow and then streamed into postgres with the COPY command. This method skips the regular dlt normalizer used for Python objects and is several times faster.
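The general CSV + COPY technique can be sketched with the standard library alone: rows are serialized as CSV and paired with a COPY ... FROM STDIN statement that the server reads in one stream. This is an illustration of the technique under that assumption, not dlt's implementation (dlt produces the CSV with pyarrow and issues COPY itself); the function name to_copy_payload is hypothetical.

```python
import csv
import io


def quote_ident(name: str) -> str:
    # Double embedded quotes, then wrap the identifier in double quotes.
    return '"' + name.replace('"', '""') + '"'


def to_copy_payload(table: str, columns: list[str], rows: list[tuple]) -> tuple[str, str]:
    """Render rows as CSV plus a matching COPY ... FROM STDIN statement."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(columns)  # header row, hence HEADER true below
    writer.writerows(rows)    # fields containing commas get quoted
    col_list = ", ".join(quote_ident(c) for c in columns)
    sql = f"COPY {quote_ident(table)} ({col_list}) FROM STDIN WITH (FORMAT csv, HEADER true)"
    return sql, buf.getvalue()


sql, payload = to_copy_payload("events", ["event_id", "name"], [(1, "a"), (2, "b,c")])
print(sql)
print(payload)
```

With psycopg2, a statement and payload like these could be sent with cursor.copy_expert(sql, io.StringIO(payload)); the single streamed COPY is what avoids per-row INSERT overhead.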
Supported file formats
- insert-values is used by default.
- CSV is supported.
Supported column hints
postgres will create unique indexes for all columns with unique hints. This behavior can be disabled with the create_indexes option described under Additional destination options.
Spatial Types
To enable GIS capabilities in your Postgres destination, use the x-postgres-geometry and x-postgres-srid hints for columns containing geometric data.
The postgres_adapter facilitates applying these hints conveniently, with a default SRID of 4326.
Supported Geometry Types:
- WKT (Well-Known Text)
- Hex Representation
If you have geometry data in binary format, you will need to convert it to hexadecimal representation before loading.
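If your geometries are already binary WKB (for example, bytes returned by a database driver or a geometry library), Python's built-in bytes.hex() performs the conversion. In the sketch below, the helper name wkb_to_hex is hypothetical, and the Point WKB is built by hand with struct only to provide sample binary input.

```python
import struct


def wkb_to_hex(wkb: bytes) -> str:
    """Convert binary WKB to its hexadecimal string form."""
    # bytes.hex() yields lowercase; upper() matches the style of the
    # hex examples in this section.
    return wkb.hex().upper()


# WKB for POINT (1 1): byte order 1 (little-endian), geometry type 1
# (Point), then x and y as 8-byte doubles. "<" disables struct padding.
point_wkb = struct.pack("<BIdd", 1, 1, 1.0, 1.0)
print(wkb_to_hex(point_wkb))
```

The output is the same hex string as the Point_wkb_hex sample used in the adapter example.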
Example: Using postgres_adapter with different geometry types
from dlt.destinations.impl.postgres.postgres_adapter import postgres_adapter
# Sample data with various geometry types
data_wkt = [
    {"type": "Point_wkt", "geom": "POINT (1 1)"},
    {"type": "Polygon_wkt", "geom": "POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))"},
]
data_wkb_hex = [
    {"type": "Point_wkb_hex", "geom": "0101000000000000000000F03F000000000000F03F"},
    {"type": "LineString_wkb_hex", "geom": "01020000000300000000000000000000000000000000000000000000000000F03F000000000000F03F00000000000000400000000000000040"},
]
# Apply postgres_adapter to the 'geom' column with default SRID 4326
resource_wkt = postgres_adapter(data_wkt, geometry="geom")
resource_wkb_hex = postgres_adapter(data_wkb_hex, geometry="geom")
# If you need a different SRID
resource_wkt = postgres_adapter(data_wkt, geometry="geom", srid=3242)
Ensure that the PostGIS extension is enabled in your Postgres database:
CREATE EXTENSION postgis;
This configuration allows dlt to map the geom column to the PostGIS geometry type for spatial queries and analyses.
The LinearRing geometry type isn't supported.
Table and column identifiers
Postgres supports both case-sensitive and case-insensitive identifiers. All unquoted and lowercase identifiers resolve case-insensitively in SQL statements. Case-insensitive naming conventions, like the default snake_case, generate case-insensitive identifiers. Case-sensitive naming conventions, like sql_cs_v1, generate case-sensitive identifiers that must be quoted in SQL statements.
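The quoting rule can be sketched in a few lines. Both helpers are hypothetical and only approximate Postgres behavior: unquoted names are folded to lower case, while double-quoted names are matched exactly, with embedded double quotes escaped by doubling them.

```python
def fold_unquoted(identifier: str) -> str:
    """Approximate how Postgres resolves an unquoted identifier."""
    # Postgres folds unquoted names to lower case.
    return identifier.lower()


def quote_ident(identifier: str) -> str:
    """Quote an identifier so Postgres matches it case-sensitively."""
    # Embedded double quotes are escaped by doubling them.
    return '"' + identifier.replace('"', '""') + '"'


# A mixed-case column, such as one a case-sensitive convention may produce,
# must be quoted; written unquoted, EventId would resolve to eventid.
column = "EventId"
print(fold_unquoted(column))
print(f"SELECT {quote_ident(column)} FROM events")
```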
Additional destination options
The Postgres destination creates UNIQUE indexes by default on columns with the unique hint (e.g., _dlt_id). To disable this behavior:
[destination.postgres]
create_indexes=false
Setting up CSV format
You can provide non-default CSV settings via a configuration file or explicitly.
[destination.postgres.csv_format]
delimiter="|"
include_header=false
or
from dlt.destinations import postgres
from dlt.common.data_writers.configuration import CsvFormatConfiguration
csv_format = CsvFormatConfiguration(delimiter="|", include_header=False)
dest_ = postgres(csv_format=csv_format)
Above, we configure CSV files without a header row, using | as the separator. You'll need such settings when importing external files.
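An external file imported this way has to match those settings. A minimal sketch of producing such a file with Python's csv module follows; the helper name write_pipe_csv is hypothetical, and it assumes the column order already matches the target table.

```python
import csv
import io


def write_pipe_csv(rows) -> str:
    """Serialize rows as headerless, pipe-delimited CSV (hypothetical helper)."""
    buf = io.StringIO()
    # delimiter="|" matches the configuration above; no header row is
    # written, matching include_header=false.
    writer = csv.writer(buf, delimiter="|", lineterminator="\n")
    writer.writerows(rows)
    return buf.getvalue()


print(write_pipe_csv([(1, "chess"), (2, "a|b")]))
```

Values that contain the delimiter itself, such as a|b, are wrapped in double quotes by the csv module's default minimal quoting.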
dbt support
This destination integrates with dbt via dbt-postgres.
Syncing of dlt state
This destination fully supports dlt state sync.