
Export Logfire Telemetry

info

The source code for this example can be found in our repository at: https://github.com/dlt-hub/dlt/tree/devel/docs/examples/logfire_telemetry_export

About this Example

Pydantic Logfire is an observability platform for agents and LLMs, created by the team behind Pydantic.

To enable analytics, reporting, and evaluations, this telemetry (traces, metrics, etc.) needs to be exported to a data lakehouse or warehouse. Pydantic Logfire provides convenient methods to export data via a Python SDK or HTTP requests (docs).

To avoid downloading the entire history of data on each export (telemetry quickly grows in size), users need incremental loading. This requires a stateful mechanism to track previously loaded data and adjust future exports to fetch only new records.
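The cursor-tracking idea can be sketched without dlt: persist the highest `created_at` value seen so far, and use it as a lower bound on the next run. The names and in-memory "dataset" below are purely illustrative; in the example itself, `dlt.sources.incremental()` performs this bookkeeping and persists the cursor in pipeline state.

```python
from datetime import datetime, timezone

# Illustrative stand-in for a growing telemetry table, keyed by "created_at".
ROWS = [
    {"created_at": datetime(2024, 1, d, tzinfo=timezone.utc), "value": d}
    for d in (1, 2, 3)
]


def extract(state: dict) -> list[dict]:
    """Return only rows newer than the stored cursor, then advance the cursor."""
    cursor = state.get("last_value", datetime(1970, 1, 1, tzinfo=timezone.utc))
    new_rows = [r for r in ROWS if r["created_at"] > cursor]
    if new_rows:
        state["last_value"] = max(r["created_at"] for r in new_rows)
    return new_rows


state = {}
first = extract(state)   # first run: everything after the initial value
second = extract(state)  # second run: nothing new, so nothing is fetched
```

On the first call all rows pass the cursor check; on the second call the cursor has advanced, so the extract returns an empty batch until new rows arrive.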

The dlt library makes this trivial. In this example:

  • Use dlt.sources.incremental() to do stateful, cursor-based extraction on a date column
  • Yield Arrow tables directly from an async resource for efficient columnar ingestion
  • Authenticate with the Logfire Read API using a secret token

NOTE. You will need a Pydantic Logfire read token. It can be obtained via the CLI or UI (guide).
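Because both resources declare `read_token=dlt.secrets.value`, dlt resolves the token from configuration rather than from code. A minimal sketch of `.dlt/secrets.toml` (the placeholder token and the exact section layout depend on your project and module name):

```toml
# .dlt/secrets.toml — a top-level key works as a fallback for all sources
read_token = "<your-logfire-read-token>"
```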

Full source code

from datetime import datetime
from zoneinfo import ZoneInfo

import dlt
from logfire.query_client import AsyncLogfireQueryClient


@dlt.resource
async def metrics(
    read_token=dlt.secrets.value,
    batch_size: int = 1000,
    min_timestamp=dlt.sources.incremental(
        "created_at",
        initial_value=datetime(1970, 1, 1, tzinfo=ZoneInfo("UTC")),
    ),
):
"""Fetches rows from the Logfire `metrics` table incrementally.

`metrics` contains pre-aggregated numerical data. It is more efficient than
querying `records` for time-series aggregations but has no dedicated Logfire UI.

Args:
read_token (str): Logfire read API token, loaded from `secrets.toml`.
batch_size (int): Maximum number of rows returned per query.
min_timestamp: Incremental cursor on `created_at`; only rows newer than
the last pipeline run are fetched.

Yields:
pyarrow.Table: A batch of metric rows as an Arrow table.
"""
    async with AsyncLogfireQueryClient(read_token=read_token) as client:
        # query_arrow is a coroutine; await it so the Arrow table is yielded,
        # not the coroutine object itself.
        yield await client.query_arrow(
            sql=f"SELECT * FROM metrics LIMIT {batch_size}",
            min_timestamp=min_timestamp.start_value,
        )


@dlt.resource
async def records(
    read_token=dlt.secrets.value,
    batch_size: int = 1000,
    min_timestamp=dlt.sources.incremental(
        "created_at",
        initial_value=datetime(1970, 1, 1, tzinfo=ZoneInfo("UTC")),
    ),
):
"""Fetches rows from the Logfire `records` table incrementally.

Each row is a span or a log (a span with no duration). Spans sharing the same
`trace_id` form a trace, structured as a tree. This is the primary table shown
in the Logfire Live View and the one most queries should target.

Note: `records` excludes pending spans. The full table including pending spans
is `records_all`, but it is not needed for most use cases.

Args:
read_token (str): Logfire read API token, loaded from `secrets.toml`.
batch_size (int): Maximum number of rows returned per query.
min_timestamp: Incremental cursor on `created_at`; only rows newer than
the last pipeline run are fetched.

Yields:
pyarrow.Table: A batch of span/log rows as an Arrow table.
"""
    async with AsyncLogfireQueryClient(read_token=read_token) as client:
        # Query the `records` table (not `metrics`) and await the Arrow result.
        yield await client.query_arrow(
            sql=f"SELECT * FROM records LIMIT {batch_size}",
            min_timestamp=min_timestamp.start_value,
        )


@dlt.source
def logfire_source():
    """Returns all Logfire resources (metrics and records) as a dlt source."""
    return [metrics, records]


if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="logfire",
        destination="duckdb",
    )
    # Call the source to instantiate it before passing it to run().
    pipeline.run(logfire_source())

