
Custom destination with BigQuery

info

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/custom_destination_bigquery

About this Example

In this example, you'll find a Python script that demonstrates how to load data into BigQuery using a custom destination.

We'll learn how to:

- Define a custom destination with the @dlt.destination decorator
- Retrieve GCP credentials from dlt secrets with the GcpServiceAccountCredentials spec
- Build a pyarrow table with list and struct columns to showcase BigQuery nested types
- Load parquet files into BigQuery with schema autodetection
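
Before the full script, it helps to see the general shape of a custom destination. The sketch below is illustrative only (the function name and batch_size value are made up for this note): dlt calls the decorated function once per batch, passing the data items together with the table schema. The BigQuery example further down instead sets batch_size=0, so it receives the path of the parquet file produced by the pipeline rather than in-memory items.

import dlt
from dlt.common.typing import TDataItems
from dlt.common.schema import TTableSchema


# a minimal sketch of a custom destination function; with a non-zero batch_size,
# dlt passes batches of data items (lists of dicts) plus the table schema
@dlt.destination(batch_size=10)
def print_sink(items: TDataItems, table: TTableSchema) -> None:
    # "table" carries the table name and column definitions
    print(f"received {len(items)} rows for table {table['name']}")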

Full source code

import dlt
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery

from dlt.common.configuration.specs import GcpServiceAccountCredentials

# constants
OWID_DISASTERS_URL = (
    "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
)
# this table needs to be created manually in your Google Cloud project
# format: "your-project.your_dataset.your_table"
BIGQUERY_TABLE_ID = "chat-analytics-rasa-ci.ci_streaming_insert.natural-disasters"


# dlt sources
@dlt.resource(name="natural_disasters")
def resource(url: str):
    # load pyarrow table with pandas
    table = pa.Table.from_pandas(pd.read_csv(url))
    # we add a list type column to demonstrate bigquery lists
    table = table.append_column(
        "tags",
        pa.array(
            [["disasters", "earthquakes", "floods", "tsunamis"]] * len(table),
            pa.list_(pa.string()),
        ),
    )
    # we add a struct type column to demonstrate bigquery structs
    table = table.append_column(
        "meta",
        pa.array(
            [{"loaded_by": "dlt"}] * len(table),
            pa.struct([("loaded_by", pa.string())]),
        ),
    )
    yield table


# dlt bigquery custom destination
# we can use the dlt provided credentials class
# to retrieve the gcp credentials from the secrets
@dlt.destination(name="bigquery", loader_file_format="parquet", batch_size=0)
def bigquery_insert(
    items, table, credentials: GcpServiceAccountCredentials = dlt.secrets.value
) -> None:
    client = bigquery.Client(
        credentials.project_id, credentials.to_native_credentials(), location="US"
    )
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.PARQUET,
        schema_update_options=bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
    )
    # since we have set the batch_size to 0, we get a filepath and can load the file directly
    with open(items, "rb") as f:
        load_job = client.load_table_from_file(f, BIGQUERY_TABLE_ID, job_config=job_config)
        load_job.result()  # waits for the job to complete


if __name__ == "__main__":
    # run the pipeline and print load results
    pipeline = dlt.pipeline(
        pipeline_name="csv_to_bigquery_insert",
        destination=bigquery_insert,
        dataset_name="mydata",
        full_refresh=True,
    )
    load_info = pipeline.run(resource(url=OWID_DISASTERS_URL))

    print(load_info)

    # make sure nothing failed
    load_info.raise_on_failed_jobs()
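
To confirm that the data landed, a quick check can be appended after the pipeline run. This is a sketch under assumptions: it reuses BIGQUERY_TABLE_ID from the script above and expects GCP application default credentials (or an equivalent client setup) to be available in your environment.

from google.cloud import bigquery

# count the rows loaded into the manually created table
client = bigquery.Client()  # assumes application default credentials
query = f"SELECT count(*) AS row_count FROM `{BIGQUERY_TABLE_ID}`"
row_count = list(client.query(query).result())[0]["row_count"]
print(f"loaded {row_count} rows into {BIGQUERY_TABLE_ID}")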

This demo works in GitHub Codespaces, a development environment available for free to anyone with a GitHub account. You'll be asked to fork the demo repository, and from there the README guides you through the further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!
