dlt.common.libs.pyiceberg
merge_iceberg_table
def merge_iceberg_table(table: IcebergTable, data: pa.Table,
schema: TTableSchema, load_table_name: str) -> None
Merges in-memory Arrow data into an on-disk Iceberg table.
CatalogNotFoundError Objects
class CatalogNotFoundError(Exception)
Raised when a catalog cannot be found using the specified configuration method.
PyicebergCatalogConfig Objects
class PyicebergCatalogConfig(BaseModel)
type
noqa
IcebergConfig Objects
@configspec
class IcebergConfig(BaseConfiguration)
iceberg_catalog_name
Name of the Iceberg catalog to use. Corresponds to the catalog name in .pyiceberg.yaml.
iceberg_catalog_type
Type of Iceberg catalog: 'sql', 'rest', 'glue', 'hive', etc.
iceberg_catalog_config
Optional dictionary with the complete catalog configuration. If provided, it is used instead of loading the configuration from .pyiceberg.yaml.
Example for a REST catalog:
{'type': 'rest', 'uri': 'https://catalog.example.com', 'warehouse': 'my_warehouse', 'credential': 'token', 'scope': 'PRINCIPAL_ROLE:ALL'}
Example for a SQL catalog:
{'type': 'sql', 'uri': 'postgresql://user:pass@localhost/catalog'}
Example for secrets.toml:

[iceberg_catalog]
iceberg_catalog_name = "default"
iceberg_catalog_type = "rest"

[iceberg_catalog.iceberg_catalog_config]
uri = "http://localhost:8181/catalog"
warehouse = "default"
header.X-Iceberg-Access-Delegation = "remote-signing"
py-io-impl = "pyiceberg.io.fsspec.FsspecFileIO"
s3.endpoint = "https://cool-bucket.com/"
s3.access-key-id = "cool-bucket-access-key"
s3.secret-access-key = "cool-bucket-secret-key"
s3.region = "cool-bucket-region"
get_catalog
@with_config(spec=IcebergConfig, sections="iceberg_catalog")
def get_catalog(
iceberg_catalog_name: str = "default",
iceberg_catalog_type: Optional[str] = None,
iceberg_catalog_config: Optional[Dict[str, Any]] = None,
credentials: Optional[FileSystemCredentials] = None) -> IcebergCatalog
Get an Iceberg catalog using multiple configuration methods.
This function tries to load a catalog in the following priority order:
- From explicit config dictionary (if iceberg_catalog_config provided)
- From the .pyiceberg.yaml file or from environment variables (PYICEBERG_*), resolved by pyiceberg's load_catalog mechanism. See https://py.iceberg.apache.org/configuration/#setting-configuration-values
- Fall back to an in-memory SQLite catalog.
Arguments:
- iceberg_catalog_name: Name of the catalog (default: "default").
- iceberg_catalog_type: Type of catalog, e.g. 'sql', 'rest', 'glue', 'hive'.
- iceberg_catalog_config: Optional dictionary with the complete catalog configuration.
- credentials: Optional filesystem credentials. Used ONLY for backward compatibility with the in-memory SQLite catalog.
Returns:
IcebergCatalog instance
Examples:
Load from config dict
config = {'type': 'rest', 'uri': 'https://...', 'warehouse': 'wh'}
catalog = get_catalog('my_catalog', iceberg_catalog_type='rest', iceberg_catalog_config=config)
Load from .pyiceberg.yaml
catalog = get_catalog('my_catalog', iceberg_catalog_type='sql')
Load from environment variables
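(set variables following pyiceberg's PYICEBERG_CATALOG__<NAME>__<PROPERTY> convention, e.g. PYICEBERG_CATALOG__<NAME>__URI; see the pyiceberg configuration docs for how catalog names are normalized)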
catalog = get_catalog('my_catalog', iceberg_catalog_type='rest')
get_iceberg_tables
def get_iceberg_tables(
pipeline: SupportsPipeline,
*tables: str,
schema_name: Optional[str] = None,
include_dlt_tables: bool = False) -> Dict[str, IcebergTable]
Returns Iceberg tables in pipeline.default_schema (default) or schema_name as pyiceberg.Table objects.
The returned object is a dictionary with table names as keys and Table objects as values.
Optionally filters the dictionary by the table names specified in *tables*.
Raises ValueError if a table name specified in *tables* is not found. Use the schema_name argument to
look up tables in other schemas.