dlt.destinations.impl.filesystem.iceberg_adapter
iceberg_partition Objects
class iceberg_partition()
Helper class with factory methods for creating partition specs.
identity
@staticmethod
def identity(column_name: str) -> PartitionSpec
Create an identity partition on a column.
Arguments:
column_name - The name of the column to partition on
Returns:
A PartitionSpec for identity partitioning
year
@staticmethod
def year(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a year partition on a timestamp/date column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for year partitioning
month
@staticmethod
def month(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a month partition on a timestamp/date column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for month partitioning
day
@staticmethod
def day(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a day partition on a timestamp/date column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for day partitioning
hour
@staticmethod
def hour(column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create an hour partition on a timestamp column.
Arguments:
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for hour partitioning
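As a rough illustration of what the four time-based transforms (year, month, day, hour) produce, the sketch below computes the partition values the Apache Iceberg spec defines for them: whole years, months, days, and hours elapsed since 1970-01-01 00:00 UTC. It uses only the standard library and does not call dlt; the function names are hypothetical and exist only for this example, and the input is assumed to be a timezone-aware UTC timestamp.

```python
from datetime import datetime, timedelta, timezone

# Iceberg measures all time-based partition values from the Unix epoch.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def year_value(ts: datetime) -> int:
    """Whole years elapsed since 1970 (assumes ts is in UTC)."""
    return ts.year - 1970


def month_value(ts: datetime) -> int:
    """Whole months elapsed since 1970-01 (assumes ts is in UTC)."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def day_value(ts: datetime) -> int:
    """Whole days elapsed since 1970-01-01."""
    return (ts - EPOCH) // timedelta(days=1)


def hour_value(ts: datetime) -> int:
    """Whole hours elapsed since 1970-01-01 00:00."""
    return (ts - EPOCH) // timedelta(hours=1)


event_time = datetime(2023, 3, 15, 10, 0, tzinfo=timezone.utc)
values = {
    "year": year_value(event_time),
    "month": month_value(event_time),
    "day": day_value(event_time),
    "hour": hour_value(event_time),
}
print(values)  # {'year': 53, 'month': 638, 'day': 19431, 'hour': 466354}
```

Rows that share a value land in the same partition, so every event in March 2023 shares the month value 638, while the hour transform splits the same data into much finer partitions.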
bucket
@staticmethod
def bucket(num_buckets: int,
column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a bucket partition on a column.
Arguments:
num_buckets - The number of buckets to create
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for bucket partitioning
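To make the bucket transform concrete: the Apache Iceberg spec assigns a row to a bucket by hashing the column value with 32-bit Murmur3 (x86 variant, seed 0), clearing the sign bit, and taking the remainder modulo the bucket count. The sketch below reimplements this for string values only, using the standard library. `murmur3_32` and `bucket_for` are hypothetical names for this example, not dlt or pyiceberg functions, and other column types are serialized differently before hashing.

```python
def _rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit unsigned integer left by r bits."""
    return ((x << r) | (x >> (32 - r))) & 0xFFFFFFFF


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 (x86 variant), returned as an unsigned integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & 0xFFFFFFFF
    full = len(data) & ~3  # length rounded down to a multiple of 4

    # Mix each complete 4-byte little-endian block into the hash state.
    for i in range(0, full, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = _rotl32(k, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = _rotl32(h, 13)
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF

    # Mix the remaining 1 to 3 bytes, if any.
    tail = data[full:]
    if tail:
        k = int.from_bytes(tail, "little")
        k = (k * c1) & 0xFFFFFFFF
        k = _rotl32(k, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k

    # Finalization: fold in the length and avalanche the bits.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h


def bucket_for(value: str, num_buckets: int) -> int:
    """Bucket index for a string: hash UTF-8 bytes, drop the sign bit, mod N."""
    if num_buckets <= 0:
        raise ValueError("num_buckets must be positive")
    return (murmur3_32(value.encode("utf-8")) & 0x7FFFFFFF) % num_buckets


user_ids = ["alice", "bob", "carol", "dave"]
assignments = {user_id: bucket_for(user_id, 16) for user_id in user_ids}
print(assignments)
```

The same value always maps to the same bucket, which is what lets a query engine prune buckets for equality filters. A larger bucket count spreads rows across more, smaller partitions.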
truncate
@staticmethod
def truncate(width: int,
column_name: str,
partition_field_name: Optional[str] = None) -> PartitionSpec
Create a truncate partition on a string column.
Arguments:
width - The width to truncate to
column_name - The name of the column to partition on
partition_field_name - Optional custom name for the partition field
Returns:
A PartitionSpec for truncate partitioning
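For string columns, the Apache Iceberg spec defines the truncate transform as keeping the first `width` characters of the value. The sketch below shows how that groups rows. It is a standard-library illustration only, and `truncate_value` is a hypothetical name rather than part of dlt.

```python
from typing import Dict, List


def truncate_value(value: str, width: int) -> str:
    """Keep the first `width` characters (Python slices by code point)."""
    if width <= 0:
        raise ValueError("width must be positive")
    return value[:width]


categories = ["electronics", "electrical", "elm", "garden"]

# Group the values by their truncated prefix, as a partitioned table would.
partitions: Dict[str, List[str]] = {}
for category in categories:
    partitions.setdefault(truncate_value(category, 3), []).append(category)

print(partitions)
# {'ele': ['electronics', 'electrical'], 'elm': ['elm'], 'gar': ['garden']}
```

A smaller width merges more values into each partition, and values shorter than the width are kept unchanged.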
iceberg_adapter
def iceberg_adapter(
data: Any,
partition: Union[str, PartitionSpec,
Sequence[Union[str, PartitionSpec]]] = None,
table_properties: Optional[Dict[str, str]] = None) -> DltResource
Prepares data or a DltResource for loading into an Apache Iceberg table.
Takes raw data or an existing DltResource and configures it for Iceberg by defining partitioning strategies and/or table properties via the DltResource's hints.
Arguments:
data - The data to be transformed. This can be raw data (e.g., list of dicts) or an instance of DltResource. If raw data is provided, it will be encapsulated into a DltResource instance.
partition - Defines how the Iceberg table should be partitioned. It accepts:
- A single column name (string): Defaults to an identity transform.
- A PartitionSpec object: Allows for detailed partition configuration, including transformation types (year, month, day, hour, bucket, truncate). Use the iceberg_partition helper class to create these specs.
- A sequence of the above: To define multiple partition columns.
table_properties - A dictionary of Iceberg table properties to set on the table at creation time. Keys and values must be strings. These correspond to Iceberg table properties such as write.format.default or write.target-file-size-bytes. Properties are only applied when the table is first created.
Returns:
A DltResource instance configured with Iceberg-specific hints,
ready for loading.
Raises:
ValueError - If neither partition nor table_properties is specified, or if an invalid partition transform is requested within a PartitionSpec, or if table_properties is not a dict with string keys and values.
Examples:
data = [{"id": 1, "event_time": "2023-03-15T10:00:00Z", "category": "A"}]
resource = iceberg_adapter(
    data,
    partition=[
        "category",  # Identity partition on category
        iceberg_partition.year("event_time"),
    ],
    table_properties={"write.format.default": "parquet"},
)
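The argument rules documented for iceberg_adapter (a bare string means an identity partition, a sequence defines several partition fields, and at least one of partition or table_properties is required) can be sketched roughly as follows. This is not dlt's implementation: `Spec` is a simplified, hypothetical stand-in for PartitionSpec, and `normalize_partition`, `validate_table_properties`, and `check_arguments` are names invented for this example.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Union


@dataclass(frozen=True)
class Spec:
    """Hypothetical stand-in for PartitionSpec (not dlt's actual class)."""
    column: str
    transform: str = "identity"


PartitionArg = Union[str, Spec, Sequence[Union[str, Spec]]]


def normalize_partition(partition: Optional[PartitionArg]) -> List[Spec]:
    """Turn a string, a Spec, or a sequence of either into a list of Specs."""
    if partition is None:
        return []
    if isinstance(partition, (str, Spec)):
        partition = [partition]
    # A bare column name defaults to the identity transform.
    return [Spec(p) if isinstance(p, str) else p for p in partition]


def validate_table_properties(props: Optional[Dict[str, str]]) -> None:
    """Reject anything that is not a dict with string keys and values."""
    if props is None:
        return
    if not isinstance(props, dict) or not all(
        isinstance(k, str) and isinstance(v, str) for k, v in props.items()
    ):
        raise ValueError("table_properties must be a dict of str to str")


def check_arguments(
    partition: Optional[PartitionArg],
    table_properties: Optional[Dict[str, str]],
) -> List[Spec]:
    """Apply the documented checks and return the normalized partition list."""
    if partition is None and table_properties is None:
        raise ValueError("specify partition, table_properties, or both")
    validate_table_properties(table_properties)
    return normalize_partition(partition)


specs = check_arguments(
    ["category", Spec("event_time", "year")],
    {"write.format.default": "parquet"},
)
print(specs)
```

Normalizing every accepted shape into one list up front keeps the later steps simple, since they only ever need to handle a list of spec objects.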
parse_partition_hints
def parse_partition_hints(
table_schema: PreparedTableSchema) -> List[PartitionSpec]
Parse PARTITION_HINT from table schema into PartitionSpec list.
Arguments:
table_schema - dlt table schema containing partition hints
Returns:
List of PartitionSpec objects from hints, empty list if no hints found
create_identity_specs
def create_identity_specs(column_names: List[str]) -> List[PartitionSpec]
Create identity partition specs from column names.
Arguments:
column_names - List of column names to partition by identity
Returns:
List of PartitionSpec objects with identity transform