common.libs.pyarrow
UnsupportedArrowTypeException Objects
class UnsupportedArrowTypeException(DltException)
Exception raised when an Arrow type conversion fails.
The setters are used to update the exception with more context, such as the relevant field and table, when it is caught downstream.
get_column_type_from_py_arrow
def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType
Returns a (data_type, precision, scale) tuple from a pyarrow.DataType.
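A minimal sketch of using this helper (assuming the module is importable as dlt.common.libs.pyarrow; the exact shape of the returned TColumnType is illustrative):

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import get_column_type_from_py_arrow

# a decimal128(38, 9) maps to a decimal column type with precision and scale filled in
col_type = get_column_type_from_py_arrow(pa.decimal128(38, 9))
print(col_type)  # e.g. {"data_type": "decimal", "precision": 38, "scale": 9}
```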
remove_null_columns
def remove_null_columns(item: TAnyArrowItem) -> TAnyArrowItem
Removes all columns of data type pyarrow.null() from the table or record batch.
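For example, a small sketch that drops an all-null column:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import remove_null_columns

table = pa.table({
    "id": pa.array([1, 2, 3]),
    "empty": pa.array([None, None, None], type=pa.null()),  # null-typed column
})
cleaned = remove_null_columns(table)
print(cleaned.column_names)  # ["id"]
```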
remove_columns
def remove_columns(item: TAnyArrowItem,
columns: Sequence[str]) -> TAnyArrowItem
Removes the given columns from the Arrow item.
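An illustrative call on a small in-memory table:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import remove_columns

table = pa.table({"id": [1, 2], "name": ["a", "b"], "tmp": [0, 0]})
trimmed = remove_columns(table, ["tmp"])
print(trimmed.column_names)  # ["id", "name"]
```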
append_column
def append_column(item: TAnyArrowItem, name: str, data: Any) -> TAnyArrowItem
Appends a new column to a Table or RecordBatch.
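A quick sketch of appending a column built with pyarrow:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import append_column

table = pa.table({"id": [1, 2, 3]})
with_flag = append_column(table, "is_active", pa.array([True, False, True]))
print(with_flag.column_names)  # ["id", "is_active"]
```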
rename_columns
def rename_columns(item: TAnyArrowItem,
new_column_names: Sequence[str]) -> TAnyArrowItem
Renames Arrow columns on a Table or RecordBatch; returns the same data with a renamed schema.
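For example, supplying new names for all columns in schema order:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import rename_columns

table = pa.table({"UserId": [1, 2], "CreatedAt": ["2024-01-01", "2024-01-02"]})
renamed = rename_columns(table, ["user_id", "created_at"])
print(renamed.column_names)  # ["user_id", "created_at"]
```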
should_normalize_arrow_schema
def should_normalize_arrow_schema(
schema: pyarrow.Schema,
columns: TTableSchemaColumns,
naming: NamingConvention,
add_load_id: bool = False
) -> Tuple[bool, Mapping[str, str], Dict[str, str], Dict[str, bool], bool,
TTableSchemaColumns]
Figures out whether any of the normalization steps must be executed. This prevents rewriting Arrow tables when no changes are needed. Refer to normalize_py_arrow_item for the list of normalizations. Note that columns must already be normalized.
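A hedged sketch of checking whether normalization is needed; the snake_case naming convention and the columns dict are assumptions for illustration, and only the first element of the returned tuple is inspected:

```python
import pyarrow as pa
from dlt.common.normalizers.naming import snake_case
from dlt.common.libs.pyarrow import should_normalize_arrow_schema

schema = pa.schema([pa.field("user_id", pa.int64())])
columns = {"user_id": {"name": "user_id", "data_type": "bigint", "nullable": True}}
should_normalize, *norm_info = should_normalize_arrow_schema(schema, columns, snake_case.NamingConvention())
if should_normalize:
    # pass the item through normalize_py_arrow_item (see below)
    pass
```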
normalize_py_arrow_item
def normalize_py_arrow_item(item: TAnyArrowItem,
columns: TTableSchemaColumns,
naming: NamingConvention,
caps: DestinationCapabilitiesContext,
load_id: Optional[str] = None) -> TAnyArrowItem
Normalizes the Arrow item schema according to columns. Note that columns must already be normalized.
- Arrow schema field names will be normalized according to naming
- Arrow columns will be reordered according to columns
- empty columns will be inserted if they are missing; their types will be generated using caps
- Arrow columns with different nullability than the corresponding schema columns will be updated
- a _dlt_load_id column will be added if it is missing and load_id is provided
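A hedged sketch of a full normalization call; the columns dict, the snake_case naming convention, generic_capabilities() and the load id value are assumptions used for illustration only:

```python
import pyarrow as pa
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.normalizers.naming import snake_case
from dlt.common.libs.pyarrow import normalize_py_arrow_item

item = pa.table({"UserId": [1, 2], "CreatedAt": ["2024-01-01", "2024-01-02"]})
# the columns schema must already use normalized (snake_case) names
columns = {
    "user_id": {"name": "user_id", "data_type": "bigint", "nullable": False},
    "created_at": {"name": "created_at", "data_type": "text", "nullable": True},
}
normalized = normalize_py_arrow_item(
    item,
    columns,
    snake_case.NamingConvention(),
    DestinationCapabilitiesContext.generic_capabilities(),
    load_id="1700000000.123456",  # hypothetical load id; may add a _dlt_load_id column per the docstring
)
print(normalized.schema.names)
```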
get_normalized_arrow_fields_mapping
def get_normalized_arrow_fields_mapping(schema: pyarrow.Schema,
naming: NamingConvention) -> StrStr
Normalizes schema field names and returns a mapping from original to normalized names. Raises on name collisions.
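An illustrative call, assuming the snake_case naming convention:

```python
import pyarrow as pa
from dlt.common.normalizers.naming import snake_case
from dlt.common.libs.pyarrow import get_normalized_arrow_fields_mapping

schema = pa.schema([pa.field("UserId", pa.int64()), pa.field("CreatedAt", pa.timestamp("us"))])
mapping = get_normalized_arrow_fields_mapping(schema, snake_case.NamingConvention())
print(mapping)  # e.g. {"UserId": "user_id", "CreatedAt": "created_at"}
```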
py_arrow_to_table_schema_columns
def py_arrow_to_table_schema_columns(
schema: pyarrow.Schema) -> TTableSchemaColumns
Convert a PyArrow schema to a table schema columns dict.
Arguments:
- schema (pyarrow.Schema) - pyarrow schema
Returns:
- TTableSchemaColumns - table schema columns
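A minimal sketch (the exact keys in the resulting column dicts are illustrative):

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import py_arrow_to_table_schema_columns

schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("price", pa.decimal128(18, 2)),
])
columns = py_arrow_to_table_schema_columns(schema)
print(columns["price"])  # e.g. {"name": "price", "data_type": "decimal", "precision": 18, "scale": 2, ...}
```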
columns_to_arrow
def columns_to_arrow(columns: TTableSchemaColumns,
caps: DestinationCapabilitiesContext,
timestamp_timezone: str = "UTC") -> pyarrow.Schema
Convert a table schema columns dict to a pyarrow schema.
Arguments:
- columns (TTableSchemaColumns) - table schema columns
Returns:
- pyarrow.Schema - pyarrow schema
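A hedged sketch of the reverse conversion; the columns dict and generic_capabilities() are assumptions for illustration:

```python
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.libs.pyarrow import columns_to_arrow

columns = {
    "id": {"name": "id", "data_type": "bigint", "nullable": False},
    "created_at": {"name": "created_at", "data_type": "timestamp", "nullable": True},
}
schema = columns_to_arrow(columns, DestinationCapabilitiesContext.generic_capabilities())
print(schema)  # timestamp columns use the default "UTC" timezone
```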
get_parquet_metadata
def get_parquet_metadata(
parquet_file: TFileOrPath) -> Tuple[int, pyarrow.Schema]
Gets parquet file metadata (including row count and schema)
Arguments:
- parquet_file (TFileOrPath) - path or file object of the parquet file
Returns:
- Tuple[int, pyarrow.Schema] - number of rows and the arrow schema
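For example, with a hypothetical items.parquet file written on the fly:

```python
import pyarrow as pa
import pyarrow.parquet as pq
from dlt.common.libs.pyarrow import get_parquet_metadata

pq.write_table(pa.table({"id": [1, 2, 3]}), "items.parquet")  # hypothetical file
num_rows, schema = get_parquet_metadata("items.parquet")
print(num_rows, schema.names)  # 3 ['id']
```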
to_arrow_scalar
def to_arrow_scalar(value: Any, arrow_type: pyarrow.DataType) -> Any
Converts a Python value to an Arrow-compute-friendly version.
from_arrow_scalar
def from_arrow_scalar(arrow_value: pyarrow.Scalar) -> Any
Converts an Arrow scalar into a Python value.
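A small round-trip sketch covering both helpers, assuming a timezone-aware timestamp:

```python
from datetime import datetime, timezone
import pyarrow as pa
from dlt.common.libs.pyarrow import to_arrow_scalar, from_arrow_scalar

dt = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)
scalar = to_arrow_scalar(dt, pa.timestamp("us", tz="UTC"))  # arrow timestamp scalar
print(from_arrow_scalar(scalar))  # back to a Python datetime value
```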
TNewColumns
Sequence of tuples: (field index, field, generating function)
add_constant_column
def add_constant_column(item: TAnyArrowItem,
name: str,
data_type: pyarrow.DataType,
value: Any = None,
nullable: bool = True,
index: int = -1) -> TAnyArrowItem
Add column with a single value to the table.
Arguments:
- item - Arrow table or record batch
- name - The new column name
- data_type - The data type of the new column
- nullable - Whether the new column is nullable
- value - The value to fill the new column with
- index - The index at which to insert the new column. Defaults to -1 (append)
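An illustrative call that appends a constant string column:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import add_constant_column

table = pa.table({"id": [1, 2, 3]})
tagged = add_constant_column(table, "source", pa.string(), value="api", nullable=True)
print(tagged.column("source").to_pylist())  # ["api", "api", "api"]
```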
pq_stream_with_new_columns
def pq_stream_with_new_columns(
parquet_file: TFileOrPath,
columns: TNewColumns,
row_groups_per_read: int = 1) -> Iterator[pyarrow.Table]
Add column(s) to the table in batches.
The table is read from the parquet file row_groups_per_read row groups at a time.
Arguments:
- parquet_file - path or file object to the parquet file
- columns - list of columns to add in the form of (insertion index, pyarrow.Field, column_value_callback); the callback should accept a pyarrow.Table and return an array of values for the column
- row_groups_per_read - number of row groups to read at a time. Defaults to 1
Yields:
- pyarrow.Table objects with the new columns added.
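A hedged sketch that streams a hypothetical items.parquet file and appends a _dlt_load_id column; the load id value and the use of -1 as an append index are assumptions for illustration:

```python
import pyarrow as pa
import pyarrow.parquet as pq
from dlt.common.libs.pyarrow import pq_stream_with_new_columns

pq.write_table(pa.table({"id": [1, 2, 3]}), "items.parquet")  # hypothetical input file

def load_id_values(table: pa.Table) -> pa.Array:
    # the callback receives each streamed pyarrow.Table and returns the column values
    return pa.array(["1700000000.123456"] * table.num_rows)

new_columns = [(-1, pa.field("_dlt_load_id", pa.string()), load_id_values)]
for chunk in pq_stream_with_new_columns("items.parquet", new_columns):
    print(chunk.column_names)  # ["id", "_dlt_load_id"]
```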
cast_arrow_schema_types
def cast_arrow_schema_types(
schema: pyarrow.Schema, type_map: Dict[Callable[[pyarrow.DataType], bool],
Callable[..., pyarrow.DataType]]
) -> pyarrow.Schema
Returns a type-casted Arrow schema.
Replaces data types for fields matching a type check in type_map. Type check functions in type_map are assumed to be mutually exclusive, i.e. a data type does not match more than one type check function.
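A minimal sketch that downcasts nanosecond timestamps to microseconds; the type check / conversion pair is an assumption for illustration:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import cast_arrow_schema_types

schema = pa.schema([pa.field("id", pa.int64()), pa.field("ts", pa.timestamp("ns"))])
type_map = {pa.types.is_timestamp: lambda t: pa.timestamp("us", tz=t.tz)}
casted = cast_arrow_schema_types(schema, type_map)
print(casted.field("ts").type)  # timestamp[us]
```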
concat_batches_and_tables_in_order
def concat_batches_and_tables_in_order(
tables_or_batches: Iterable[Union[pyarrow.Table, pyarrow.RecordBatch]]
) -> pyarrow.Table
Concatenates an iterable of tables and batches into a single table, preserving row order. Zero copy is used during concatenation, so schemas must be identical.
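For example, mixing a record batch and a table that share the same schema:

```python
import pyarrow as pa
from dlt.common.libs.pyarrow import concat_batches_and_tables_in_order

schema = pa.schema([pa.field("id", pa.int64())])
batch = pa.record_batch([pa.array([1, 2], type=pa.int64())], schema=schema)
table = pa.table({"id": pa.array([3, 4], type=pa.int64())}, schema=schema)
combined = concat_batches_and_tables_in_order([batch, table])
print(combined.column("id").to_pylist())  # [1, 2, 3, 4]
```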
row_tuples_to_arrow
def row_tuples_to_arrow(rows: Sequence[Any],
caps: DestinationCapabilitiesContext,
columns: TTableSchemaColumns, tz: str) -> Any
Converts the rows to an Arrow table using the columns schema.
Columns missing a data_type will have their type inferred from the row data.
Columns with object types not supported by Arrow are excluded from the resulting table.
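A hedged sketch; the rows, the columns dict and generic_capabilities() are assumptions for illustration:

```python
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.libs.pyarrow import row_tuples_to_arrow

rows = [(1, "alice"), (2, "bob")]
columns = {
    "id": {"name": "id", "data_type": "bigint", "nullable": False},
    "name": {"name": "name", "data_type": "text", "nullable": True},
}
table = row_tuples_to_arrow(rows, DestinationCapabilitiesContext.generic_capabilities(), columns, tz="UTC")
print(table.schema.names)  # ["id", "name"]
```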