Skip to main content

Load Coinbase Data to MotherDuck Using dlt in Python

Need help deploying these pipelines, or figuring out how to run them in your data stack?

Join our Slack community or book a call with our support engineer Violetta.

Coinbase is a leading cryptocurrency exchange platform that allows users to buy, sell, and manage various cryptocurrencies. It offers a secure and user-friendly interface for trading digital assets and provides features such as real-time price tracking, secure wallets, and advanced trading options. Coinbase also supports integration with various financial tools and services, enabling seamless management of digital currency portfolios. This documentation will guide you through loading data from Coinbase to MotherDuck using the open-source Python library dlt. MotherDuck is a fast in-process analytical database that supports a feature-rich SQL dialect and deep integrations into client APIs. This setup will help you efficiently manage and analyze your cryptocurrency data. For more information about Coinbase, visit their website.

dlt Key Features

  • Install dlt with MotherDuck: Install the DLT library with MotherDuck dependencies by running pip install dlt[motherduck]. More details here.
  • Setup Guide: Follow the setup guide to initialize a project, install necessary dependencies, and run your pipeline. Detailed steps can be found here.
  • Write Disposition: All write dispositions are supported, allowing flexibility in how data is written to the destination. Learn more about write dispositions here.
  • Data Loading: By default, parquet files and the COPY command are used to move files to the remote DuckDB database. Find out more about data loading here.
  • DBT Support: This destination integrates with dbt via dbt-duckdb, a community-supported package. More information on dbt support can be found here.

Getting started with your pipeline locally

OpenAPI Source Generator dlt-init-openapi

This walkthrough makes use of the dlt-init-openapi generator cli tool. You can read more about it here. The code generated by this tool uses the dlt rest_api verified source, docs for this are here.

0. Prerequisites

dlt and dlt-init-openapi requires Python 3.9 or higher. Additionally, you need to have the pip package manager installed, and we recommend using a virtual environment to manage your dependencies. You can learn more about preparing your computer for dlt in our installation reference.

1. Install dlt and dlt-init-openapi

First you need to install the dlt-init-openapi cli tool.

pip install dlt-init-openapi

The dlt-init-openapi cli is a powerful generator which you can use to turn any OpenAPI spec into a dlt source to ingest data from that api. The quality of the generator source is dependent on how well the API is designed and how accurate the OpenAPI spec you are using is. You may need to make tweaks to the generated code, you can learn more about this here.

# generate pipeline
# NOTE: add_limit adds a global limit, you can remove this later
# NOTE: you will need to select which endpoints to render, you
# can just hit Enter and all will be rendered.
dlt-init-openapi coinbase --url https://raw.githubusercontent.com/dlt-hub/openapi-specs/main/open_api_specs/Business/coinbase.yaml --global-limit 2
cd coinbase_pipeline
# install generated requirements
pip install -r requirements.txt

The last command will install the required dependencies for your pipeline. The dependencies are listed in the requirements.txt:

dlt>=0.4.12

You now have the following folder structure in your project:

coinbase_pipeline/
├── .dlt/
│ ├── config.toml # configs for your pipeline
│ └── secrets.toml # secrets for your pipeline
├── rest_api/ # The rest api verified source
│ └── ...
├── coinbase/
│ └── __init__.py # TODO: possibly tweak this file
├── coinbase_pipeline.py # your main pipeline script
├── requirements.txt # dependencies for your pipeline
└── .gitignore # ignore files for git (not required)

1.1. Tweak coinbase/__init__.py

This file contains the generated configuration of your rest_api. You can continue with the next steps and leave it as is, but you might want to come back here and make adjustments if you need your rest_api source set up in a different way. The generated file for the coinbase source will look like this:

Click to view full file (507 lines)

from typing import List

import dlt
from dlt.extract.source import DltResource
from rest_api import rest_api_source
from rest_api.typing import RESTAPIConfig


@dlt.source(name="coinbase_source", max_table_nesting=2)
def coinbase_source(
api_key: str = dlt.secrets.value,
base_url: str = dlt.config.value,
) -> List[DltResource]:

