SQLAlchemy destination
The SQLAlchemy destination allows you to use any database that has an SQLAlchemy dialect implemented as a destination.
Currently, MySQL and SQLite are considered to have full support and are tested as part of the dlt CI suite. Other dialects are not tested but should generally work.
Install dlt with SQLAlchemy
Install dlt with the sqlalchemy extra dependency:
pip install "dlt[sqlalchemy]"
Note that database drivers are not included and need to be installed separately for the database you plan on using. For example, for MySQL:
pip install mysqlclient
Refer to the SQLAlchemy documentation on dialects for information about client libraries required for supported databases.
Destination capabilities
The following table shows the capabilities of the SQLAlchemy destination:
| Feature | Value | More |
|---|---|---|
| Preferred loader file format | typed-jsonl | File formats |
| Supported loader file formats | typed-jsonl, parquet, model | File formats |
| Has case sensitive identifiers | True | Naming convention |
| Supported merge strategies | delete-insert, scd2 | Merge strategy |
| Supported replace strategies | truncate-and-insert, insert-from-staging | Replace strategy |
| Supports tz aware datetime | True | Timestamps and Timezones |
| Supports naive datetime | True | Timestamps and Timezones |
Create a pipeline
1. Initialize a project with a pipeline that loads to a SQLAlchemy-supported database by running:
dlt init chess sqlalchemy
2. Install the necessary dependencies for SQLAlchemy by running:
pip install -r requirements.txt
or run:
pip install "dlt[sqlalchemy]"
3. Install your database client library.
E.g., for MySQL:
pip install mysqlclient
4. Enter your credentials into .dlt/secrets.toml.
For example, replace with your database connection info:
[destination.sqlalchemy.credentials]
database = "dlt_data"
username = "loader"
password = "<password>"
host = "localhost"
port = 3306
driver_name = "mysql"
Alternatively, a valid SQLAlchemy database URL can be used, either in secrets.toml or as an environment variable.
E.g.
[destination.sqlalchemy]
credentials = "mysql://loader:<password>@localhost:3306/dlt_data"
or
export DESTINATION__SQLALCHEMY__CREDENTIALS="mysql://loader:<password>@localhost:3306/dlt_data"
An SQLAlchemy Engine can also be passed directly by creating an instance of the destination:
import sqlalchemy as sa
import dlt
engine = sa.create_engine('sqlite:///chess_data.db')
pipeline = dlt.pipeline(
pipeline_name='chess',
destination=dlt.destinations.sqlalchemy(engine),
dataset_name='main'
)
Passing SQLAlchemy Engine Options: engine_kwargs
The SQLAlchemy destination accepts an optional engine_kwargs parameter, which is forwarded directly to sqlalchemy.create_engine.
The equivalent engine_args parameter is maintained for backward compatibility, but will be removed in a future release.
Example enabling SQLAlchemy verbose logging:
In .dlt/secrets.toml
[destination.sqlalchemy]
credentials = "sqlite:///logger.db"
In .dlt/config.toml
[destination.sqlalchemy.engine_kwargs]
echo = true
Or, directly in code:
import logging
import dlt
from dlt.destinations import sqlalchemy
logging.basicConfig(level=logging.INFO)
dest = sqlalchemy(
credentials="sqlite:///logger.db",
engine_kwargs={"echo": True},
)
pipeline = dlt.pipeline(
pipeline_name='logger',
destination=dest,
dataset_name='main'
)
pipeline.run(
[
{'id': 1},
{'id': 2},
{'id': 3},
],
table_name="logger"
)
Here, engine_kwargs configures only the engine used by the SQLAlchemy destination. It does not affect resource extraction; SQL sources accept their own engine_kwargs setting.
Notes on SQLite
Dataset files
When using an SQLite database file, each dataset is stored in a separate file since SQLite does not support multiple schemas in a single database file.
Under the hood, this uses ATTACH DATABASE.
The file is stored in the same directory as the main database file (provided by your database URL).
E.g., if your SQLite URL is sqlite:////home/me/data/chess_data.db and your dataset_name is games, the data
is stored in /home/me/data/chess_data__games.db
Note: If the dataset name is main, no additional file is created as this is the default SQLite database.
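The ATTACH mechanism can be illustrated with Python's built-in sqlite3 module. This is only a stdlib sketch: in-memory databases stand in for the separate files dlt would create, and the dataset name games follows the example above.

```python
import sqlite3

# Sketch of the ATTACH DATABASE mechanism dlt uses for non-"main"
# datasets. Here both databases are in-memory; dlt attaches a separate
# <db>__<dataset>.db file instead.
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS games")

# Tables in the attached dataset are addressed with a schema prefix
conn.execute("CREATE TABLE games.results (id INTEGER, winner TEXT)")
conn.execute("INSERT INTO games.results VALUES (1, 'white')")
rows = conn.execute("SELECT winner FROM games.results").fetchall()
print(rows)  # [('white',)]
conn.close()
```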
In-memory databases
In-memory databases require a persistent connection as the database is destroyed when the connection is closed.
Normally, connections are opened and closed for each load job and in other stages during the pipeline run.
To ensure the database persists throughout the pipeline run, you need to pass in an SQLAlchemy Engine object instead of credentials.
This engine is not disposed of automatically by dlt.
Shared-cache URI mode (recommended)
The recommended approach uses SQLite's shared-cache URI format.
This creates a named in-memory database that can be safely accessed by multiple connections across threads
via SingletonThreadPool (one connection per thread):
import dlt
import sqlalchemy as sa
engine = sa.create_engine(
"sqlite:///file:shared?mode=memory&cache=shared&uri=true",
connect_args={"check_same_thread": False},
poolclass=sa.pool.SingletonThreadPool,
)
pipeline = dlt.pipeline(
"my_pipeline",
destination=dlt.destinations.sqlalchemy(engine),
dataset_name="main",
)
pipeline.run([1, 2, 3], table_name="my_table")
with engine.connect() as conn:
result = conn.execute(sa.text("SELECT * FROM my_table"))
print(result.fetchall())
engine.dispose()
- mode=memory&cache=shared creates a named in-memory database shared across all connections in the process.
- uri=true is required for pysqlite to interpret the database path as a URI.
- SingletonThreadPool gives each thread its own connection while all threads see the same data.
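The shared-cache behavior itself can be demonstrated with the stdlib sqlite3 module alone; the database name shared_demo below is arbitrary.

```python
import sqlite3

# Two independent connections to the same named in-memory database:
# with cache=shared, both see the same data after a commit.
c1 = sqlite3.connect("file:shared_demo?mode=memory&cache=shared", uri=True)
c2 = sqlite3.connect("file:shared_demo?mode=memory&cache=shared", uri=True)

c1.execute("CREATE TABLE t (x INTEGER)")
c1.execute("INSERT INTO t VALUES (42)")
c1.commit()

# The second connection reads what the first one wrote
rows = c2.execute("SELECT x FROM t").fetchall()
print(rows)  # [(42,)]
c1.close()
c2.close()
```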
StaticPool with single worker
Alternatively, you can use StaticPool (single shared connection) with workers=1 to avoid concurrent
access on the same connection:
import dlt
import sqlalchemy as sa
engine = sa.create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=sa.pool.StaticPool,
)
pipeline = dlt.pipeline(
"my_pipeline",
destination=dlt.destinations.sqlalchemy(engine),
dataset_name="main",
)
pipeline.run([1, 2, 3], table_name="my_table", loader_file_format="typed-jsonl")
with engine.connect() as conn:
result = conn.execute(sa.text("SELECT * FROM my_table"))
print(result.fetchall())
engine.dispose()
In .dlt/config.toml:
[load]
workers=1
With StaticPool, all threads share a single underlying database connection.
Using the default parallel loader (workers > 1) can cause race conditions where
committed data appears missing. Always set workers=1 when using StaticPool.
Database locking with ATTACH DATABASE on Windows
When dataset_name is not main, dlt uses SQLite's ATTACH DATABASE to store each dataset in a separate file. On Windows, a second ATTACH on the same connection can lock indefinitely under concurrent access (e.g. when using the default parallel loading strategy).
To work around this issue, use one of the following approaches:
- Set dataset_name to main so that no ATTACH is needed:
  pipeline = dlt.pipeline(
      pipeline_name='my_pipeline',
      destination=dlt.destinations.sqlalchemy(credentials="sqlite:///my_data.db"),
      dataset_name='main'
  )
- Use sequential loading to avoid concurrent ATTACH calls:
  [load]
  workers=1
Notes on other dialects
We tested this destination on the mysql, sqlite, oracledb, and mssql dialects. Below are a few notes that may help when enabling other dialects:
- dlt must be able to recognize if a database exception relates to a non-existing entity (like a table or schema). We put some work into recognizing those for most of the popular dialects (look for db_api_client.py).
- Primary keys and unique constraints are not created by default to avoid problems with particular dialects.
- The merge write disposition uses only DELETE and INSERT operations to enable as many dialects as possible.
Please report issues with particular dialects. We'll try to make them work.
Trino limitations
- The Trino dialect does not case fold identifiers. Use the snake_case naming convention only.
- Trino does not support the merge/scd2 write dispositions (unless you somehow create PRIMARY KEYs on the engine tables).
- JSON and BINARY types are cast to STRING (the dialect seems to have a conversion bug).
- Trino does not support PRIMARY/UNIQUE constraints.
Oracle limitations
- In Oracle, regular (non-DBA, non-SYS/SYSOPS) users are assigned one schema on user creation and usually cannot create other schemas. For features requiring staging datasets, either ensure schema creation rights for the DB user or explicitly specify an existing schema to be used for the staging dataset. See the staging dataset documentation for more details.
Adapting the destination for a dialect
Quick approach: pass type_mapper directly
You can adapt destination capabilities for a particular dialect by passing your custom settings. In the example below, we pass a custom TypeMapper that
converts JSON data into text on the fly.
from dlt.common import json
import dlt
import sqlalchemy as sa
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper
class JSONString(sa.TypeDecorator):
"""
A custom SQLAlchemy type that stores JSON data as a string in the database.
Automatically serializes Python objects to JSON strings on write and
deserializes JSON strings back to Python objects on read.
"""
impl = sa.String
cache_ok = True
def process_bind_param(self, value, dialect):
if value is None:
return None
return json.dumps(value)
def process_result_value(self, value, dialect):
if value is None:
return None
return json.loads(value)
class TrinoTypeMapper(SqlalchemyTypeMapper):
"""Example mapper that plugs custom string type that serialized to from/json
Note that instance of TypeMapper contains dialect and destination capabilities instance
for a deeper integration
"""
def _db_type_from_json_type(self, column, table=None):
return JSONString()
# pass dest_ in `destination` argument to dlt.pipeline
dest_ = dlt.destinations.sqlalchemy(type_mapper=TrinoTypeMapper)
The SqlalchemyTypeMapper dispatches to per-type visitor methods (db_type_from_text_type, db_type_from_json_type, db_type_from_bool_type, etc.), so you only need to override the type(s) you want to customize. You can also override to_destination_type() directly for full control.
A custom type mapper is also useful when you want to limit the length of strings. Below we add a variant
for the mssql dialect:
import sqlalchemy as sa
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper
class CustomMssqlTypeMapper(SqlalchemyTypeMapper):
"""This is only an illustration, `sqlalchemy` destination already handles mssql types"""
def db_type_from_text_type(self, column, table=None):
type_ = super().db_type_from_text_type(column, table)
length = column.get("precision")
if length is None:
return type_.with_variant(sa.UnicodeText(), "mssql") # type: ignore[no-any-return]
else:
return type_.with_variant(sa.Unicode(length=length), "mssql") # type: ignore[no-any-return]
When extending the type mapper for mssql, mysql, and trino, start with MssqlVariantTypeMapper, MysqlVariantTypeMapper, and TrinoVariantTypeMapper respectively.
Full approach: register custom dialect capabilities
For a more comprehensive integration, you can register a DialectCapabilities class for your database backend. This allows you to customize type mapping, destination capabilities, table structure, and error handling, all in one place. Registered capabilities are automatically applied when the SQLAlchemy destination connects to a matching database.
from typing import Optional, Type
import sqlalchemy as sa
from dlt.common.destination.capabilities import DataTypeMapper, DestinationCapabilitiesContext
from dlt.common.destination.typing import PreparedTableSchema
from dlt.destinations.impl.sqlalchemy.dialect import (
DialectCapabilities,
register_dialect_capabilities,
)
from dlt.destinations.impl.sqlalchemy.type_mapper import SqlalchemyTypeMapper
class MyTypeMapper(SqlalchemyTypeMapper):
"""Override only the types you need to customize."""
def _db_type_from_json_type(self, column, table=None):
# store JSON as VARCHAR instead of native JSON
return sa.String(length=4000)
class MyDialectCapabilities(DialectCapabilities):
def adjust_capabilities(
self, caps: DestinationCapabilitiesContext, dialect: sa.engine.interfaces.Dialect
) -> None:
caps.max_identifier_length = 128
caps.max_column_identifier_length = 128
caps.sqlglot_dialect = "oracle" # type: ignore[assignment]
def type_mapper_class(self) -> Optional[Type[DataTypeMapper]]:
return MyTypeMapper
def adapt_table(
self, table: sa.Table, table_schema: PreparedTableSchema
) -> sa.Table:
# Example: reorder columns so primary key columns come first.
# Some databases (e.g. StarRocks) require this ordering.
pk_col_names = [c.name for c in table.primary_key.columns]
if not pk_col_names:
return table
pk_cols = [c for c in table.columns if c.name in pk_col_names]
other_cols = [c for c in table.columns if c.name not in pk_col_names]
if [c.name for c in table.columns] == [c.name for c in pk_cols + other_cols]:
return table # already in order
schema = table.schema
name = table.name
metadata = table.metadata
metadata.remove(table)
return sa.Table(
name, metadata,
*[c.copy() for c in pk_cols + other_cols],
sa.PrimaryKeyConstraint(*pk_col_names),
schema=schema,
)
def is_undefined_relation(self, e: Exception) -> Optional[bool]:
# return True if the exception means table/schema doesn't exist
# return False to prevent default pattern matching
# return None to fall through to built-in patterns
if "MY_CUSTOM_MISSING_TABLE_CODE" in str(e):
return True
return None
# register for your backend name (as shown in the SQLAlchemy connection URL)
register_dialect_capabilities("my_dialect", MyDialectCapabilities)
After registration, any pipeline using a my_dialect:// connection URL will automatically use the custom capabilities. No additional configuration is needed.
The DialectCapabilities class supports four extension points:
| Method | Description |
|---|---|
adjust_capabilities | Modify destination capabilities (identifier lengths, timestamp precision, sqlglot dialect, etc.) |
type_mapper_class | Return a custom DataTypeMapper subclass for the dialect |
adapt_table | Modify sa.Table objects before they are created or used for loading (e.g. reorder columns for StarRocks) |
is_undefined_relation | Classify exceptions as "table/schema not found" errors for the dialect |
Passing type_mapper= directly to dlt.destinations.sqlalchemy() always takes precedence over the registered dialect capabilities. Use direct passing for one-off overrides and registration for reusable dialect support.
Write dispositions
The following write dispositions are supported:
- append
- replace, with the truncate-and-insert and insert-from-staging replace strategies; staging-optimized falls back to insert-from-staging.
- merge, with the delete-insert and scd2 merge strategies.
Data loading
Fast loading with parquet
The parquet file format is supported via an ADBC driver for mysql.
The driver is provided by Columnar. To install it, you'll need dbc, a tool for managing ADBC drivers:
pip install adbc-driver-manager dbc
dbc install mysql
With uv, you can run dbc directly:
uv tool run dbc install mysql
You must have the correct driver installed and loader_file_format set to parquet in order to use ADBC. If the driver is not found,
dlt will fall back to converting parquet into INSERT statements.
Parquet files are copied in batches of one row group each. All groups are copied in a single transaction.
The ADBC driver is based on go-mysql. We do minimal conversion of connection strings from SQLAlchemy (SSL cert settings for mysql).
Why ADBC is not supported for SQLite
ADBC is disabled for SQLite because Python's sqlite3 module and adbc_driver_sqlite bundle different SQLite library versions.
When both libraries operate on the same database file in WAL mode, they have conflicting memory-mapped views of the
WAL index file (-shm), causing data corruption. See TensorBoard issue #1467
for details on this two-library conflict.
For SQLite, parquet files are loaded using batch INSERT statements instead.
Loading with SQLAlchemy batch INSERTs
Data is loaded in a dialect-agnostic manner with an insert statement generated by SQLAlchemy's Core API.
Rows are inserted in batches as long as the underlying database driver supports it. By default, the batch size is 10,000 rows.
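The batching pattern can be sketched with SQLAlchemy Core. This is only an illustration, not dlt's actual loader code: the table, the data, and the 3-row batch size are made up (dlt's default batch size is 10,000).

```python
import sqlalchemy as sa

# Dialect-agnostic batched inserts with SQLAlchemy Core: the same
# insert() construct works against any dialect, and rows are sent
# in chunks via executemany.
engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
items = sa.Table(
    "items", meta,
    sa.Column("id", sa.Integer),
    sa.Column("name", sa.String),
)
meta.create_all(engine)

rows = [{"id": i, "name": f"row_{i}"} for i in range(10)]
batch_size = 3  # illustrative only; dlt defaults to 10,000
with engine.begin() as conn:
    for start in range(0, len(rows), batch_size):
        conn.execute(items.insert(), rows[start:start + batch_size])

with engine.connect() as conn:
    count = conn.execute(sa.select(sa.func.count()).select_from(items)).scalar_one()
print(count)  # 10
```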
Syncing of dlt state
This destination fully supports dlt state sync.
Data types
All dlt data types are supported, but how they are stored in the database depends on the SQLAlchemy dialect.
For example, SQLite does not have DATETIME or TIMESTAMP types, so timestamp columns are stored as TEXT in ISO 8601 format.
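A quick stdlib check of this SQLite behavior (the table and column names here are made up):

```python
import sqlite3
from datetime import datetime, timezone

# SQLite has no real TIMESTAMP storage class; an ISO 8601 string
# lands as TEXT, which is what typeof() reports back.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (ts TIMESTAMP)")
ts = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc).isoformat()
conn.execute("INSERT INTO events VALUES (?)", (ts,))
value, storage = conn.execute("SELECT ts, typeof(ts) FROM events").fetchone()
print(value, storage)  # 2024-01-02T03:04:05+00:00 text
conn.close()
```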
Supported file formats
- typed-jsonl is used by default: JSON-encoded data with typing information included.
- parquet is supported.
Supported column hints
No indexes or constraints are created on tables by default. You can enable the following via destination configuration:
[destination.sqlalchemy]
create_unique_indexes=true
create_primary_keys=true
- unique hints are translated to UNIQUE constraints via SQLAlchemy.
- primary_key hints are translated to PRIMARY KEY constraints via SQLAlchemy.
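As an illustration of what these hints translate to, the following sketch renders the DDL for a hypothetical table with a primary key and a unique column, using plain SQLAlchemy:

```python
import sqlalchemy as sa
from sqlalchemy.schema import CreateTable

# Hypothetical table: primary_key and unique hints become
# PRIMARY KEY and UNIQUE clauses in the generated CREATE TABLE.
meta = sa.MetaData()
customers = sa.Table(
    "customers",
    meta,
    sa.Column("id", sa.BigInteger, primary_key=True),
    sa.Column("email", sa.String(256), unique=True),
)
ddl = str(CreateTable(customers).compile())
print("PRIMARY KEY" in ddl, "UNIQUE" in ddl)  # True True
```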