Version: devel

common.libs.pyarrow

get_column_type_from_py_arrow

def get_column_type_from_py_arrow(dtype: pyarrow.DataType) -> TColumnType

Returns (data_type, precision, scale) tuple from pyarrow.DataType

remove_null_columns

def remove_null_columns(item: TAnyArrowItem) -> TAnyArrowItem

Remove all columns of datatype pyarrow.null() from the table or record batch

remove_columns

def remove_columns(item: TAnyArrowItem,
columns: Sequence[str]) -> TAnyArrowItem

Remove columns from Arrow item

append_column

def append_column(item: TAnyArrowItem, name: str, data: Any) -> TAnyArrowItem

Appends new column to Table or RecordBatch

rename_columns

def rename_columns(item: TAnyArrowItem,
new_column_names: Sequence[str]) -> TAnyArrowItem

Renames Arrow columns on a Table or RecordBatch. Returns the same data with a renamed schema.

should_normalize_arrow_schema

def should_normalize_arrow_schema(
schema: pyarrow.Schema,
columns: TTableSchemaColumns,
naming: NamingConvention,
add_load_id: bool = False
) -> Tuple[bool, Mapping[str, str], Dict[str, str], Dict[str, bool], bool,
TTableSchemaColumns]

Determines whether any of the normalization steps must be executed. This avoids rewriting Arrow tables when no changes are needed. Refer to normalize_py_arrow_item for the list of normalizations. Note that columns must already be normalized.

normalize_py_arrow_item

def normalize_py_arrow_item(item: TAnyArrowItem,
columns: TTableSchemaColumns,
naming: NamingConvention,
caps: DestinationCapabilitiesContext,
load_id: Optional[str] = None) -> TAnyArrowItem

Normalize arrow item schema according to the columns. Note that columns must be already normalized.

  1. Arrow schema field names are normalized according to naming.
  2. Arrow columns are reordered according to columns.
  3. Missing columns are inserted as empty columns; their types are generated using caps.
  4. Arrow columns whose nullability differs from the corresponding schema columns are updated.
  5. A _dlt_load_id column is added if it is missing and load_id is provided.

get_normalized_arrow_fields_mapping

def get_normalized_arrow_fields_mapping(schema: pyarrow.Schema,
naming: NamingConvention) -> StrStr

Normalizes schema field names and returns a mapping from original to normalized names. Raises on name collisions.
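The collision check can be sketched in pure Python. The example below works on a list of names instead of a `pyarrow.Schema`, and both `to_snake_case` and `normalized_names_mapping` are hypothetical names used only for illustration; the exception type is also an assumption.

```python
import re
from typing import Callable, Dict, Sequence


def to_snake_case(name: str) -> str:
    # Hypothetical naming rule: non-alphanumeric runs become one underscore.
    return re.sub(r"[^0-9a-zA-Z]+", "_", name).strip("_").lower()


def normalized_names_mapping(
    names: Sequence[str], normalize: Callable[[str], str]
) -> Dict[str, str]:
    mapping: Dict[str, str] = {}
    seen: Dict[str, str] = {}  # normalized name -> original name
    for name in names:
        normalized = normalize(name)
        if normalized in seen:
            # Two different source names would end up in the same column.
            raise ValueError(
                f"{name!r} and {seen[normalized]!r} both normalize to {normalized!r}"
            )
        seen[normalized] = name
        mapping[name] = normalized
    return mapping


mapping = normalized_names_mapping(["User ID", "Created At"], to_snake_case)
```

Calling the helper with `["user id", "User-ID"]` raises, because both names normalize to `user_id`.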

py_arrow_to_table_schema_columns

def py_arrow_to_table_schema_columns(
schema: pyarrow.Schema) -> TTableSchemaColumns

Convert a PyArrow schema to a table schema columns dict.

Arguments:

  • schema pyarrow.Schema - pyarrow schema

Returns:

  • TTableSchemaColumns - table schema columns

columns_to_arrow

def columns_to_arrow(columns: TTableSchemaColumns,
caps: DestinationCapabilitiesContext,
timestamp_timezone: str = "UTC") -> pyarrow.Schema

Convert a table schema columns dict to a pyarrow schema.

Arguments:

  • columns TTableSchemaColumns - table schema columns

Returns:

  • pyarrow.Schema - pyarrow schema

get_parquet_metadata

def get_parquet_metadata(
parquet_file: TFileOrPath) -> Tuple[int, pyarrow.Schema]

Gets parquet file metadata (row count and schema).

Arguments:

  • parquet_file TFileOrPath - path to the parquet file, or an open file object

Returns:

  • Tuple[int, pyarrow.Schema] - row count and Arrow schema of the file

to_arrow_scalar

def to_arrow_scalar(value: Any, arrow_type: pyarrow.DataType) -> Any

Converts python value to an arrow compute friendly version

from_arrow_scalar

def from_arrow_scalar(arrow_value: pyarrow.Scalar) -> Any

Converts arrow scalar into Python type.

TNewColumns

Sequence of tuples: (field index, field, generating function)

add_constant_column

def add_constant_column(item: TAnyArrowItem,
name: str,
data_type: pyarrow.DataType,
value: Any = None,
nullable: bool = True,
index: int = -1) -> TAnyArrowItem

Add column with a single value to the table.

Arguments:

  • item - Arrow table or record batch
  • name - The new column name
  • data_type - The data type of the new column
  • nullable - Whether the new column is nullable
  • value - The value to fill the new column with
  • index - The index at which to insert the new column. Defaults to -1 (append)

pq_stream_with_new_columns

def pq_stream_with_new_columns(
parquet_file: TFileOrPath,
columns: TNewColumns,
row_groups_per_read: int = 1) -> Iterator[pyarrow.Table]

Add column(s) to the table in batches.

The parquet file is read row_groups_per_read row groups at a time.

Arguments:

  • parquet_file - path or file object to parquet file
  • columns - list of columns to add, each in the form (insertion index, pyarrow.Field, column_value_callback). The callback should accept a pyarrow.Table and return an array of values for the column.
  • row_groups_per_read - number of row groups to read at a time. Defaults to 1.

Yields:

pyarrow.Table objects with the new columns added.

cast_arrow_schema_types

def cast_arrow_schema_types(
schema: pyarrow.Schema, type_map: Dict[Callable[[pyarrow.DataType], bool],
Callable[..., pyarrow.DataType]]
) -> pyarrow.Schema

Returns type-casted Arrow schema.

Replaces data types for fields matching a type check in type_map. Type check functions in type_map are assumed to be mutually exclusive, i.e. a data type does not match more than one type check function.

concat_batches_and_tables_in_order

def concat_batches_and_tables_in_order(
tables_or_batches: Iterable[Union[pyarrow.Table, pyarrow.RecordBatch]]
) -> pyarrow.Table

Concatenate iterable of tables and batches into a single table, preserving row order. Zero copy is used during concatenation so schemas must be identical.

row_tuples_to_arrow

def row_tuples_to_arrow(rows: Sequence[Any],
caps: DestinationCapabilitiesContext,
columns: TTableSchemaColumns, tz: str) -> Any

Converts the rows to an arrow table using the columns schema. Columns missing data_type will be inferred from the row data. Columns with object types not supported by arrow are excluded from the resulting table.