# source configuration
source_config: RESTAPIConfig = {
"client": {
"base_url": base_url,
"auth": {

"type": "api_key",
"api_key": api_key,
"name": "CB-ACCESS-API-KEY",
"location": "header"

},
},
"resources":
[
# Returns a list of supported networks and network information for a specific asset.
{
"name": "get_asset_networks",
"table_name": "asset_network_v_1",
"primary_key": "asset_uuid",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/assets/{asset}/networks",
"params": {
"asset": {
"type": "resolve",
"resource": "get_assets",
"field": "asset_id",
},

},
"paginator": "auto",
}
},
# Returns a list of all supported assets.
{
"name": "get_assets",
"table_name": "asset_v_1",
"primary_key": "asset_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/assets",
"paginator": "auto",
}
},
# Retrieves information for a specific asset.
{
"name": "get_asset",
"table_name": "asset_v_1",
"primary_key": "asset_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/assets/{asset}",
"params": {
"asset": {
"type": "resolve",
"resource": "get_assets",
"field": "asset_id",
},

},
"paginator": "auto",
}
},
# Retrieves a list of aggregated candles data for a given instrument, granularity and time range
{
"name": "get_instrument_candles",
"table_name": "candle",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/instruments/{instrument}/candles",
"params": {
"instrument": {
"type": "resolve",
"resource": "get_instruments",
"field": "instrument_id",
},
"granularity": "FILL_ME_IN", # TODO: fill in required query parameter
"start": "FILL_ME_IN", # TODO: fill in required query parameter
# the parameters below can optionally be configured
# "end": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
# Retrieves the trading volumes for each instrument separated by day.
{
"name": "get_instrument_volumes_daily",
"table_name": "daily",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/instruments/volumes/daily",
"params": {
"instruments": "FILL_ME_IN", # TODO: fill in required query parameter
# the parameters below can optionally be configured
# "result_limit": "OPTIONAL_CONFIG",
# "result_offset": "OPTIONAL_CONFIG",
# "time_from": "OPTIONAL_CONFIG",
# "show_other": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
# Return all the fee rate tiers.
{
"name": "get_fee_rate_tiers",
"table_name": "fee_rate_tier_v_1",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/fee-rate-tiers",
"paginator": "auto",
}
},
# Retrieves the historical funding rates for a specific instrument.
{
"name": "get_instrument_funding",
"table_name": "instrument_funding_v_1",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/instruments/{instrument}/funding",
"params": {
"instrument": {
"type": "resolve",
"resource": "get_instruments",
"field": "instrument_id",
},
# the parameters below can optionally be configured
# "result_limit": "OPTIONAL_CONFIG",
# "result_offset": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
# Retrieves the current quote for a specific instrument.
{
"name": "get_instrument_quote",
"table_name": "instrument_quote_v_1",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/instruments/{instrument}/quote",
"params": {
"instrument": {
"type": "resolve",
"resource": "get_instruments",
"field": "instrument_id",
},

},
"paginator": "auto",
}
},
# Returns all of the instruments available for trading.
{
"name": "get_instruments",
"table_name": "instrument_v_1",
"primary_key": "instrument_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/instruments",
"paginator": "auto",
}
},
# Retrieves market information for a specific instrument.
{
"name": "get_instrument",
"table_name": "instrument_v_1",
"primary_key": "instrument_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/instruments/{instrument}",
"params": {
"instrument": {
"type": "resolve",
"resource": "get_instruments",
"field": "instrument_id",
},

},
"paginator": "auto",
}
},
# Returns a list of active orders resting on the order book matching the requested criteria. Does not return any rejected, cancelled, or fully filled orders as they are not active.
{
"name": "get_orders",
"table_name": "order_result_v_1",
"primary_key": "order_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "results",
"path": "/api/v1/orders",
"params": {
# the parameters below can optionally be configured
# "portfolio": "OPTIONAL_CONFIG",
# "instrument": "OPTIONAL_CONFIG",
# "instrument_type": "OPTIONAL_CONFIG",
# "client_order_id": "OPTIONAL_CONFIG",
# "event_type": "OPTIONAL_CONFIG",
# "ref_datetime": "OPTIONAL_CONFIG",
# "result_limit": "OPTIONAL_CONFIG",
# "result_offset": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
# Retrieves a single order. The order retrieved can be either active or inactive.
{
"name": "get_order",
"table_name": "order_result_v_1",
"primary_key": "order_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/orders/{id}",
"params": {
"id": {
"type": "resolve",
"resource": "get_orders",
"field": "order_id",
},
"portfolio": "FILL_ME_IN", # TODO: fill in required query parameter

},
"paginator": "auto",
}
},
# Retrieves the summary, positions, and balances of a portfolio.
{
"name": "get_portfolio_detail",
"table_name": "portfolio_balance_v_1",
"primary_key": "asset_uuid",
"write_disposition": "merge",
"endpoint": {
"data_selector": "balances",
"path": "/api/v1/portfolios/{portfolio}/detail",
"params": {
"portfolio": {
"type": "resolve",
"resource": "get_portfolios",
"field": "portfolio_id",
},

},
"paginator": "auto",
}
},
# Returns all of the balances for a given portfolio.
{
"name": "get_portfolio_balances",
"table_name": "portfolio_balance_v_1",
"primary_key": "asset_uuid",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/{portfolio}/balances",
"params": {
"portfolio": {
"type": "resolve",
"resource": "get_portfolios",
"field": "portfolio_id",
},

},
"paginator": "auto",
}
},
# Retrieves the balance for a given portfolio and asset.
{
"name": "get_portfolio_balance",
"table_name": "portfolio_balance_v_1",
"primary_key": "asset_uuid",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/{portfolio}/balances/{asset}",
"params": {
"asset": {
"type": "resolve",
"resource": "get_portfolio_balances",
"field": "asset_uuid",
},
"portfolio": "FILL_ME_IN", # TODO: fill in required path parameter

},
"paginator": "auto",
}
},
# Retrieves the Perpetual Future and Spot fee rate tiers for the user.
{
"name": "get_portfolios_fee_rates",
"table_name": "portfolio_fee_rate_v_1",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/fee-rates",
"paginator": "auto",
}
},
# Returns fills for specified portfolios or fills for all portfolios if none are provided.
{
"name": "get_multi_portfolio_fills",
"table_name": "portfolio_fill_v_1",
"primary_key": "fill_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "results",
"path": "/api/v1/portfolios/fills",
"params": {
# the parameters below can optionally be configured
# "portfolios": "OPTIONAL_CONFIG",
# "order_id": "OPTIONAL_CONFIG",
# "client_order_id": "OPTIONAL_CONFIG",
# "ref_datetime": "OPTIONAL_CONFIG",
# "result_limit": "OPTIONAL_CONFIG",
# "result_offset": "OPTIONAL_CONFIG",
# "time_from": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
# Returns all of the fills for a given portfolio.
{
"name": "get_portfolio_fills",
"table_name": "portfolio_fill_v_1",
"primary_key": "fill_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "results",
"path": "/api/v1/portfolios/{portfolio}/fills",
"params": {
"portfolio": {
"type": "resolve",
"resource": "get_portfolios",
"field": "portfolio_id",
},
# the parameters below can optionally be configured
# "order_id": "OPTIONAL_CONFIG",
# "client_order_id": "OPTIONAL_CONFIG",
# "ref_datetime": "OPTIONAL_CONFIG",
# "result_limit": "OPTIONAL_CONFIG",
# "result_offset": "OPTIONAL_CONFIG",
# "time_from": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
# Returns all of the positions for a given portfolio.
{
"name": "get_portfolio_positions",
"table_name": "portfolio_position_v_1",
"primary_key": "id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/{portfolio}/positions",
"params": {
"portfolio": {
"type": "resolve",
"resource": "get_portfolios",
"field": "portfolio_id",
},

},
"paginator": "auto",
}
},
# Retrieves the position for a given portfolio and symbol.
{
"name": "get_portfolio_position",
"table_name": "portfolio_position_v_1",
"primary_key": "id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/{portfolio}/positions/{instrument}",
"params": {
"instrument": {
"type": "resolve",
"resource": "get_portfolio_positions",
"field": "id",
},
"portfolio": "FILL_ME_IN", # TODO: fill in required path parameter

},
"paginator": "auto",
}
},
# Retrieves the high level overview of a portfolio.
{
"name": "get_portfolio_summary",
"table_name": "portfolio_summary_v_1",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/{portfolio}/summary",
"params": {
"portfolio": {
"type": "resolve",
"resource": "get_portfolios",
"field": "portfolio_id",
},

},
"paginator": "auto",
}
},
# Returns all of the user's portfolios.
{
"name": "get_portfolios",
"table_name": "portfolio_v_1",
"primary_key": "portfolio_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios",
"paginator": "auto",
}
},
# Returns the user's specified portfolio.
{
"name": "get_portfolio",
"table_name": "portfolio_v_1",
"primary_key": "portfolio_id",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/portfolios/{portfolio}",
"params": {
"portfolio": {
"type": "resolve",
"resource": "get_portfolios",
"field": "portfolio_id",
},

},
"paginator": "auto",
}
},
{
"name": "get_transfers",
"table_name": "transfer_v_1",
"primary_key": "transfer_uuid",
"write_disposition": "merge",
"endpoint": {
"data_selector": "results",
"path": "/api/v1/transfers",
"params": {
# the parameters below can optionally be configured
# "portfolio [DEPRECATED]": "OPTIONAL_CONFIG",
# "portfolios": "OPTIONAL_CONFIG",
# "time_from": "OPTIONAL_CONFIG",
# "time_to": "OPTIONAL_CONFIG",
# "result_limit": "OPTIONAL_CONFIG",
# "result_offset": "OPTIONAL_CONFIG",
# "status": "OPTIONAL_CONFIG",
# "type": "OPTIONAL_CONFIG",

},
"paginator": "auto",
}
},
{
"name": "get_transfer",
"table_name": "transfer_v_1",
"primary_key": "transfer_uuid",
"write_disposition": "merge",
"endpoint": {
"data_selector": "$",
"path": "/api/v1/transfers/{transfer_uuid}",
"params": {
"transfer_uuid": {
"type": "resolve",
"resource": "get_transfers",
"field": "transfer_uuid",
},

},
"paginator": "auto",
}
},
]
}

