dlt._workspace.mcp.tools.data_tools
list_pipelines
@with_mcp_tool_telemetry()
def list_pipelines() -> List[str]
List all dlt pipelines available in this workspace
get_workspace_info
@with_mcp_tool_telemetry()
def get_workspace_info() -> Dict[str, Any]
Get workspace info: name, directories, active profile, and config providers.
list_profiles
@with_mcp_tool_telemetry()
def list_profiles() -> List[Dict[str, Any]]
List all available workspace profiles with status flags (current, pinned, configured).
list_tables
@with_mcp_tool_telemetry()
def list_tables(pipeline_name: str) -> Dict[str, Any]
List all data tables for a pipeline.
get_table_schema
@with_mcp_tool_telemetry()
def get_table_schema(pipeline_name: str, table_name: str) -> Dict[str, Any]
Get table schema with column names, data types, and escaped sql_identifier fields.
get_table_create_sql
@with_mcp_tool_telemetry()
def get_table_create_sql(pipeline_name: str, table_name: str) -> str
Get CREATE TABLE DDL for the table in the destination's SQL dialect.
preview_table
@with_mcp_tool_telemetry()
def preview_table(
pipeline_name: str,
table_name: str,
output_format: Annotated[
TResultFormat,
Field(description="Output format: 'markdown' or 'jsonl'"),
] = "markdown"
) -> str
Get the first 10 rows from a table.
execute_sql_query
@with_mcp_tool_telemetry()
def execute_sql_query(
pipeline_name: str,
sql_select_query: Annotated[
str,
Field(
description="SQL SELECT query to execute (only SELECT is allowed)"
),
],
output_format: Annotated[
TResultFormat,
Field(description="Output format: 'markdown' or 'jsonl'"),
] = "markdown"
) -> str
Execute a read-only SQL query against the pipeline's destination dataset.
get_row_counts
@with_mcp_tool_telemetry()
def get_row_counts(
pipeline_name: str,
output_format: Annotated[
TResultFormat,
Field(description="Output format: 'markdown' or 'jsonl'"),
] = "markdown"
) -> str
Get row counts for all data tables in a pipeline.
export_schema
@with_mcp_tool_telemetry()
def export_schema(
pipeline_name: str,
schema_name: Optional[str] = None,
hide_columns: Annotated[
bool,
Field(description=
"Hide column details for better readability of large schemas"),
] = False,
output_format: Annotated[
TMcpSchemaFormat,
Field(description="Output format: 'mermaid', 'yaml', or 'dbml'"),
] = "mermaid",
save_to_file: Annotated[
Optional[str],
Field(description=(
"Save the schema to this file path instead of returning it."
" Use an absolute path (e.g. /home/user/schema.yaml).")),
] = None
) -> str
Export the pipeline schema as a diagram or structured dump.
get_local_pipeline_state
@with_mcp_tool_telemetry()
def get_local_pipeline_state(pipeline_name: str) -> Dict[str, Any]
Get pipeline state: incremental cursors, resource state, and source state.
pipeline_trace
@with_mcp_tool_telemetry()
def pipeline_trace(pipeline_name: str) -> Dict[str, Any]
Get the trace of the last pipeline run: timing, step outcomes, and errors.