SQLAlchemy destination

The SQLAlchemy destination allows you to use any database with an implemented SQLAlchemy dialect as a dlt destination.

Currently, MySQL and SQLite are considered to have full support and are tested as part of the dlt CI suite. Other dialects are not tested but should generally work.

Install dlt with SQLAlchemy

Install dlt with the sqlalchemy extra dependency:

pip install "dlt[sqlalchemy]"

Note that database drivers are not included and need to be installed separately for the database you plan on using. For example, for MySQL:

pip install mysqlclient

Refer to the SQLAlchemy documentation on dialects for information about client libraries required for supported databases.
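
For example, a hedged alternative for MySQL (PyMySQL is an assumption here, not the driver tested in CI): install the pure-Python PyMySQL driver and select it explicitly with a driver-qualified SQLAlchemy URL.

import sqlalchemy as sa

# Assumes `pip install pymysql`; the "mysql+pymysql" prefix selects the PyMySQL
# DBAPI, while a plain "mysql://" URL defaults to mysqlclient.
# Host and credentials are placeholders.
engine = sa.create_engine("mysql+pymysql://loader:<password>@localhost:3306/dlt_data")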

Destination capabilities

The following table shows the capabilities of the SQLAlchemy destination:

Feature                        | Value                                    | More
Preferred loader file format   | typed-jsonl                              | File formats
Supported loader file formats  | typed-jsonl, parquet, model              | File formats
Has case sensitive identifiers | True                                     | Naming convention
Supported merge strategies     | delete-insert, scd2                      | Merge strategy
Supported replace strategies   | truncate-and-insert, insert-from-staging | Replace strategy
Supports tz aware datetime     | True                                     | Timestamps and Timezones
Supports naive datetime        | True                                     | Timestamps and Timezones


Create a pipeline

1. Initialize a project with a pipeline that loads to an SQLAlchemy-compatible database by running:

dlt init chess sqlalchemy

2. Install the necessary dependencies for SQLAlchemy by running:

pip install -r requirements.txt

or run:

pip install "dlt[sqlalchemy]"

3. Install your database client library.

E.g., for MySQL:

pip install mysqlclient

4. Enter your credentials into .dlt/secrets.toml.

For example, for MySQL, replace the placeholder values below with your database connection info:

[destination.sqlalchemy.credentials]
database = "dlt_data"
username = "loader"
password = "<password>"
host = "localhost"
port = 3306
driver_name = "mysql"

Alternatively, a valid SQLAlchemy database URL can be used, either in secrets.toml or as an environment variable, e.g.:

[destination.sqlalchemy]
credentials = "mysql://loader:<password>@localhost:3306/dlt_data"

or

export DESTINATION__SQLALCHEMY__CREDENTIALS="mysql://loader:<password>@localhost:3306/dlt_data"
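
Equivalently, the same environment variable can be set from Python before the pipeline runs, e.g. in a small launcher script (the connection string is a placeholder):

import os

# dlt resolves credentials from this environment variable at run time
os.environ["DESTINATION__SQLALCHEMY__CREDENTIALS"] = (
    "mysql://loader:<password>@localhost:3306/dlt_data"
)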

An SQLAlchemy Engine can also be passed directly by creating an instance of the destination:

import sqlalchemy as sa
import dlt

engine = sa.create_engine('sqlite:///chess_data.db')

pipeline = dlt.pipeline(
    pipeline_name='chess',
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name='main'
)

Passing SQLAlchemy Engine Options: engine_kwargs

The SQLAlchemy destination accepts an optional engine_kwargs parameter, which is forwarded directly to sqlalchemy.create_engine. The equivalent engine_args parameter is maintained for backward compatibility, but will be removed in a future release.

Example enabling SQLAlchemy verbose logging:

In .dlt/secrets.toml

[destination.sqlalchemy]
credentials = "sqlite:///logger.db"

In .dlt/config.toml

[destination.sqlalchemy.engine_kwargs]
echo = true

Or, directly in code:

import logging
import dlt
from dlt.destinations import sqlalchemy

logging.basicConfig(level=logging.INFO)

dest = sqlalchemy(
    credentials="sqlite:///logger.db",
    engine_kwargs={"echo": True},
)

pipeline = dlt.pipeline(
    pipeline_name='logger',
    destination=dest,
    dataset_name='main'
)

pipeline.run(
    [
        {'id': 1},
        {'id': 2},
        {'id': 3},
    ],
    table_name="logger"
)

Here, engine_kwargs configures only the engine used by the SQLAlchemy destination. It does not affect resource extraction (use the engine_kwargs of the SQL sources for that, see here).

Notes on SQLite

Dataset files

When using an SQLite database file, each dataset is stored in a separate file since SQLite does not support multiple schemas in a single database file. Under the hood, this uses ATTACH DATABASE.

The file is stored in the same directory as the main database file (provided by your database URL).

E.g., if your SQLite URL is sqlite:////home/me/data/chess_data.db and your dataset_name is games, the data is stored in /home/me/data/chess_data__games.db.

Note: If the dataset name is main, no additional file is created as this is the default SQLite database.
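
As a quick sketch of this behavior (file and table names are illustrative), running a pipeline against a relative SQLite URL with a non-default dataset name creates the sibling dataset file:

import os

import dlt

pipeline = dlt.pipeline(
    pipeline_name="chess",
    destination=dlt.destinations.sqlalchemy(credentials="sqlite:///chess_data.db"),
    dataset_name="games",
)
pipeline.run([{"id": 1}], table_name="positions")

# The "games" dataset is attached as a sibling file next to the main database file
print(os.path.exists("chess_data__games.db"))  # expected: True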

In-memory databases

In-memory databases require a persistent connection as the database is destroyed when the connection is closed. Normally, connections are opened and closed for each load job and in other stages during the pipeline run. To ensure the database persists throughout the pipeline run, you need to pass in an SQLAlchemy Engine object instead of credentials. This engine is not disposed of automatically by dlt.

The recommended approach uses SQLite's shared-cache URI format. This creates a named in-memory database that can be safely accessed by multiple connections across threads via SingletonThreadPool (one connection per thread):

import dlt
import sqlalchemy as sa

engine = sa.create_engine(
    "sqlite:///file:shared?mode=memory&cache=shared&uri=true",
    connect_args={"check_same_thread": False},
    poolclass=sa.pool.SingletonThreadPool,
)

pipeline = dlt.pipeline(
    "my_pipeline",
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name="main",
)

pipeline.run([1, 2, 3], table_name="my_table")

with engine.connect() as conn:
    result = conn.execute(sa.text("SELECT * FROM my_table"))
    print(result.fetchall())

engine.dispose()

  • mode=memory&cache=shared creates a named in-memory database shared across all connections in the process.
  • uri=true is required for pysqlite to interpret the database path as a URI.
  • SingletonThreadPool gives each thread its own connection while all threads see the same data.

StaticPool with single worker

Alternatively, you can use StaticPool (single shared connection) with workers=1 to avoid concurrent access on the same connection:

import dlt
import sqlalchemy as sa

engine = sa.create_engine(
    "sqlite:///:memory:",
    connect_args={"check_same_thread": False},
    poolclass=sa.pool.StaticPool,
)

pipeline = dlt.pipeline(
    "my_pipeline",
    destination=dlt.destinations.sqlalchemy(engine),
    dataset_name="main",
)

pipeline.run([1, 2, 3], table_name="my_table", loader_file_format="typed-jsonl")

with engine.connect() as conn:
    result = conn.execute(sa.text("SELECT * FROM my_table"))
    print(result.fetchall())

engine.dispose()

Set a single load worker in .dlt/config.toml:

[load]
workers=1

caution

With StaticPool, all threads share a single underlying database connection. Using the default parallel loader (workers > 1) can cause race conditions where committed data appears missing. Always set workers=1 when using StaticPool.

Database locking with ATTACH DATABASE on Windows

When dataset_name is not main, dlt uses SQLite's ATTACH DATABASE to store each dataset in a separate file. On Windows, a second ATTACH on the same connection can lock indefinitely under concurrent access (e.g. when using the default parallel loading strategy).

To work around this issue, use one of the following approaches:

  1. Set dataset_name to main so that no ATTACH is needed:

     pipeline = dlt.pipeline(
         pipeline_name='my_pipeline',
         destination=dlt.destinations.sqlalchemy(credentials="sqlite:///my_data.db"),
         dataset_name='main'
     )

  2. Use sequential loading to avoid concurrent ATTACH calls:

     [load]
     workers=1

Notes on other dialects

We tested this destination with the mysql, sqlite, oracledb, and mssql dialects. Below are a few notes that may help when enabling other dialects:

  1. dlt must be able to recognize whether a database exception relates to a non-existing entity (such as a table or schema). We have put in work to recognize those for most popular dialects (see db_api_client.py).
  2. Primary keys and unique constraints are not created by default to avoid problems with particular dialects.
  3. The merge write disposition uses only DELETE and INSERT operations to support as many dialects as possible.

Please report issues with particular dialects. We'll try to make them work.

Trino limitations

  • The Trino dialect does not case fold identifiers. Use the snake_case naming convention only.
  • Trino does not support the merge/scd2 write dispositions (unless you somehow create PRIMARY KEYs on the engine tables).
  • JSON and BINARY types are cast to STRING (the dialect seems to have a conversion bug).
  • Trino does not support PRIMARY/UNIQUE constraints.

Oracle limitations

  • In Oracle, regular (non-DBA, non-SYS/SYSOPS) users are assigned a single schema at user creation and usually cannot create other schemas. For features requiring staging datasets, either ensure the DB user has schema creation rights or explicitly specify an existing schema to be used for the staging dataset. See the staging dataset documentation for more details.

Adapting the destination for a dialect

You can adapt the destination capabilities for a particular dialect by passing your custom settings. In the example below, we pass a custom TypeMapper that converts json data into text on the fly.

from dlt.common import json

import dlt
import sqlalchemy as sa
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper

class JSONString(sa.TypeDecorator):
    """
    A custom SQLAlchemy type that stores JSON data as a string in the database.
    Automatically serializes Python objects to JSON strings on write and
    deserializes JSON strings back to Python objects on read.
    """

    impl = sa.String
    cache_ok = True

    def process_bind_param(self, value, dialect):
        if value is None:
            return None

        return json.dumps(value)

    def process_result_value(self, value, dialect):
        if value is None:
            return None

        return json.loads(value)

class TrinoTypeMapper(SqlalchemyTypeMapper):
    """Example mapper that plugs in a custom string type that serializes to/from JSON.

    Note that an instance of TypeMapper contains the dialect and destination
    capabilities instance for a deeper integration.
    """

    def to_destination_type(self, column, table=None):
        if column["data_type"] == "json":
            return JSONString()
        return super().to_destination_type(column, table)

# pass dest_ in the `destination` argument to dlt.pipeline
dest_ = dlt.destinations.sqlalchemy(type_mapper=TrinoTypeMapper)

A custom type mapper is also useful when, for example, you want to limit the length of string columns. Below, we add a variant for the mssql dialect:

import sqlalchemy as sa
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper

class CustomMssqlTypeMapper(SqlalchemyTypeMapper):
    """This is only an illustration, `sqlalchemy` destination already handles mssql types"""

    def to_destination_type(self, column, table=None):
        type_ = super().to_destination_type(column, table)
        if column["data_type"] == "text":
            length = precision = column.get("precision")
            if length is None:
                return type_.with_variant(sa.UnicodeText(), "mssql")  # type: ignore[no-any-return]
            else:
                return type_.with_variant(sa.Unicode(length=length), "mssql")  # type: ignore[no-any-return]
        return type_
warning

When extending the type mapper for mssql, mysql, or trino, start from MssqlVariantTypeMapper, MysqlVariantTypeMapper, or TrinoVariantTypeMapper respectively.

Write dispositions

The following write dispositions are supported:

  • append
  • replace with the truncate-and-insert and insert-from-staging replace strategies. The staging-optimized strategy falls back to insert-from-staging.
  • merge with the delete-insert and scd2 merge strategies.
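
As an illustration (pipeline, file, and table names are placeholders), a minimal merge load keyed on an id column using the delete-insert strategy, dlt's default merge strategy:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="merge_example",  # placeholder name
    destination=dlt.destinations.sqlalchemy(credentials="sqlite:///merge_example.db"),
    dataset_name="main",
)

# Rows sharing a primary key with previously loaded rows are deleted and re-inserted
pipeline.run(
    [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}],
    table_name="items",
    write_disposition="merge",
    primary_key="id",
)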

Data loading

Fast loading with parquet

The parquet file format is supported via an ADBC driver for mysql. The driver is provided by Columnar. To install it, you'll need dbc, a tool to manage ADBC drivers:

pip install adbc-driver-manager dbc
dbc install mysql

With uv, you can run dbc directly:

uv tool run dbc install mysql

You must have the correct driver installed and loader_file_format set to parquet in order to use ADBC. If the driver is not found, dlt will convert the parquet files into INSERT statements.

We copy parquet files in batches of one row group each. All row groups are copied in a single transaction.
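
As a sketch (connection string and names are placeholders, and the ADBC driver from above is assumed to be installed), a parquet load to MySQL looks like any other run with loader_file_format set:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="parquet_example",  # placeholder name
    destination=dlt.destinations.sqlalchemy(
        credentials="mysql://loader:<password>@localhost:3306/dlt_data"  # placeholder
    ),
    dataset_name="main",
)

# With the ADBC driver installed, parquet files are copied directly;
# without it, dlt falls back to INSERT statements.
pipeline.run(
    [{"id": 1}, {"id": 2}],
    table_name="items",
    loader_file_format="parquet",
)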

caution

The ADBC driver is based on go-mysql. We do only minimal conversion of SQLAlchemy connection strings (ssl cert settings for mysql).

Why ADBC is not supported for SQLite

ADBC is disabled for SQLite because Python's sqlite3 module and adbc_driver_sqlite bundle different SQLite library versions. When both libraries operate on the same database file in WAL mode, they have conflicting memory-mapped views of the WAL index file (-shm), causing data corruption. See TensorBoard issue #1467 for details on this two-library conflict.

For SQLite, parquet files are loaded using batch INSERT statements instead.

Loading with SQLAlchemy batch INSERTs

Data is loaded in a dialect-agnostic manner with an insert statement generated by SQLAlchemy's core API. Rows are inserted in batches as long as the underlying database driver supports it. By default, the batch size is 10,000 rows.
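
As a rough standalone illustration of that pattern (the table and engine below are illustrative, not dlt internals), SQLAlchemy Core builds one dialect-agnostic INSERT and executes it with a list of rows, letting the DBAPI driver batch them:

import sqlalchemy as sa

engine = sa.create_engine("sqlite:///:memory:")
metadata = sa.MetaData()
items = sa.Table(
    "items",
    metadata,
    sa.Column("id", sa.Integer),
    sa.Column("value", sa.String),
)
metadata.create_all(engine)

with engine.begin() as conn:
    # One INSERT statement executed with many parameter sets (executemany-style)
    conn.execute(sa.insert(items), [{"id": i, "value": f"row {i}"} for i in range(3)])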

Syncing of dlt state

This destination fully supports dlt state sync.

Data types

All dlt data types are supported, but how they are stored in the database depends on the SQLAlchemy dialect. For example, SQLite does not have DATETIME or TIMESTAMP types, so timestamp columns are stored as TEXT in ISO 8601 format.

Supported file formats

  • typed-jsonl is used by default. JSON-encoded data with typing information included.
  • Parquet is supported.

Supported column hints

No indexes or constraints are created on the tables by default. You can enable the following via destination configuration:

[destination.sqlalchemy]
create_unique_indexes=true
create_primary_keys=true
  • unique hints are translated to UNIQUE constraints via SQLAlchemy.
  • primary_key hints are translated to PRIMARY KEY constraints via SQLAlchemy.
