common.libs.pyarrow
get_column_type_from_py_arrow
def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType
Returns (data_type, precision, scale) tuple from pyarrow.DataType
remove_null_columns
def remove_null_columns(item: TAnyArrowItem) -> TAnyArrowItem
Remove all columns of datatype pyarrow.null() from the table or record batch
remove_columns
def remove_columns(item: TAnyArrowItem,
columns: Sequence[str]) -> TAnyArrowItem
Remove `columns` from Arrow item
append_column
def append_column(item: TAnyArrowItem, name: str, data: Any) -> TAnyArrowItem
Appends new column to Table or RecordBatch
rename_columns
def rename_columns(item: TAnyArrowItem,
new_column_names: Sequence[str]) -> TAnyArrowItem
Rename arrow columns on Table or RecordBatch, returns same data but with renamed schema
should_normalize_arrow_schema
def should_normalize_arrow_schema(
schema: pyarrow.Schema,
columns: TTableSchemaColumns,
naming: NamingConvention,
add_load_id: bool = False
) -> Tuple[bool, Mapping[str, str], Dict[str, str], Dict[str, bool], bool,
TTableSchemaColumns]
Figure out if any of the normalization steps must be executed. This avoids
rewriting arrow tables when no changes are needed. Refer to `normalize_py_arrow_item`
for a list of normalizations. Note that `columns`
must be already normalized.
normalize_py_arrow_item
def normalize_py_arrow_item(item: TAnyArrowItem,
columns: TTableSchemaColumns,
naming: NamingConvention,
caps: DestinationCapabilitiesContext,
load_id: Optional[str] = None) -> TAnyArrowItem
Normalize arrow `item` schema according to the `columns`. Note that `columns` must be already normalized.
- arrow schema field names will be normalized according to `naming`
- arrow columns will be reordered according to `columns`
- empty columns will be inserted if they are missing, types will be generated using `caps`
- arrow columns with different nullability than corresponding schema columns will be updated
- Add `_dlt_load_id` column if it is missing and `load_id` is provided
get_normalized_arrow_fields_mapping
def get_normalized_arrow_fields_mapping(schema: pyarrow.Schema,
naming: NamingConvention) -> StrStr
Normalizes schema field names and returns mapping from original to normalized name. Raises on name collisions
py_arrow_to_table_schema_columns
def py_arrow_to_table_schema_columns(
schema: pyarrow.Schema) -> TTableSchemaColumns
Convert a PyArrow schema to a table schema columns dict.
Arguments:
- `schema` (pyarrow.Schema) - pyarrow schema
Returns:
`TTableSchemaColumns` - table schema columns
columns_to_arrow
def columns_to_arrow(columns: TTableSchemaColumns,
caps: DestinationCapabilitiesContext,
timestamp_timezone: str = "UTC") -> pyarrow.Schema
Convert a table schema columns dict to a pyarrow schema.
Arguments:
- `columns` (TTableSchemaColumns) - table schema columns
Returns:
`pyarrow.Schema` - pyarrow schema
get_parquet_metadata
def get_parquet_metadata(
parquet_file: TFileOrPath) -> Tuple[int, pyarrow.Schema]
Gets parquet file metadata (including row count and schema)
Arguments:
- `parquet_file` (TFileOrPath) - path or file object to parquet file
Returns:
`Tuple[int, pyarrow.Schema]` - row count and schema taken from the file metadata
to_arrow_scalar
def to_arrow_scalar(value: Any, arrow_type: pyarrow.DataType) -> Any
Converts python value to an arrow compute friendly version
from_arrow_scalar
def from_arrow_scalar(arrow_value: pyarrow.Scalar) -> Any
Converts arrow scalar into Python type.
TNewColumns
Sequence of tuples: (field index, field, generating function)
add_constant_column
def add_constant_column(item: TAnyArrowItem,
name: str,
data_type: pyarrow.DataType,
value: Any = None,
nullable: bool = True,
index: int = -1) -> TAnyArrowItem
Add column with a single value to the table.
Arguments:
- `item` - Arrow table or record batch
- `name` - The new column name
- `data_type` - The data type of the new column
- `nullable` - Whether the new column is nullable
- `value` - The value to fill the new column with
- `index` - The index at which to insert the new column. Defaults to -1 (append)
pq_stream_with_new_columns
def pq_stream_with_new_columns(
parquet_file: TFileOrPath,
columns: TNewColumns,
row_groups_per_read: int = 1) -> Iterator[pyarrow.Table]
Add column(s) to the table in batches.
The table is read from parquet `row_groups_per_read` row groups at a time.
Arguments:
- `parquet_file` - path or file object to parquet file
- `columns` - list of columns to add in the form of (insertion index, `pyarrow.Field`, column_value_callback). The callback should accept a `pyarrow.Table` and return an array of values for the column.
- `row_groups_per_read` - number of row groups to read at a time. Defaults to 1.
Yields:
`pyarrow.Table` objects with the new columns added.
cast_arrow_schema_types
def cast_arrow_schema_types(
schema: pyarrow.Schema, type_map: Dict[Callable[[pyarrow.DataType], bool],
Callable[..., pyarrow.DataType]]
) -> pyarrow.Schema
Returns type-casted Arrow schema.
Replaces data types for fields matching a type check in `type_map`.
Type check functions in `type_map` are assumed to be mutually exclusive, i.e.
a data type does not match more than one type check function.
concat_batches_and_tables_in_order
def concat_batches_and_tables_in_order(
tables_or_batches: Iterable[Union[pyarrow.Table, pyarrow.RecordBatch]]
) -> pyarrow.Table
Concatenate iterable of tables and batches into a single table, preserving row order. Zero copy is used during concatenation so schemas must be identical.
row_tuples_to_arrow
def row_tuples_to_arrow(rows: Sequence[Any],
caps: DestinationCapabilitiesContext,
columns: TTableSchemaColumns, tz: str) -> Any
Converts the rows to an arrow table using the columns schema.
Columns missing `data_type` will be inferred from the row data.
Columns with object types not supported by arrow are excluded from the resulting table.