return rest_api_source(source_config)

2. Configuring your source and destination credentials

info

dlt-init-openapi will try to detect which authentication mechanism (if any) is used by the API in question and add a placeholder in your secrets.toml.

  • If you know your API needs authentication, but none was detected, you can learn more about adding authentication to the rest_api here.
  • OAuth detection currently is not supported, but you can supply your own authentication mechanism as outlined here.

The dlt cli will have created a .dlt directory in your project folder. This directory contains a config.toml file and a secrets.toml file that you can use to configure your pipeline. The automatically created version of these files look like this:

generated config.toml


[runtime]
log_level="INFO"

[sources.coinbase]
# Base URL for the API
base_url = "https://api.international.coinbase.com"

generated secrets.toml


[sources.coinbase]
# secrets for your coinbase source
api_key = "FILL ME OUT" # TODO: fill in your credentials

2.1. Adjust the generated code to your usecase

Further help setting up your source and destinations

At this time, the dlt-init-openapi cli tool will always create pipelines that load to a local duckdb instance. Switching to a different destination is trivial, all you need to do is change the destination parameter in coinbase_pipeline.py to motherduck and supply the credentials as outlined in the destination doc linked below.

  • Read more about setting up the rest_api source in our docs.
  • Read more about setting up the MotherDuck destination in our docs.

