Access loaded data in Python
This guide explains how to access and manipulate data that has been loaded into your destination using the dlt Python library. After running your pipelines and loading data, you can use pipeline.dataset() together with dataframe expressions, Ibis, or SQL to query the data and read it as records, Pandas DataFrames, or Arrow tables.
Quick start example
Here's a full example of how to retrieve data from a pipeline and load it into a Pandas DataFrame or a PyArrow Table.
# Assuming you have a Pipeline object named 'pipeline'. You can create one with the dlt cli: dlt init fruitshop duckdb
# and you have loaded the data of the fruitshop example source into the destination
# the tables available in the destination are:
# - customers
# - inventory
# - purchases
# Step 1: Get the readable dataset from the pipeline
dataset = pipeline.dataset()
# Step 2: Access a table as a ReadableRelation
customers_relation = dataset.customers # Or dataset["customers"]
# Step 3: Fetch the entire table as a Pandas DataFrame
df = customers_relation.df()
# Alternatively, fetch as a PyArrow Table
arrow_table = customers_relation.arrow()
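If you don't yet have such a pipeline, the sketch below shows one way it might be created and run. The import path and source name (fruitshop) are assumptions about what dlt init generates; adjust them to match the files in your project.
import dlt
# NOTE: the import path and source name are assumptions; check the script generated by `dlt init fruitshop duckdb`
from fruitshop import fruitshop

pipeline = dlt.pipeline(
    pipeline_name="fruitshop",
    destination="duckdb",
    dataset_name="fruitshop_data",
)
# run the example source so the customers, inventory and purchases tables exist in the destination
load_info = pipeline.run(fruitshop())
print(load_info)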
Getting started
Assuming you have a Pipeline object (let's call it pipeline), you can obtain a ReadableDataset and access your tables as ReadableRelation objects.
Note: The ReadableDataset and ReadableRelation objects are lazy-loading. They only query and retrieve data when you perform an action that requires it, such as fetching data into a DataFrame or iterating over the data. This means that simply creating these objects does not load data into memory, making your code more efficient.
Access the dataset
# Get the readable dataset from the pipeline
dataset = pipeline.dataset()
# print the row counts of all tables in the destination as dataframe
print(dataset.row_counts().df())
Access tables as relations
You can access tables in your dataset using either attribute access or item access.
# Using attribute access
customers_relation = dataset.customers
# Using item access
customers_relation = dataset["customers"]
Reading data
Once you have a ReadableRelation, you can read data in various formats and sizes.
Fetch the entire table
Loading full tables into memory without limiting or iterating over them can consume a large amount of memory and may cause your program to crash if the table is too large. It's recommended to use chunked iteration or apply limits when dealing with large datasets.
As a Pandas DataFrame
df = customers_relation.df()
As a PyArrow Table
arrow_table = customers_relation.arrow()
As a list of Python tuples
items_list = customers_relation.fetchall()
Lazy loading behavior
The ReadableDataset and ReadableRelation objects are lazy-loading. This means that they do not immediately fetch data when you create them. Data is only retrieved when you perform an action that requires it, such as calling .df(), .arrow(), or iterating over the data. This approach optimizes performance and reduces unnecessary data loading.
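As a minimal illustration (using the relations from above), chaining modifiers only builds a description of what to read; the query executes when the data is actually requested:
# nothing is executed here: this only describes what to read
lazy_relation = dataset.customers.select("id", "name").limit(10)
# the query runs at this point, when the data is fetched
df = lazy_relation.df()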
Iterating over data in chunks
To handle large datasets efficiently, you can process data in smaller chunks.
Iterate as Pandas DataFrames
for df_chunk in customers_relation.iter_df(chunk_size=5):
    # Process each DataFrame chunk
    pass
Iterate as PyArrow Tables
for arrow_chunk in customers_relation.iter_arrow(chunk_size=5):
    # Process each PyArrow chunk
    pass
Iterate as lists of tuples
for items_chunk in customers_relation.iter_fetch(chunk_size=5):
    # Process each chunk of tuples
    pass
The methods available on the ReadableRelation correspond to the methods available on the cursor returned by the SQL client. Please refer to the SQL client guide for more information.
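For comparison, here is a minimal sketch of going through the SQL client directly, which returns such a cursor; the raw query is an assumption and the exact SQL depends on your schema and destination dialect:
# open a connection via the pipeline's SQL client and run a raw query
with pipeline.sql_client() as client:
    with client.execute_query("SELECT id, name FROM customers LIMIT 10") as cursor:
        # the cursor exposes the same style of methods: df(), arrow(), fetchall(), ...
        print(cursor.df())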
Connection Handling
For every call that actually fetches data from the destination, such as df(), arrow(), fetchall(), etc., the dataset will open a connection and close it after the data has been retrieved or the iterator is completed. You can keep the connection open for multiple requests with the dataset context manager:
# the dataset context manager will keep the connection open
# and close it after the with block is exited
with dataset as dataset_:
    print(dataset_.customers.limit(50).arrow())
    print(dataset_.purchases.arrow())
Special queries
You can use the row_counts method to get the row counts of all tables in the destination as a DataFrame.
# print the row counts of all tables in the destination as dataframe
print(dataset.row_counts().df())
# or as tuples
print(dataset.row_counts().fetchall())
Modifying queries
You can refine your data retrieval by limiting the number of records, selecting specific columns, or chaining these operations.
Limit the number of records
# Get the first 50 items as a PyArrow table
arrow_table = customers_relation.limit(50).arrow()
Use head() to get the first 5 records
df = customers_relation.head().df()
Select specific columns
# Select only 'id' and 'name' columns
items_list = customers_relation.select("id", "name").fetchall()
# Alternate notation with brackets
items_list = customers_relation[["id", "name"]].fetchall()
# Only get one column
items_list = customers_relation[["name"]].fetchall()
Chain operations
You can combine select, limit, and other methods.
# Select columns and limit the number of records
arrow_table = customers_relation.select("id", "name").limit(50).arrow()
Modifying queries with ibis expressions
If you install the amazing ibis library, you can use ibis expressions to modify your queries.
pip install ibis-framework
dlt will then wrap an ibis.UnboundTable with a ReadableIbisRelation object under the hood, which allows you to modify the query of a relation using ibis expressions:
# now that ibis is installed, we can get a dataset with ibis relations
dataset = pipeline.dataset()
# get two relations
customers_relation = dataset["customers"]
purchases_relation = dataset["purchases"]
# join them using an ibis expression
joined_relation = customers_relation.join(
    purchases_relation, customers_relation.id == purchases_relation.customer_id
)
# now we can use the ibis expression to filter the data
filtered_relation = joined_relation.filter(purchases_relation.quantity > 1)
# we can inspect the query that will be used to read the data
print(filtered_relation.query)
# and finally fetch the data as a pandas dataframe, the same way we would do with a normal relation
df = filtered_relation.df()
# a few more examples
# get all customers from berlin and london
customers_relation.filter(customers_relation.city.isin(["berlin", "london"])).df()
# limit and offset
customers_relation.limit(10, offset=5).arrow()
# mutate columns by adding a new column that is always 10 times the value of the id column
customers_relation.mutate(new_id=customers_relation.id * 10).df()
# sort asc and desc
import ibis
customers_relation.order_by(ibis.desc("id"), ibis.asc("city")).limit(10)
# group by and aggregate
customers_relation.group_by("city").having(customers_relation.count() >= 3).aggregate(
    sum_id=customers_relation.id.sum()
).df()
# subqueries
customers_relation.filter(customers_relation.city.isin(["berlin", "london"])).df()
You can learn more about the available expressions on the ibis for sql users page.
Keep in mind that you can only use methods that modify the executed query; none of the methods ibis provides for fetching data are available. Fetching is done with the same methods defined on the regular relations explained above. If you need full native ibis integration, please read the ibis section in the advanced part further down. Additionally, not all ibis expressions may be supported by all destinations and SQL dialects.
Supported destinations
All SQL and filesystem destinations supported by dlt can utilize this data access interface.
Reading data from filesystem
For filesystem destinations, dlt uses DuckDB under the hood to create views on iceberg and delta tables or from Parquet, JSONL, and CSV files. This allows you to query data stored in files using the same interface as you would with SQL databases. If you plan to access data in buckets or on the filesystem frequently this way, it is advisable to load the data into delta or iceberg tables, as DuckDB is able to load only the parts of the data actually needed for the query.
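For example, a pipeline that loads to the filesystem destination exposes the same dataset interface. This is a sketch and assumes the filesystem destination is already configured (a bucket_url or local path set in your config/secrets):
import dlt
# the bucket_url (local path, s3://, gs://, ...) is expected to come from your config or secrets
fs_pipeline = dlt.pipeline(
    pipeline_name="fruitshop_fs",
    destination="filesystem",
    dataset_name="fruitshop_data",
)
# ... after loading data with fs_pipeline.run(...) ...
# queries go through DuckDB views created over the stored files
print(fs_pipeline.dataset().customers.limit(10).df())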
By default, dlt will not auto-refresh views created on iceberg tables and files when new data is loaded. This prevents wasting resources on file globbing and reloading iceberg metadata for every query. You can change this behavior with the always_refresh_views flag.
Note: delta tables auto-refresh by default; this is implemented by delta core and is quite efficient.
Examples
Fetch one record as a tuple
record = customers_relation.fetchone()
Fetch many records as tuples
records = customers_relation.fetchmany(10)
Iterate over data with limit and column selection
Note: When iterating over filesystem tables, the underlying DuckDB may give you a different chunk size depending on the size of the parquet files the table is based on.
# Dataframes
for df_chunk in customers_relation.select("id", "name").limit(100).iter_df(chunk_size=20): ...
# Arrow tables
for arrow_table in (
    customers_relation.select("id", "name").limit(100).iter_arrow(chunk_size=20)
): ...
# Python tuples
for records in customers_relation.select("id", "name").limit(100).iter_fetch(chunk_size=20):
    # Process each chunk of tuples
    ...