Delta table format
dlt supports writing Delta tables when using the filesystem destination.
How it worksβ
dlt uses the deltalake library to write Delta tables. One or multiple Parquet files are prepared during the extract and normalize steps. In the load step, these Parquet files are exposed as an Arrow data structure and fed into deltalake
.
Delta dependenciesβ
You need the deltalake
package to use this format:
pip install "dlt[deltalake]"
You also need pyarrow>=17.0.0
:
pip install 'pyarrow>=17.0.0'
Set table formatβ
Set the table_format
argument to delta
when defining your resource:
@dlt.resource(table_format="delta")
def my_delta_resource():
...
or when calling run
on your pipeline:
pipeline.run(my_resource, table_format="delta")
dlt always uses Parquet as loader_file_format
when using the delta
table format. Any setting of loader_file_format
is disregarded.
Table format partitioningβ
Delta tables can be partitioned by specifying one or more partition
column hints. This example partitions a Delta table by the foo
column:
@dlt.resource(
table_format="delta",
columns={"foo": {"partition": True}}
)
def my_delta_resource():
...
Delta uses Hive-style partitioning.
Partition evolution (changing partition columns after a table has been created) is not supported.
Table access helper functionsβ
You can use the get_delta_tables
helper function to access native table objects. These are deltalake
DeltaTable objects.
from dlt.common.libs.deltalake import get_delta_tables
# get dictionary of DeltaTable objects
delta_tables = get_delta_tables(pipeline)
# execute operations on DeltaTable objects
delta_tables["my_delta_table"].optimize.compact()
delta_tables["another_delta_table"].optimize.z_order(["col_a", "col_b"])
# delta_tables["my_delta_table"].vacuum()
# etc.
Google Cloud Storage authenticationβ
Note that not all authentication methods are supported when using Delta table format on Google Cloud Storage:
- Service Account - β Supported
- Application Default Credentials - β Supported
- OAuth - β Not supported
Table format merge
support (experimental)β
The upsert
merge strategy is supported for delta
.
The upsert
merge strategy for the filesystem destination with Delta table format is experimental.
@dlt.resource(
write_disposition={"disposition": "merge", "strategy": "upsert"},
primary_key="my_primary_key",
table_format="delta"
)
def my_upsert_resource():
...
Known limitationsβ
hard_delete
hint not supported- Deleting records from nested tables not supported
- This means updates to JSON columns that involve element removals are not propagated. For example, if you first load
{"key": 1, "nested": [1, 2]}
and then load{"key": 1, "nested": [1]}
, then the record for element2
will not be deleted from the nested table.
- This means updates to JSON columns that involve element removals are not propagated. For example, if you first load
Delta table format storage optionsβ
You can pass storage options by configuring destination.filesystem.deltalake_storage_options
:
[destination.filesystem]
deltalake_storage_options = '{"AWS_S3_LOCKING_PROVIDER": "dynamodb", "DELTA_DYNAMO_TABLE_NAME": "custom_table_name"}'
dlt passes these options to the storage_options
argument of the write_deltalake
method in the deltalake
library. Look at their documentation to see which options can be used.
You don't need to specify credentials here. dlt merges the required credentials with the options you provided before passing it as storage_options
.
βWhen using
s3
, you need to specify storage options to configure locking behavior.
Delta table format memory usageβ
Beware that when loading a large amount of data for one table, the underlying rust implementation will consume a lot of memory. This is a known issue and the maintainers are actively working on a solution. You can track the progress here. Until the issue is resolved, you can mitigate the memory consumption by doing multiple smaller incremental pipeline runs.