Elasticsearch Python API Docs | dltHub

Build an Elasticsearch-to-database pipeline in Python using dlt with AI Workbench support for Claude Code, Cursor, and Codex.

The Elasticsearch REST API allows retrieval of index recovery information, showing ongoing and completed shard recoveries for specified indices. This API is part of the broader Elasticsearch REST APIs documentation. The documentation is licensed under Apache 2.0. The REST API base URL is http://localhost:9200. The API supports HTTP basic auth and API keys or Bearer tokens, depending on deployment; local clusters may be unauthenticated.

dlt is an open-source Python library that handles authentication, pagination, and schema evolution automatically. dltHub provides AI context files that enable code assistants to generate production-ready pipelines. Install with uv pip install "dlt[workspace]" and start loading Elasticsearch data in under 10 minutes.


What data can I load from Elasticsearch?

Here are some of the endpoints you can load from Elasticsearch:

| Resource | Endpoint | Method | Data selector | Description |
| --- | --- | --- | --- | --- |
| cat_indices | _cat/indices?format=json | GET | (top-level array) | Returns the list of indices (use format=json to get a JSON array). |
| cat_recovery | _cat/recovery?format=json | GET | (top-level array) | Returns shard recovery information (ongoing/completed) as an array when format=json. |
| cluster_health | _cluster/health | GET | (object) | Cluster health summary. |
| nodes_stats | _nodes/stats | GET | nodes | Returns node statistics under the "nodes" key. |
| search | _search or {index}/_search | GET | hits.hits | Search returns matching documents inside the hits.hits array. |
| indices_get | {index} | GET | (object keyed by index name) | Returns index metadata/mappings/settings keyed by index name. |
| indices_recovery | _recovery or {index}/_recovery | GET | indices | Index recovery endpoint returns data under the "indices" key. |
| indices_stats | _stats or {index}/_stats | GET | indices | Index statistics under the "indices" key. |
| tasks_list | _tasks | GET | nodes | Returns task list information (nodes object with tasks). |
| count | _count or {index}/_count | GET | (object with count) | Returns the count as the numeric field "count" in the response object. |
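To make the Data selector column concrete, here is a minimal sketch of how a dotted selector such as hits.hits picks the record list out of a response body. The select_records helper and the sample payload are hypothetical and written only for illustration; dlt resolves selectors as JSONPath expressions, which this simplified version does not implement.

```python
from typing import Any, List, Optional


def select_records(payload: Any, selector: Optional[str] = None) -> List[Any]:
    """Follow a dotted selector into a decoded JSON payload.

    Hypothetical helper for illustration only; it is not part of dlt.
    """
    node = payload
    if selector:
        for key in selector.split("."):
            node = node[key]
    # A list is returned as-is; a single object is wrapped as one record.
    return node if isinstance(node, list) else [node]


# Made-up sample shaped like a _search response.
search_response = {
    "hits": {
        "total": {"value": 2},
        "hits": [
            {"_id": "1", "_source": {"title": "first"}},
            {"_id": "2", "_source": {"title": "second"}},
        ],
    }
}

records = select_records(search_response, "hits.hits")
print([record["_id"] for record in records])
```

Rows marked "(top-level array)" or "(object)" correspond to calling the helper without a selector: the whole response body is used.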

How do I authenticate with the Elasticsearch API?

Elasticsearch supports HTTP Basic authentication (username/password) and API keys or Bearer tokens for secured clusters. Include the credentials in the Authorization header: Basic base64(user:pass), ApiKey base64(id:api_key), or Bearer followed by the token. For local unsecured clusters, no auth header is required.
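As a rough illustration of the header formats described above, the sketch below builds the Basic and ApiKey variants with the standard library. The function names and the placeholder credentials are assumptions made for this example; when you use dlt, the configured auth type builds the header for you.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Authorization header for HTTP Basic auth: base64 of 'user:pass'."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


def api_key_header(key_id: str, api_key: str) -> dict:
    """Authorization header for an Elasticsearch API key: base64 of 'id:api_key'."""
    token = base64.b64encode(f"{key_id}:{api_key}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"ApiKey {token}"}


# Placeholder values only; never commit real credentials.
print(basic_auth_header("elastic", "your_password_here"))
print(api_key_header("my_key_id", "my_api_key"))
```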

1. Get your credentials

  1. For Elastic Cloud: create an API key or obtain a username and password from the deployment console.
  2. For a self-managed secured cluster: create a user with a role in Kibana (Stack Management > Security > Users), or run elasticsearch-users to add a file-realm user.
  3. To create an API key: call POST /_security/api_key with basic auth and store the returned id and api_key (or use the Cloud console).

2. Add them to .dlt/secrets.toml

[sources.elasticsearch_source]
username = "elastic"
password = "your_password_here"
# or for API key authentication
api_key = "id:api_key"

dlt reads this automatically at runtime — never hardcode tokens in your pipeline script. For production environments, see setting up credentials with dlt for environment variable and vault-based options.
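For the environment variable option, dlt looks up the same keys under upper-cased names with the sections joined by double underscores. The sketch below derives those names from the secrets.toml keys shown above; dlt_env_name is a hypothetical helper written for this example, not a dlt function, and the values themselves should be set in your deployment environment rather than in code.

```python
def dlt_env_name(toml_key: str) -> str:
    """Map a dotted secrets.toml key to the matching environment variable name.

    Hypothetical helper for illustration; dlt performs this lookup internally.
    """
    return "__".join(part.upper() for part in toml_key.split("."))


for key in (
    "sources.elasticsearch_source.username",
    "sources.elasticsearch_source.password",
    "sources.elasticsearch_source.api_key",
):
    print(dlt_env_name(key))
```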


How do I set up and run the pipeline?

Set up a virtual environment and install dlt:

uv venv && source .venv/bin/activate
uv pip install "dlt[workspace]"

1. Install the dlt AI Workbench:

dlt ai init --agent <your-agent> # <agent>: claude | cursor | codex

This installs project rules, a secrets management skill, appropriate ignore files, and configures the dlt MCP server for your agent.

2. Install the rest-api-pipeline toolkit:

dlt ai toolkit rest-api-pipeline install

This loads the skills and context about dlt that the agent uses to build the pipeline iteratively, efficiently, and safely. The agent uses MCP tools to inspect credentials; it never needs to read your secrets.toml directly.

3. Start LLM-assisted coding:

Use /find-source to load data from the Elasticsearch API into DuckDB.

The rest-api-pipeline toolkit takes over from here — it reads relevant API documentation, presents you with options for which endpoints to load, and follows a structured workflow to scaffold, debug, and validate the pipeline step by step.

4. Run the pipeline:

python elasticsearch_pipeline.py

If everything is configured correctly, you'll see output like this:

Pipeline elasticsearch_pipeline load step completed in 0.26 seconds
1 load package(s) were loaded to destination duckdb and into dataset elasticsearch_data
The duckdb destination used duckdb:/elasticsearch.duckdb location to store data
Load package 1749667187.541553 is LOADED and contains no failed jobs

Inspect your pipeline and data:

dlt pipeline elasticsearch_pipeline show

This opens the Pipeline Dashboard where you can verify pipeline state, load metrics, schema (tables, columns, types), and query the loaded data directly.


Python pipeline example

This example loads search and cat_indices from the Elasticsearch API into DuckDB. It mirrors the endpoint and data selector configuration from the table above:

import dlt
from dlt.sources.rest_api import RESTAPIConfig, rest_api_resources


@dlt.source
def elasticsearch_source(username=dlt.secrets.value, password=dlt.secrets.value):
    config: RESTAPIConfig = {
        "client": {
            "base_url": "http://localhost:9200",
            # HTTP basic auth; both values are read from .dlt/secrets.toml
            "auth": {
                "type": "http_basic",
                "username": username,
                "password": password,
            },
        },
        "resources": [
            # Documents are nested under hits.hits in the search response
            {"name": "search", "endpoint": {"path": "_search", "data_selector": "hits.hits"}},
            # format=json makes the cat API return a top-level JSON array
            {"name": "cat_indices", "endpoint": {"path": "_cat/indices?format=json"}},
        ],
    }
    yield from rest_api_resources(config)


def get_data() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="elasticsearch_pipeline",
        destination="duckdb",
        dataset_name="elasticsearch_data",
    )
    load_info = pipeline.run(elasticsearch_source())
    print(load_info)


if __name__ == "__main__":
    get_data()

To add more endpoints, append entries from the resource table to the "resources" list using the same name, path, and data_selector pattern.
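As one possible way to follow that pattern, the sketch below builds additional resource entries from rows of the resource table. The make_resource helper is hypothetical and exists only to keep the entries uniform. Leaving out data_selector for rows listed as "(top-level array)" or "(object)" is an assumption of this sketch: it relies on the whole response body being used when no selector is given.

```python
from typing import Any, Dict, List, Optional


def make_resource(name: str, path: str, data_selector: Optional[str] = None) -> Dict[str, Any]:
    """Build one entry for the "resources" list (hypothetical helper)."""
    endpoint: Dict[str, Any] = {"path": path}
    if data_selector is not None:
        endpoint["data_selector"] = data_selector
    return {"name": name, "endpoint": endpoint}


# Names, paths, and selectors copied from the resource table.
extra_resources: List[Dict[str, Any]] = [
    make_resource("cat_recovery", "_cat/recovery?format=json"),
    make_resource("cluster_health", "_cluster/health"),
    make_resource("indices_stats", "_stats", "indices"),
]

for resource in extra_resources:
    print(resource)
```

These dictionaries can then be appended to the "resources" list inside the source function's config.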


How do I query the loaded data?

Once the pipeline runs, dlt creates one table per resource. You can query with Python or SQL.

Python (pandas DataFrame):

import dlt

# Attach to the pipeline by name and read the "search" table as a DataFrame
data = dlt.pipeline("elasticsearch_pipeline").dataset()
search_df = data.search.df()
print(search_df.head())

SQL (DuckDB example):

SELECT * FROM elasticsearch_data.search LIMIT 10;

In a marimo or Jupyter notebook:

import dlt

data = dlt.pipeline("elasticsearch_pipeline").dataset()
data.search.df().head()

See how to explore your data in marimo Notebooks and how to query your data in Python with dataset.


What destinations can I load Elasticsearch data to?

dlt supports loading into any of these destinations — only the destination parameter changes:

| Destination | Example value |
| --- | --- |
| DuckDB (local, default) | "duckdb" |
| PostgreSQL | "postgres" |
| BigQuery | "bigquery" |
| Snowflake | "snowflake" |
| Redshift | "redshift" |
| Databricks | "databricks" |
| Filesystem (S3, GCS, Azure) | "filesystem" |

Change the destination in dlt.pipeline(destination="snowflake") and add credentials in .dlt/secrets.toml. See the full destinations list.


Next steps

Continue your data engineering journey with the other toolkits of the dltHub AI Workbench:

  • data-exploration — Build custom notebooks, charts, and dashboards for deeper analysis with marimo notebooks.
  • dlthub-runtime — Deploy, schedule, and monitor your pipeline in production.

dlt ai toolkit data-exploration install
dlt ai toolkit dlthub-runtime install