3. Running your pipeline for the first time

The dlt cli has also created a main pipeline script for you at coinbase_pipeline.py, as well as a folder coinbase that contains additional python files for your source. These files are your local copies which you can modify to fit your needs. In some cases you may find that you only need to do small changes to your pipelines or add some configurations, in other cases these files can serve as a working starting point for your code, but will need to be adjusted to do what you need them to do.

The main pipeline script will look something like this:


import dlt

from coinbase import coinbase_source


if __name__ == "__main__":
pipeline = dlt.pipeline(
pipeline_name="coinbase_pipeline",
destination='duckdb',
dataset_name="coinbase_data",
progress="log",
export_schema_path="schemas/export"
)
source = coinbase_source()
info = pipeline.run(source)
print(info)

Provided you have set up your credentials, you can run your pipeline like a regular python script with the following command:

python coinbase_pipeline.py

4. Inspecting your load result

You can now inspect the state of your pipeline with the dlt cli:

dlt pipeline coinbase_pipeline info

You can also use streamlit to inspect the contents of your MotherDuck destination for this:

# install streamlit
pip install streamlit
# run the streamlit app for your pipeline with the dlt cli:
dlt pipeline coinbase_pipeline show

5. Next steps to get your pipeline running in production

One of the beauties of dlt is, that we are just a plain Python library, so you can run your pipeline in any environment that supports Python >= 3.8. We have a couple of helpers and guides in our docs to get you there:

