Pokemon details in parallel using transformers

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/transformers

About this Example

In this example, we load Pokemon data from the PokeAPI and use transformers to fetch the details of each Pokemon in parallel.

We'll learn how to:

  • create 2 transformers and connect them to a resource with the pipe operator |;
  • load these transformers in parallel using the @dlt.defer decorator;
  • configure parallelism in the config.toml file;
  • deselect the main resource, so it will not be loaded into the database;
  • import and use a pre-configured requests library with automatic retries (from dlt.sources.helpers import requests).
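The config.toml file mentioned in the list is not reproduced on this page. As a rough sketch only, the snippet below sets parallelism through environment variables instead. It assumes dlt's convention of mapping a config section and key to an upper-case name joined by a double underscore, and it assumes the `workers` and `max_parallel_items` keys under `[extract]`. The values are illustrative, not the ones used by the original example.

```python
import os

# Assumed equivalents of config.toml entries such as:
#   [extract]
#   workers = 5
#   max_parallel_items = 10
# Both the key names and the values are assumptions made for illustration.
parallelism_settings = {
    "EXTRACT__WORKERS": "5",
    "EXTRACT__MAX_PARALLEL_ITEMS": "10",
}

# Set these before creating and running the pipeline so they can be picked up
os.environ.update(parallelism_settings)
```

Check the key names against the dlt performance documentation before relying on them.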

Full source code

import dlt
from dlt.sources.helpers import requests


@dlt.source(max_table_nesting=2)
def source(pokemon_api_url: str):
    # note that we deselect `pokemon_list` - we do not want it to be loaded
    @dlt.resource(write_disposition="replace", selected=False)
    def pokemon_list():
        """Retrieve a first page of Pokemons and yield it. We do not retrieve all the pages in this example"""
        yield requests.get(pokemon_api_url).json()["results"]

    # transformer that retrieves a list of objects in parallel
    @dlt.transformer
    def pokemon(pokemons):
        """Yields details for a list of `pokemons`"""

        # @dlt.defer marks a function to be executed in parallel
        # in a thread pool
        @dlt.defer
        def _get_pokemon(_pokemon):
            return requests.get(_pokemon["url"]).json()

        # call and yield the function result normally, the @dlt.defer takes care of parallelism
        for _pokemon in pokemons:
            yield _get_pokemon(_pokemon)

    # a special case where just one item is retrieved in transformer
    # a whole transformer may be marked for parallel execution
    @dlt.transformer(parallelized=True)
    def species(pokemon_details):
        """Yields species details for a pokemon"""
        species_data = requests.get(pokemon_details["species"]["url"]).json()
        # link back to pokemon so we have a relation in loaded data
        species_data["pokemon_id"] = pokemon_details["id"]
        # You can return the result instead of yield since the transformer only generates one result
        return species_data

    # create two simple pipelines with | operator
    # 1. send list of pokemons into `pokemon` transformer to get pokemon details
    # 2. send pokemon details into `species` transformer to get species details
    # NOTE: dlt is smart enough to get data from pokemon_list and pokemon details once

    return (pokemon_list | pokemon, pokemon_list | pokemon | species)


if __name__ == "__main__":
    # build duck db pipeline
    pipeline = dlt.pipeline(
        pipeline_name="pokemon", destination="duckdb", dataset_name="pokemon_data"
    )

    # the pokemon_list resource does not need to be loaded
    load_info = pipeline.run(source("https://pokeapi.co/api/v2/pokemon"))
    print(load_info)

    # verify that all went well
    row_counts = pipeline.last_trace.last_normalize_info.row_counts
    assert row_counts["pokemon"] == 20
    assert row_counts["species"] == 20
    assert "pokemon_list" not in row_counts

    # make sure nothing failed
    load_info.raise_on_failed_jobs()
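
To make the idea behind deferred execution more concrete, here is a minimal standard-library sketch of the general pattern: submit one task per item to a thread pool, then collect the results. It is not how dlt implements @dlt.defer, and dlt may hand results downstream in a different order. The names `fetch_details` and `resolve_in_parallel` are hypothetical, and the HTTP call is replaced by a local stand-in so the snippet runs offline.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_details(item):
    """Hypothetical stand-in for an HTTP request; builds a fake detail record."""
    return {"name": item["name"], "name_length": len(item["name"])}


def resolve_in_parallel(items, max_workers=4):
    """Submit one task per item to a thread pool and yield results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # All tasks are submitted up front, so they can run concurrently
        futures = [pool.submit(fetch_details, item) for item in items]
        for future in futures:
            # Blocks until this particular task has finished
            yield future.result()


pokemons = [{"name": "bulbasaur"}, {"name": "ivysaur"}, {"name": "venusaur"}]
details = list(resolve_in_parallel(pokemons))
```

In the example above, the decorator hides this bookkeeping: the transformer calls `_get_pokemon` and yields the result as if the call were synchronous.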
