Pulsar Python API Docs | dltHub

Build a Pulsar-to-database pipeline in Python using dlt with AI Workbench support for Claude Code, Cursor, and Codex.

Pulsar exposes REST APIs for retrieving information from a cluster and performing administrative actions. In the Pulsar architecture, brokers handle messages and coordinate with BookKeeper, while the metadata store maintains cluster metadata. The REST API base URL is http://<broker-host>:8080 (default HTTP) or https://<broker-host>:8443 (TLS). When authentication is enabled, the broker admin REST and client APIs use token-based (JWT) authentication.

dlt is an open-source Python library that handles authentication, pagination, and schema evolution automatically. dltHub provides AI context files that enable code assistants to generate production-ready pipelines. Install with uv pip install "dlt[workspace]" and start loading Pulsar data in under 10 minutes.


What data can I load from Pulsar?

Here are some of the endpoints you can load from Pulsar:

| Resource | Endpoint | Method | Data selector | Description |
|---|---|---|---|---|
| tenants | admin/v2/tenants | GET | | Get the list of existing tenants (top-level JSON array of tenant names). |
| clusters | admin/v2/clusters | GET | | Get the list of all Pulsar clusters (top-level JSON array). |
| brokers | admin/v2/brokers/{cluster} | GET | | Get the list of active brokers (top-level JSON array of broker ids). |
| namespaces | admin/v2/namespaces/{tenant} | GET | | Get the list of namespaces for a tenant (top-level JSON array). |
| topics | admin/v2/persistent/{tenant}/{namespace} | GET | | Get the list of persistent topics under a namespace (top-level JSON array of topic names). |
| functions | admin/v3/functions/{tenant}/{namespace} | GET | | Get the list or details of Pulsar Functions in the namespace (response may be an array or an object depending on the endpoint). |
| sinks | admin/v3/sinks/{tenant}/{namespace} | GET | | Get the list of configured sinks in a namespace (typically a top-level array). |
| sources | admin/v3/sources/{tenant}/{namespace} | GET | | Get the list of configured sources in a namespace (typically a top-level array). |
| lookup | lookup/v2/topicLookup/{topic} | GET | | Look up a topic; returns a JSON object with owner broker and bundle info (an object, not an array). |
| bookies | admin/v2/bookies/all | GET | bookies | Get raw info for all bookies; the docs show the response under the key "bookies" (an object containing an array). |
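Several paths in the table contain placeholders such as {tenant} and {namespace} that have to be filled in before a request can be sent. The following is a minimal sketch of that substitution, not part of dlt or Pulsar: build_admin_url is a hypothetical helper, and "public" and "default" are example values only.

```python
from urllib.parse import quote

# Path templates copied from the resource table above.
ENDPOINTS = {
    "tenants": "admin/v2/tenants",
    "namespaces": "admin/v2/namespaces/{tenant}",
    "topics": "admin/v2/persistent/{tenant}/{namespace}",
}


def build_admin_url(base_url: str, resource: str, **params: str) -> str:
    """Fill the placeholders of one endpoint template and join it to the base URL.

    Hypothetical helper for illustration; it is not part of dlt or Pulsar.
    """
    # URL-encode each value so characters such as "/" cannot alter the path.
    safe = {key: quote(value, safe="") for key, value in params.items()}
    path = ENDPOINTS[resource].format(**safe)
    return f"{base_url.rstrip('/')}/{path}"


# "public" and "default" are example values; use your own tenant and namespace.
print(build_admin_url("http://localhost:8080", "topics", tenant="public", namespace="default"))
```

A missing placeholder value raises KeyError from str.format, which surfaces an incomplete path before any request is made.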

How do I authenticate with the Pulsar API?

Pulsar uses token (JWT) authentication via the AuthenticationToken plugin. For admin REST calls, use the broker's webServiceUrl (default http://localhost:8080) and supply the token either as the client authentication parameter or in the HTTP Authorization header when the broker is configured for token authentication. Use TLS in production so the token is not sent in clear text.
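To make the header option concrete, here is a small standard-library sketch that builds an admin request with the token as a bearer credential, without sending it. The function name admin_request and the localhost URL are assumptions for illustration; the token string is a placeholder.

```python
import urllib.request


def admin_request(base_url: str, path: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a GET request carrying the JWT as a bearer token."""
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    request = urllib.request.Request(url, method="GET")
    request.add_header("Authorization", f"Bearer {token}")
    return request


# Placeholder token; a real one comes from your Pulsar administrator.
req = admin_request("http://localhost:8080", "admin/v2/tenants", "your_jwt_token_here")
print(req.full_url)
# Sending it would look like: urllib.request.urlopen(req)
```

In a dlt pipeline you would not build this header yourself; the "bearer" auth type in the REST API client configuration does the equivalent.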

1. Get your credentials

  1. Ask your Pulsar administrator to generate a signed JWT for the principal (role) you will use.
  2. The administrator creates and signs the token with the configured secret-key or public-key method and shares the compact JWT string.
  3. Store the token in your secrets.toml as the token value for the Pulsar source. (See the Pulsar docs on enabling AuthenticationProviderToken and setting brokerClientAuthenticationParameters to configure tokens.)
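Before storing a token, it can help to confirm which principal it was issued for. The sketch below decodes the payload of a compact JWT with the standard library only. It does not verify the signature, so treat it as an inspection aid, not a security check. It assumes the role is carried in the sub claim, which is Pulsar's usual default but may differ in your cluster; jwt_claims and the demo token are hypothetical.

```python
import base64
import json


def jwt_claims(token: str) -> dict:
    """Decode the payload of a compact JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a compact JWT with three dot-separated parts")
    payload = parts[1]
    # Base64url payloads omit padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def _b64url(data: dict) -> str:
    """Encode a dict as unpadded base64url JSON (used only to build the demo token)."""
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Demo token with a fake signature; "dlt-loader" is an invented role name.
demo_token = ".".join([_b64url({"alg": "HS256"}), _b64url({"sub": "dlt-loader"}), "signature"])
print(jwt_claims(demo_token)["sub"])
```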

2. Add them to .dlt/secrets.toml

[sources.pulsar_source]
# JWT token used for Pulsar authentication
token = "your_jwt_token_here"

dlt reads this automatically at runtime — never hardcode tokens in your pipeline script. For production environments, see setting up credentials with dlt for environment variable and vault-based options.
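For the environment variable option, dlt looks up a secret by upper-casing the sections of its key path and joining them with double underscores. A minimal sketch of that naming rule follows; dlt_env_name is a hypothetical helper written for illustration, since dlt performs the lookup internally.

```python
import os


def dlt_env_name(*sections: str) -> str:
    """Map a secrets.toml key path to dlt's environment variable naming.

    Hypothetical helper; dlt performs this lookup internally.
    """
    return "__".join(part.upper() for part in sections)


name = dlt_env_name("sources", "pulsar_source", "token")
print(name)  # SOURCES__PULSAR_SOURCE__TOKEN

# Setting the variable before the pipeline starts supplies the token
# without a secrets.toml entry (placeholder value shown).
os.environ[name] = "your_jwt_token_here"
```

In practice the variable would be set by your shell, CI system, or orchestrator instead of inside the script.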


How do I set up and run the pipeline?

Set up a virtual environment and install dlt:

uv venv && source .venv/bin/activate
uv pip install "dlt[workspace]"

1. Install the dlt AI Workbench:

dlt ai init --agent <your-agent>  # <your-agent>: claude | cursor | codex

This installs project rules, a secrets management skill, and appropriate ignore files, and configures the dlt MCP server for your agent.

2. Install the rest-api-pipeline toolkit:

dlt ai toolkit rest-api-pipeline install

This loads the skills and dlt context that the agent uses to build the pipeline iteratively, efficiently, and safely. The agent uses MCP tools to inspect credentials, so it never needs to read your secrets.toml directly.

3. Start LLM-assisted coding:

Use /find-source to load data from the Pulsar API into DuckDB.

The rest-api-pipeline toolkit takes over from here — it reads relevant API documentation, presents you with options for which endpoints to load, and follows a structured workflow to scaffold, debug, and validate the pipeline step by step.

4. Run the pipeline:

python pulsar_pipeline.py

If everything is configured correctly, you'll see output like this:

Pipeline pulsar_pipeline load step completed in 0.26 seconds
1 load package(s) were loaded to destination duckdb and into dataset pulsar_data
The duckdb destination used duckdb:/pulsar.duckdb location to store data
Load package 1749667187.541553 is LOADED and contains no failed jobs

Inspect your pipeline and data:

dlt pipeline pulsar_pipeline show

This opens the Pipeline Dashboard where you can verify pipeline state, load metrics, schema (tables, columns, types), and query the loaded data directly.


Python pipeline example

This example loads tenants and topics from the Pulsar API into DuckDB. It mirrors the endpoint and data selector configuration from the table above:

import dlt
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources


@dlt.source
def pulsar_source(token=dlt.secrets.value):
    config: RESTAPIConfig = {
        "client": {
            # Replace <broker-host> with your broker; use https://<broker-host>:8443/ for TLS.
            "base_url": "http://<broker-host>:8080/",
            "auth": {
                "type": "bearer",
                "token": token,
            },
        },
        "resources": [
            {"name": "tenants", "endpoint": {"path": "admin/v2/tenants"}},
            # {tenant}/{namespace} filled with example values (assumed); substitute your own.
            {"name": "topics", "endpoint": {"path": "admin/v2/persistent/public/default"}},
        ],
    }
    yield from rest_api_resources(config)


def get_data() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="pulsar_pipeline",
        destination="duckdb",
        dataset_name="pulsar_data",
    )
    load_info = pipeline.run(pulsar_source())
    print(load_info)


if __name__ == "__main__":
    get_data()

To add more endpoints, append entries from the resource table to the "resources" list using the same name and path pattern, and add a data_selector where the table lists one (for example, bookies).
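As an illustration of that pattern, the sketch below appends the bookies endpoint, the only row in the table with a data selector. It uses plain dictionaries in the same shape as the "resources" entries, so it runs without dlt installed. The variable names are illustrative only.

```python
# Resource entries in the shape used by the "resources" list of the REST API config.
resources = [
    {"name": "tenants", "endpoint": {"path": "admin/v2/tenants"}},
]

# The table lists "bookies" as the data selector for this endpoint,
# because the records sit under that key instead of at the top level.
bookies = {
    "name": "bookies",
    "endpoint": {
        "path": "admin/v2/bookies/all",
        "data_selector": "bookies",
    },
}
resources.append(bookies)

print([resource["name"] for resource in resources])
```

Endpoints that return a top-level array, such as tenants or clusters, need no data_selector.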


How do I query the loaded data?

Once the pipeline runs, dlt creates one table per resource. You can query with Python or SQL.

Python (pandas DataFrame):

import dlt

data = dlt.pipeline("pulsar_pipeline").dataset()
tenants_df = data.tenants.df()
print(tenants_df.head())

SQL (DuckDB example):

SELECT * FROM pulsar_data.tenants LIMIT 10;

In a marimo or Jupyter notebook:

import dlt

data = dlt.pipeline("pulsar_pipeline").dataset()
data.tenants.df().head()

See how to explore your data in marimo Notebooks and how to query your data in Python with dataset.


What destinations can I load Pulsar data to?

dlt supports loading into any of these destinations — only the destination parameter changes:

| Destination | Example value |
|---|---|
| DuckDB (local, default) | "duckdb" |
| PostgreSQL | "postgres" |
| BigQuery | "bigquery" |
| Snowflake | "snowflake" |
| Redshift | "redshift" |
| Databricks | "databricks" |
| Filesystem (S3, GCS, Azure) | "filesystem" |

Change the destination in dlt.pipeline(destination="snowflake") and add credentials in .dlt/secrets.toml. See the full destinations list.


Next steps

Continue your data engineering journey with the other toolkits of the dltHub AI Workbench:

  • data-exploration — Build custom notebooks, charts, and dashboards for deeper analysis with marimo notebooks.
  • dlthub-runtime — Deploy, schedule, and monitor your pipeline in production.

dlt ai toolkit data-exploration install
dlt ai toolkit dlthub-runtime install
