Load IBM Event Streams data in Python using dltHub
Build an IBM Event Streams-to-database or -dataframe pipeline in Python using dlt with automatic Cursor support.
In this guide, we'll set up a complete IBM Event Streams data pipeline from API credentials to your first data load in just 10 minutes. You'll end up with a fully declarative Python pipeline based on dlt's REST API connector, like in the partial example code below:
Example code
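A minimal sketch of what such a declarative pipeline might look like, not the generated code itself. The endpoint names `topics` and `connectors` come from the prompt later in this guide. The helper names `build_config` and `run_pipeline` are hypothetical, and the base URL and credentials are placeholders you would replace with your own values. The resource paths are assumed to match the endpoint names.

```python
from typing import Any


def build_config(base_url: str, username: str, password: str) -> dict[str, Any]:
    """Return a declarative REST API source config (hypothetical helper)."""
    return {
        "client": {
            "base_url": base_url,
            # Assumption: credentials are sent as HTTP Basic auth.
            "auth": {
                "type": "http_basic",
                "username": username,
                "password": password,
            },
        },
        # Assumption: endpoint paths match these resource names.
        "resources": ["topics", "connectors"],
    }


def run_pipeline(config: dict[str, Any]) -> None:
    """Load the configured endpoints into DuckDB (requires dlt installed)."""
    # Imported lazily so the config helper works without dlt.
    import dlt
    from dlt.sources.rest_api import rest_api_source

    pipeline = dlt.pipeline(
        pipeline_name="ibm_event_streams_pipeline",
        destination="duckdb",
        dataset_name="ibm_event_streams_data",
    )
    load_info = pipeline.run(rest_api_source(config))
    print(load_info)
```

To try it, call `run_pipeline(build_config(...))` with your own endpoint URL and credentials. Keeping the config in a plain dictionary makes it easy to review and extend with further endpoints.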
Why use dltHub Workspace with LLM Context to generate Python pipelines?
- Accelerate pipeline development with AI-native context
- Debug pipelines, validate schemas and data with the integrated Pipeline Dashboard
- Build Python notebooks for end users of your data
- Low maintenance thanks to schema evolution with type inference, resilience, and self-documenting REST API connectors. A shallow learning curve makes the pipeline easy for any team member to extend.
- dlt is the tool of choice for Pythonic Iceberg Lakehouses, bringing mature data loading to pythonic Iceberg with or without catalogs
What you’ll do
We’ll show you how to generate a readable and easily maintainable Python script that fetches data from the IBM Event Streams API and loads it into Iceberg, DataFrames, files, or a database of your choice. Here are some of the endpoints you can load:
- Record Management: POST records to topics, retrieve and manage message data within Kafka topics
- Connectors: POST to create and manage Kafka connectors for data integration and pipeline configuration
You will then debug the IBM Event Streams pipeline using our Pipeline Dashboard tool to ensure it is copying the data correctly, before building a Notebook to explore your data and build reports.
Setup & steps to follow
💡 Before getting started, let's make sure Cursor is set up correctly:
- We suggest using a model like Claude 3.7 Sonnet or better
- Index the REST API Source tutorial: https://dlthub.com/docs/dlt-ecosystem/verified-sources/rest_api/ and add it to context as @dlt rest api
- Read our full steps on setting up Cursor
Now you're ready to get started!
⚙️ Set up dlt Workspace
Install dlt with duckdb support:
    pip install "dlt[workspace]"
Initialize a dlt pipeline with IBM Event Streams support:
    dlt init dlthub:ibm_event_streams duckdb
The init command will set up the necessary files and folders for the next step.
🤠 Start LLM-assisted coding
Here’s a prompt to get you started:
Prompt
Please generate a REST API Source for IBM Event Streams API, as specified in @ibm_event_streams-docs.yaml
Start with endpoint(s) topics and connectors and skip incremental loading for now.
Place the code in ibm_event_streams_pipeline.py and name the pipeline ibm_event_streams_pipeline.
If the file exists, use it as a starting point.
Do not add or modify any other files.
Use @dlt rest api as a tutorial.
After adding the endpoints, allow the user to run the pipeline with python ibm_event_streams_pipeline.py and await further instructions.
🔒 Set up credentials
The Producer API supports two authentication methods. For Basic authentication, include an Authorization header with the value "Basic <auth_token>", where auth_token is a previously generated Basic authentication token. For SCRAM authentication, use HTTP Basic auth by passing the SCRAM username and password via the -u flag (e.g., -u <username>:<password>), which automatically sets the Authorization header. Both methods work with HTTPS endpoints; use HTTP if TLS is disabled on the endpoint.
To get the appropriate API keys, please visit the original source at ibm.github.io. If you want to protect your environment secrets in a production environment, look into setting up credentials with dlt.
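The two header styles described above can be sketched with the standard library alone. This is an illustration, not IBM's or dlt's implementation: the function names are hypothetical, and the example assumes the usual HTTP Basic scheme, where the -u flag base64-encodes `username:password` into the Authorization header.

```python
import base64


def header_from_token(auth_token: str) -> dict[str, str]:
    """Build the header from a previously generated Basic auth token."""
    return {"Authorization": f"Basic {auth_token}"}


def header_from_scram(username: str, password: str) -> dict[str, str]:
    """Build the header from SCRAM credentials, as the -u flag would."""
    # HTTP Basic auth encodes "username:password" as base64.
    raw = f"{username}:{password}".encode("utf-8")
    return header_from_token(base64.b64encode(raw).decode("ascii"))


# Placeholder credentials for illustration only.
print(header_from_scram("user", "pass"))
```

In a real pipeline you would read the username and password from dlt's secrets or from environment variables rather than hard-coding them.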
🏃‍♀️ Run the pipeline in the Python terminal in Cursor
    python ibm_event_streams_pipeline.py
If your pipeline runs correctly, you’ll see something like the following:
    Pipeline ibm_event_streams load step completed in 0.26 seconds
    1 load package(s) were loaded to destination duckdb and into dataset ibm_event_streams_data
    The duckdb destination used duckdb:/ibm_event_streams.duckdb location to store data
    Load package 1749667187.541553 is LOADED and contains no failed jobs
📈 Debug your pipeline and data with the Pipeline Dashboard
Now that you have a running pipeline, you need to make sure it’s correct, so you do not introduce silent failures like misconfigured pagination or incremental loading errors. By launching the dlt Workspace Pipeline Dashboard, you can see various information about the pipeline to enable you to test it. Here you can see:
- Pipeline overview: State, load metrics
- Data’s schema: tables, columns, types, hints
- The data itself, which you can query
Launch the dashboard with:
    dlt pipeline ibm_event_streams_pipeline show
🐍 Build a Notebook with data explorations and reports
With the pipeline and data partially validated, you can continue with custom data explorations and reports. To get started, paste the snippet below into a new marimo Notebook and ask your LLM to go from there. Jupyter Notebooks and regular Python scripts are supported as well.
    import dlt

    data = dlt.pipeline("ibm_event_streams_pipeline").dataset()
    # get topics table as Pandas frame
    data.topics.df().head()