common.libs.deltalake
ensure_delta_compatible_arrow_schema
def ensure_delta_compatible_arrow_schema(
schema: pa.Schema,
partition_by: Optional[Union[List[str], str]] = None) -> pa.Schema
Returns an Arrow schema compatible with the Delta table format.
Casts the schema to replace data types not supported by Delta.
ensure_delta_compatible_arrow_data
def ensure_delta_compatible_arrow_data(
data: Union[pa.Table, pa.RecordBatchReader],
partition_by: Optional[Union[List[str], str]] = None
) -> Union[pa.Table, pa.RecordBatchReader]
Returns Arrow data compatible with the Delta table format.
Casts the `data` schema to replace data types not supported by Delta.
get_delta_write_mode
def get_delta_write_mode(write_disposition: TWriteDisposition) -> str
Translates dlt write disposition to Delta write mode.
write_delta_table
def write_delta_table(
table_or_uri: Union[str, Path, DeltaTable],
data: Union[pa.Table, pa.RecordBatchReader],
write_disposition: TWriteDisposition,
partition_by: Optional[Union[List[str], str]] = None,
storage_options: Optional[Dict[str, str]] = None) -> None
Writes in-memory Arrow data to an on-disk Delta table.
Thin wrapper around `deltalake.write_deltalake`.
merge_delta_table
def merge_delta_table(table: DeltaTable, data: Union[pa.Table,
pa.RecordBatchReader],
schema: TTableSchema) -> None
Merges in-memory Arrow data into on-disk Delta table.
get_delta_tables
def get_delta_tables(pipeline: Pipeline,
*tables: str,
schema_name: str = None) -> Dict[str, DeltaTable]
Returns the Delta tables in `pipeline.default_schema` (the default) as `deltalake.DeltaTable` objects.
The returned object is a dictionary with table names as keys and `DeltaTable` objects as values.
Optionally filters the dictionary by the table names specified as `*tables`.
Raises `ValueError` if a table name specified in `*tables` is not found. You may try switching to another schema via the `schema_name` argument.