The Deploy section will show you how to deploy your pipeline to

  • Deploy with GitHub Actions: Learn how to deploy your dlt pipeline using GitHub Actions. This guide provides step-by-step instructions for setting up continuous integration and deployment.
  • Deploy with Airflow and Google Composer: Follow this comprehensive guide to deploy your dlt pipeline with Airflow. It covers everything from initializing your project to running your pipeline in a managed Airflow environment.
  • Deploy with Google Cloud Functions: Discover how to deploy your dlt pipeline using Google Cloud Functions for a serverless execution environment.
  • Explore Other Deployment Options: Check out additional resources and guides for deploying dlt pipelines using various platforms and methods here.

The running in production section will teach you about:

  • How to Monitor your pipeline: Learn how to effectively monitor your dlt pipeline in production to ensure smooth and reliable data processing. How to Monitor your pipeline
  • Set up alerts: Set up alerts to get notified about any issues or anomalies in your dlt pipeline, ensuring you can respond quickly to any problems. Set up alerts
  • And set up tracing: Implement tracing in your dlt pipeline to track and debug the flow of data through your system, making it easier to identify and fix issues. And set up tracing

Available Sources and Resources

For this verified source the following sources and resources are available

Source Coinbase

Loads Coinbase trading data, including instruments, transfers, portfolios, orders, assets, and fees.

Resource NameWrite DispositionDescription
instrument_v_1appendInformation about various trading instruments available on Coinbase
transfer_v_1appendDetails of transfers between wallets or accounts
instrument_quote_v_1appendReal-time quotes for trading instruments
portfolio_summary_v_1appendSummary of user's cryptocurrency portfolio
order_result_v_1appendResults of buy/sell orders placed on the platform
portfolio_fill_v_1appendDetails of order fills in the user's portfolio
dailyappendDaily trading data
candleappendHistorical price data in candlestick format
asset_v_1appendInformation about various digital assets
portfolio_balance_v_1appendBalance details of the user's portfolio
fee_rate_tier_v_1appendInformation about fee rates based on trading volume tiers
portfolio_fee_rate_v_1appendFee rates applied to the user's portfolio
asset_network_v_1appendNetwork details of various digital assets
portfolio_position_v_1appendPosition details within the user's portfolio
portfolio_v_1appendComprehensive details of the user's portfolio
instrument_funding_v_1appendFunding details for trading instruments

Additional pipeline guides

This demo works on codespaces. Codespaces is a development environment available for free to anyone with a Github account. You'll be asked to fork the demo repository and from there the README guides you with further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!

DHelp

Ask a question

Welcome to "Codex Central", your next-gen help center, driven by OpenAI's GPT-4 model. It's more than just a forum or a FAQ hub – it's a dynamic knowledge base where coders can find AI-assisted solutions to their pressing problems. With GPT-4's powerful comprehension and predictive abilities, Codex Central provides instantaneous issue resolution, insightful debugging, and personalized guidance. Get your code running smoothly with the unparalleled support at Codex Central - coding help reimagined with AI prowess.