Destination: Delta
This page is for dlt+, which requires a license. Join our early access program for a trial license.
Delta
The Delta destination is based on the filesystem destination in dlt. All configuration options from the filesystem destination can be configured as well.
Under the hood, dlt+ uses the deltalake library to write Delta tables. Beware that when loading a large amount of data for one table, the underlying Rust implementation will consume a lot of memory. This is a known issue, and the maintainers are actively working on a solution. You can track the progress here. Until the issue is resolved, you can mitigate the memory consumption by doing multiple smaller incremental pipeline runs.
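The mitigation above (several smaller runs instead of one large load) can be sketched as follows. This is a minimal, library-agnostic sketch: `batched` and `run_in_batches` are hypothetical helper names, and the `run` callable stands in for whatever triggers a single pipeline run in your project.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List


def batched(rows: Iterable[Any], batch_size: int) -> Iterator[List[Any]]:
    """Yield consecutive lists of at most batch_size items from rows."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run_in_batches(
    run: Callable[[List[Any]], Any], rows: Iterable[Any], batch_size: int
) -> int:
    """Call run once per batch and return the number of runs performed."""
    runs = 0
    for batch in batched(rows, batch_size):
        # Assumption: with dlt, run could wrap a call such as pipeline.run(batch, ...)
        run(batch)
        runs += 1
    return runs
```

Each call to `run` then only has to hold one batch in memory at a time. A suitable batch size depends on row width and available memory, so it is best found by experiment.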
Setup
Make sure you have installed the necessary dependencies:
pip install deltalake
pip install "pyarrow>=2.0.18"
Initialize a dlt+ project in the current working directory with the following command:
# replace sql_database with the source of your choice
dlt project init sql_database delta
This will create a Delta destination in your dlt.yml, where you can configure the destination:
destinations:
  delta_destination:
    type: delta
    bucket_url: "s3://your_bucket" # replace with bucket url
The credentials can be defined in the secrets.toml. The examples that follow cover AWS S3, Azure, GCS/GDrive, and SFTP, in that order.
AWS S3:
# secrets.toml
[destination.delta.credentials]
aws_access_key_id="Please set me up!"
aws_secret_access_key="Please set me up!"
Azure:
# secrets.toml
[destination.delta.credentials]
azure_storage_account_name="Please set me up!"
azure_storage_account_key="Please set me up!"
Only Service Account and Application Default Credentials authentication methods are supported for Google Cloud Storage.
# secrets.toml
[destination.delta.credentials]
client_email="Please set me up!"
private_key="Please set me up!"
project_id="Please set me up!"
Learn how to set up SFTP credentials for each authentication method in the SFTP section. For example, for key-based authentication, you can configure the destination as follows:
# secrets.toml
[destination.delta.credentials]
sftp_username = "foo"
sftp_key_filename = "/path/to/id_rsa" # Replace with the path to your private key file
sftp_key_passphrase = "your_passphrase" # Optional: passphrase for your private key
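If you prefer environment variables over secrets.toml, the sketch below derives a variable name from a TOML section path and key. It assumes that dlt's usual convention (upper-case names, sections joined by double underscores) also applies to this destination. `env_var_name` is a hypothetical helper, not part of dlt.

```python
def env_var_name(section: str, key: str) -> str:
    """Build an environment variable name from a TOML section path and a key.

    Assumption: upper-cased parts joined by double underscores.
    """
    parts = section.split(".") + [key]
    return "__".join(part.upper() for part in parts)


name = env_var_name("destination.delta.credentials", "aws_access_key_id")
print(name)
```

Check the resulting name against your own configuration before relying on it, since the section path depends on how the destination is named in your project.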
The Delta destination can also be defined in Python as follows:
import dlt

pipeline = dlt.pipeline("loads_delta", destination="delta")
Write dispositions
The Delta destination handles the write dispositions as follows:
- append: files belonging to such tables are added to the dataset folder.
- replace: all files that belong to such tables are deleted from the dataset folder, and then the current set of files is added.
- merge: can be used only with the upsert merge strategy.
The upsert merge strategy for the Delta destination is experimental.
The merge write disposition can be configured as follows on the source/resource level:
dlt.yml:
sources:
  my_source:
    type: sources.my_source
    with_args:
      write_disposition:
        disposition: merge
        strategy: upsert
Python:
import dlt

@dlt.resource(
    primary_key="id",  # merge_key also works; primary_key and merge_key may be used together
    write_disposition={"disposition": "merge", "strategy": "upsert"},
)
def my_resource():
    yield [
        {"id": 1, "foo": "foo"},
        {"id": 2, "foo": "bar"},
    ]
...
pipeline = dlt.pipeline("loads_delta", destination="delta")
Or on the pipeline.run level:
pipeline.run(write_disposition={"disposition": "merge", "strategy": "upsert"})
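To make the upsert semantics concrete, here is a conceptual sketch in plain Python. It is not how dlt+ or the deltalake library implement the merge. It only illustrates the expected outcome: rows whose key already exists are replaced by the incoming row, and rows with new keys are inserted. The `upsert` function and its `key` parameter are hypothetical names.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def upsert(existing: Iterable[Row], incoming: Iterable[Row], key: str = "id") -> List[Row]:
    """Return the table contents after upserting incoming rows by key."""
    merged: Dict[Any, Row] = {row[key]: row for row in existing}
    for row in incoming:
        # A matching key overwrites the stored row; a new key adds a row.
        merged[row[key]] = row
    return list(merged.values())


table = [{"id": 1, "foo": "foo"}, {"id": 2, "foo": "bar"}]
updates = [{"id": 2, "foo": "baz"}, {"id": 3, "foo": "qux"}]
result = upsert(table, updates)
```

Here the row with `id` 2 is updated and the row with `id` 3 is appended, while the row with `id` 1 is left untouched.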
Partitioning
Delta tables can be partitioned (using Hive-style partitioning) by specifying one or more partition column hints on the source/resource level:
dlt.yml:
sources:
  my_source:
    type: sources.my_source
    with_args:
      columns:
        foo:
          partition: True
Python:
import dlt

@dlt.resource(
    columns={"_dlt_load_id": {"partition": True}}
)
def my_resource():
    ...

pipeline = dlt.pipeline("loads_delta", destination="delta")
Partition evolution (changing partition columns after a table has been created) is currently not supported.
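As a rough illustration of what Hive-style partitioning means for the file layout, the sketch below builds the `column=value` directory path for a row. `hive_partition_path` is a hypothetical helper. The real writer also handles value encoding and null values, which this sketch ignores.

```python
from typing import Any, Dict, Sequence


def hive_partition_path(row: Dict[str, Any], partition_columns: Sequence[str]) -> str:
    """Build a Hive-style partition path such as 'foo=bar/year=2024' for one row."""
    return "/".join(f"{column}={row[column]}" for column in partition_columns)


path = hive_partition_path({"id": 1, "foo": "bar", "year": 2024}, ["foo", "year"])
print(path)
```

Rows that share the same values in the partition columns end up under the same directory, which lets readers skip whole directories when filtering on those columns.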
Table access helper functions
You can use the get_delta_tables helper function to access the native DeltaTable objects.
from dlt.common.libs.deltalake import get_delta_tables
...
# get dictionary of DeltaTable objects
delta_tables = get_delta_tables(pipeline)
# execute operations on DeltaTable objects
delta_tables["my_delta_table"].optimize.compact()
delta_tables["another_delta_table"].optimize.z_order(["col_a", "col_b"])
# delta_tables["my_delta_table"].vacuum()
# etc.
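If you want to run the same maintenance step on every table, a small loop over the returned dictionary is enough. The sketch below assumes only what the example above shows: a mapping of table names to objects that expose `optimize.compact()`. `compact_all` is a hypothetical helper name.

```python
from typing import Any, Dict, Mapping


def compact_all(delta_tables: Mapping[str, Any]) -> Dict[str, Any]:
    """Compact every table and collect whatever each call returns, keyed by table name."""
    return {name: table.optimize.compact() for name, table in delta_tables.items()}
```

With a real pipeline this could be called as `compact_all(get_delta_tables(pipeline))`. Compaction rewrites files, so you may prefer to schedule it outside of load runs.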
Table format
The Delta destination automatically assigns the delta table format to all resources that it will load. You can still fall back to storing files by setting table_format to native on the resource level:
import dlt

@dlt.resource(
    table_format="native"
)
def my_resource():
    ...

pipeline = dlt.pipeline("loads_delta", destination="delta")
Storage options
You can pass storage options by configuring destination.delta.deltalake_storage_options:
[destination.delta]
deltalake_storage_options = '{"AWS_S3_LOCKING_PROVIDER": "dynamodb", "DELTA_DYNAMO_TABLE_NAME": "custom_table_name"}'
dlt passes these options to the storage_options argument of the write_deltalake function in the deltalake library. See the deltalake documentation for the options that can be used.
You don't need to specify credentials here. dlt merges the required credentials with the options you provided before passing them as storage_options.
When using s3, you need to specify storage options to configure locking behavior.
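Because the option value is a JSON object stored inside a TOML string, it can be convenient to generate it instead of typing it by hand. The sketch below uses only the standard library and the two option names from the example above. The variable names are illustrative.

```python
import json

# Option names and values taken from the configuration example above.
storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DELTA_DYNAMO_TABLE_NAME": "custom_table_name",
}

# Single quotes make this a TOML literal string, so the JSON double quotes need no escaping.
toml_line = f"deltalake_storage_options = '{json.dumps(storage_options)}'"
print(toml_line)
```

The printed line can be pasted under the `[destination.delta]` section. This only works as long as no option value contains a single quote, since TOML literal strings cannot escape one.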