Iceberg table format
dlt supports writing Iceberg tables when using the filesystem destination.
How it works
dlt uses the PyIceberg library to write Iceberg tables. One or multiple Parquet files are prepared during the extract and normalize steps. In the load step, these Parquet files are exposed as an Arrow data structure and fed into pyiceberg.
Iceberg single-user ephemeral catalog
dlt uses single-table, ephemeral, in-memory, SQLite-based Iceberg catalogs. These catalogs are created "on demand" when a pipeline is run, and do not persist afterwards. If a table already exists in the filesystem, it gets registered into the catalog using its latest metadata file. This allows for a serverless setup. It is currently not possible to connect your own Iceberg catalog.
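The sketch below illustrates this mechanism with pyiceberg directly: an in-memory SQLite catalog is created, a namespace and table are set up in it, and Arrow data is appended. This is a conceptual sketch, not dlt's actual code; the warehouse path, schema, and table name are hypothetical.
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog

# ephemeral, in-memory SQLite catalog; it disappears when the process ends
catalog = SqlCatalog(
    "ephemeral",
    uri="sqlite:///:memory:",
    warehouse="file:///tmp/warehouse",  # hypothetical location for data and metadata files
)
catalog.create_namespace("my_schema")

# data prepared as Parquet during normalize is exposed to pyiceberg as an Arrow table
arrow_data = pa.table({"id": [1, 2], "name": ["alice", "bob"]})

# a new table is created in the catalog; an existing table would instead be
# registered from its latest metadata file
table = catalog.create_table("my_schema.my_table", schema=arrow_data.schema)
table.append(arrow_data)  # written as a new Iceberg snapshot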
While ephemeral catalogs make it easy to get started with Iceberg, this approach comes with limitations:
- concurrent writes are not handled and may lead to corrupt table state
- we cannot guarantee that reads concurrent with writes are clean
- the latest manifest file needs to be searched for using file listing; this can become slow with large tables, especially in cloud object stores
If you're interested in a multi-user cloud experience and integration with vendor catalogs, such as Polaris or Unity Catalog, check out the dltHub Iceberg destination.
Iceberg dependencies
You need Python version 3.9 or higher and the pyiceberg package to use this format:
pip install "dlt[pyiceberg]"
You also need sqlalchemy>=2.0.18:
pip install 'sqlalchemy>=2.0.18'
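To verify the environment quickly, you can check the installed versions (a hypothetical sanity check, not something dlt requires):
from importlib.metadata import version

print(version("pyiceberg"))
print(version("sqlalchemy"))  # should be 2.0.18 or higher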
Additional permissions for Iceberg
When using Iceberg with object stores like S3, additional permissions may be required for operations like multipart uploads and tagging. Make sure your IAM role or user has the following permissions:
[
"s3:ListBucketMultipartUploads",
"s3:GetBucketLocation",
"s3:AbortMultipartUpload",
"s3:PutObjectTagging",
"s3:GetObjectTagging"
]
Set table format
Set the table_format argument to iceberg when defining your resource:
@dlt.resource(table_format="iceberg")
def my_iceberg_resource():
    ...
or when calling run on your pipeline:
pipeline.run(my_resource, table_format="iceberg")
dlt always uses Parquet as loader_file_format when using the iceberg table format. Any setting of loader_file_format is disregarded.
Partitioning
Apache Iceberg supports table partitioning to optimize query performance. There are two ways to configure partitioning:
- Using the iceberg_adapter function: for advanced partitioning with transformations (year, month, day, hour, bucket, truncate)
- Using the column-level partition property: for simple identity partitioning
Iceberg uses hidden partitioning.
Partition evolution (changing partition columns after a table has been created) is not supported.
Using the iceberg_adapter
The iceberg_adapter function allows you to configure partitioning with various transformation functions.
Basic example
from datetime import date

import dlt
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition

data_items = [
    {"id": 1, "category": "A", "created_at": date(2025, 1, 1)},
    {"id": 2, "category": "A", "created_at": date(2025, 1, 15)},
    {"id": 3, "category": "B", "created_at": date(2025, 2, 1)},
]

@dlt.resource(table_format="iceberg")
def events():
    yield data_items

# Partition by category and month of created_at
iceberg_adapter(
    events,
    partition=[
        "category",  # identity partition (shorthand)
        iceberg_partition.month("created_at"),
    ],
)

pipeline = dlt.pipeline("iceberg_example", destination="filesystem")
pipeline.run(events)
To use advanced partitioning, import both the adapter and the iceberg_partition helper:
from dlt.destinations.adapters import iceberg_adapter, iceberg_partition
Partition transformations
Iceberg supports several transformation functions for partitioning. Use the iceberg_partition helper to create partition specifications:
- iceberg_partition.identity(column_name): Partition by exact column values (this is the same as passing the column name as a string to the iceberg_adapter)
- iceberg_partition.year(column_name): Partition by year from a date column
- iceberg_partition.month(column_name): Partition by month from a date column
- iceberg_partition.day(column_name): Partition by day from a date column
- iceberg_partition.hour(column_name): Partition by hour from a timestamp column
- iceberg_partition.bucket(n, column_name): Partition by hashed value into n buckets
- iceberg_partition.truncate(length, column_name): Partition by string values truncated to length
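For example, several transformations can be combined in a single partition specification (a sketch in the style of the examples below; the column names are hypothetical):
iceberg_adapter(
    resource,
    partition=[
        "country",                                # identity partition (shorthand)
        iceberg_partition.year("created_at"),     # one partition per year
        iceberg_partition.bucket(16, "user_id"),  # 16 hash buckets over user_id
    ],
)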
Bucket partitioning
Distribute data across a fixed number of buckets using a hash function:
iceberg_adapter(
    resource,
    partition=[iceberg_partition.bucket(16, "user_id")],
)
Truncate partitioning
Partition string values by a fixed prefix length:
iceberg_adapter(
    resource,
    partition=[iceberg_partition.truncate(3, "category")],  # "ELECTRONICS" → "ELE"
)
Custom partition field names
Specify custom names for partition fields:
iceberg_adapter(
    resource,
    partition=[
        iceberg_partition.year("activity_time", "activity_year"),
        iceberg_partition.bucket(8, "user_id", "user_bucket"),
    ],
)
Using column-level partition property
For simple identity partitioning, you can use the partition column hint directly in the resource definition:
@dlt.resource(
    table_format="iceberg",
    columns={"region": {"partition": True}}
)
def my_iceberg_resource():
    yield [
        {"id": 1, "region": "US", "amount": 100},
        {"id": 2, "region": "EU", "amount": 200},
    ]
Multiple columns can be partitioned:
@dlt.resource(
    table_format="iceberg",
    columns={
        "region": {"partition": True},
        "category": {"partition": True},
    }
)
def multi_partition_data():
    ...
Table access helper functions
You can use the get_iceberg_tables helper function to access native table objects. These are pyiceberg Table objects.
from dlt.common.libs.pyiceberg import get_iceberg_tables
# get dictionary of Table objects
iceberg_tables = get_iceberg_tables(pipeline)
# execute operations on Table objects
# etc.
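For example, a loaded table can be read back into Arrow or inspected for snapshots. This sketch continues the snippet above and assumes a table named "my_table" was loaded:
my_table = iceberg_tables["my_table"]

arrow_data = my_table.scan().to_arrow()  # read the Iceberg table into a pyarrow Table
print(my_table.schema())                 # inspect the Iceberg schema
print(my_table.history())                # list the table's snapshot history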
Google Cloud Storage authentication
Note that not all authentication methods are supported when using Iceberg on Google Cloud Storage:
- OAuth - ✅ Supported
- Service Account - ❌ Not supported
- Application Default Credentials - ❌ Not supported
The S3-compatible interface for Google Cloud Storage is not supported when using iceberg.
Iceberg Azure scheme
The az scheme is not supported when using the iceberg table format. Please use the abfss scheme instead. This is because pyiceberg, which dlt uses under the hood, does not currently support az.
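For example, a bucket_url using the abfss scheme takes the form abfss://<container>@<storage_account>.dfs.core.windows.net/<path>. A sketch with a hypothetical container, storage account, and path:
import dlt

pipeline = dlt.pipeline(
    pipeline_name="iceberg_azure",
    destination=dlt.destinations.filesystem(
        bucket_url="abfss://my-container@mystorageaccount.dfs.core.windows.net/data"
    ),
)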
Table format merge support
The upsert merge strategy is supported for iceberg. This strategy requires that the input data contains no duplicate rows based on the key columns, and that the target table also does not contain duplicates on those keys.
Until pyiceberg > 0.9.1 is released, upsert is executed in chunks of 1000 rows.
Schema evolution (changing the set of columns) is not supported when using the upsert merge strategy with pyiceberg == 0.10.0.
@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    primary_key="my_primary_key",
    table_format="iceberg"
)
def my_upsert_resource():
    ...
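A minimal end-to-end sketch of an upsert run could look like this (pipeline name, resource, and primary key are hypothetical); the second run replaces rows that share a primary key with the first run instead of appending them:
import dlt

@dlt.resource(
    write_disposition={"disposition": "merge", "strategy": "upsert"},
    primary_key="id",
    table_format="iceberg",
)
def users():
    yield [
        {"id": 1, "name": "alice"},
        {"id": 2, "name": "bob"},
    ]

pipeline = dlt.pipeline("iceberg_upsert_example", destination="filesystem")
pipeline.run(users)
pipeline.run(users)  # same keys: existing rows are updated, not duplicated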