
Load a MySQL table with ConnectorX & Arrow

info

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/connector_x_arrow

About this Example

The example script below takes genome data from a public MySQL instance and loads it into DuckDB. Note that your destination must support loading Parquet files, as this is the format dlt uses to save Arrow tables. ConnectorX fetches data from several popular databases and creates an in-memory Arrow table, which dlt then saves to a load package and loads to the destination.
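
As a minimal sketch of that extraction step (reusing the public Rfam connection string and genome table from the full example below; the LIMIT 10 query is illustrative), ConnectorX hands back a regular pyarrow.Table:

import connectorx as cx

# cx.read_sql runs the query and materializes the result
# as an in-memory Arrow table
table = cx.read_sql(
    "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    "SELECT * FROM genome LIMIT 10",
    return_type="arrow2",
)
print(table.num_rows, table.schema)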

tip

You can yield several tables if your data is large and you need to partition your load.
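
For instance, here is a sketch of partitioning the extraction into several Arrow tables (the chunk size, OFFSET pagination, and total row count are illustrative assumptions, not part of the example below):

import connectorx as cx

import dlt


@dlt.resource
def genome_chunks(chunk_size: int = 500):
    # hypothetical partitioning: fetch fixed-size slices and yield
    # each slice as its own Arrow table
    for offset in range(0, 1000, chunk_size):
        yield cx.read_sql(
            "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
            f"SELECT * FROM genome ORDER BY created LIMIT {chunk_size} OFFSET {offset}",
            return_type="arrow2",
        )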

We'll learn:

  • How to get Arrow tables from ConnectorX and yield them.
  • That merge and incremental loads work with Arrow tables.
  • How to enable incremental loading for efficient data extraction.
  • How to use the built-in ConnectionStringCredentials.

Full source code

import connectorx as cx

import dlt
from dlt.sources.credentials import ConnectionStringCredentials


def read_sql_x(
    conn_str: ConnectionStringCredentials = dlt.secrets.value,
    query: str = dlt.config.value,
):
    # ConnectorX reads the query result into an in-memory Arrow table
    yield cx.read_sql(
        conn_str.to_native_representation(),
        query,
        return_type="arrow2",
        protocol="binary",
    )


def genome_resource():
    # create genome resource with merge on `upid` primary key
    genome = dlt.resource(
        name="genome",
        write_disposition="merge",
        primary_key="upid",
        standalone=True,
    )(read_sql_x)(
        "mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",  # type: ignore[arg-type]
        "SELECT * FROM genome ORDER BY created LIMIT 1000",
    )
    # enable incremental loading on the `created` column
    genome.apply_hints(incremental=dlt.sources.incremental("created"))
    return genome


if __name__ == "__main__":
    pipeline = dlt.pipeline(destination="duckdb")
    genome = genome_resource()

    load_info = pipeline.run(genome)
    print(load_info)
    print(pipeline.last_trace.last_normalize_info)
    # NOTE: run the pipeline again to see that no new records are loaded thanks to incremental loading

    # check that the data was loaded
    row_counts = pipeline.last_trace.last_normalize_info.row_counts
    assert row_counts["genome"] == 1000

    # make sure no load jobs failed
    load_info.raise_on_failed_jobs()
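
Because `read_sql_x` declares its `conn_str` argument as ConnectionStringCredentials defaulting to dlt.secrets.value, the connection string can also be injected from dlt's secrets instead of being passed explicitly. As a quick sketch of what this credentials type does (constructing it directly here with the same public Rfam URL, purely for illustration):

from dlt.sources.credentials import ConnectionStringCredentials

# ConnectionStringCredentials parses a database URL into its parts
# (drivername, username, password, host, port, database)
creds = ConnectionStringCredentials()
creds.parse_native_representation("mysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam")
print(creds.drivername)  # mysql
print(creds.to_native_representation())  # back to the original URL string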

