dlt.helpers.dashboard.utils
resolve_dashboard_config
def resolve_dashboard_config(p: dlt.Pipeline) -> DashboardConfiguration
Resolve the dashboard configuration
get_trace_file_path
def get_trace_file_path(pipeline_name: str, pipelines_dir: str) -> Path
Get the path to the pickle file for a pipeline
get_pipeline_last_run
def get_pipeline_last_run(pipeline_name: str, pipelines_dir: str) -> float
Get the last run of a pipeline
get_local_pipelines
def get_local_pipelines(
        pipelines_dir: str = None,
        sort_by_trace: bool = True,
        addtional_pipelines: List[str] = None
) -> Tuple[str, List[Dict[str, Any]]]
Get the local pipelines directory and the list of pipeline names in it.
Arguments:
- pipelines_dir (str, optional) - The local pipelines directory. Defaults to get_dlt_pipelines_dir().
- sort_by_trace (bool, optional) - Whether to sort the pipelines by the latest timestamp of trace. Defaults to True.
Returns:
Tuple[str, List[Dict[str, Any]]]: The local pipelines directory and the list of pipelines in it (one dict per pipeline).
get_pipeline
def get_pipeline(pipeline_name: str, pipelines_dir: str) -> dlt.Pipeline
Get a pipeline by name.
Arguments:
- pipeline_name (str) - The name of the pipeline to get.
Returns:
- dlt.Pipeline - The pipeline.
get_destination_config
def get_destination_config(
        pipeline: dlt.Pipeline) -> DestinationClientConfiguration
Get the destination config of a pipeline.
schemas_to_table_items
def schemas_to_table_items(schemas: Iterable[Schema],
                           default_schema_name: str) -> List[Dict[str, Any]]
Convert a list of schemas to a list of table items.
pipeline_details
def pipeline_details(c: DashboardConfiguration, pipeline: dlt.Pipeline,
                     pipelines_dir: str) -> List[Dict[str, Any]]
Get the details of a pipeline.
remote_state_details
def remote_state_details(pipeline: dlt.Pipeline) -> List[Dict[str, Any]]
Get the remote state details of a pipeline.
create_table_list
def create_table_list(c: DashboardConfiguration,
                      pipeline: dlt.Pipeline,
                      selected_schema_name: str = None,
                      show_internals: bool = False,
                      show_child_tables: bool = True,
                      show_row_counts: bool = False) -> List[Dict[str, Any]]
Create a list of tables for the pipeline.
Arguments:
- pipeline (dlt.Pipeline) - The pipeline to create the table list for.
create_column_list
def create_column_list(
        c: DashboardConfiguration,
        pipeline: dlt.Pipeline,
        table_name: str,
        selected_schema_name: str = None,
        show_internals: bool = False,
        show_type_hints: bool = True,
        show_other_hints: bool = False,
        show_custom_hints: bool = False) -> List[Dict[str, Any]]
Create a list of columns for a table.
Arguments:
- pipeline (dlt.Pipeline) - The pipeline to create the column list for.
- table_name (str) - The name of the table to create the column list for.
clear_query_cache
def clear_query_cache(pipeline: dlt.Pipeline) -> None
Clear the query cache and history
get_query_result
def get_query_result(pipeline: dlt.Pipeline,
                     query: str) -> Tuple[pd.DataFrame, str, str]
Get the result of a query. Parses the query to ensure it is a valid SQL query before sending it to the destination.
get_row_counts
def get_row_counts(pipeline: dlt.Pipeline,
                   selected_schema_name: str = None,
                   load_id: str = None) -> Dict[str, Any]
Get the row counts for a pipeline.
Arguments:
- pipeline (dlt.Pipeline) - The pipeline to get the row counts for.
- load_id (str) - The load id to get the row counts for.
get_row_counts_list
def get_row_counts_list(pipeline: dlt.Pipeline,
                        selected_schema_name: str = None,
                        load_id: str = None) -> List[Dict[str, Any]]
Get the row counts for a pipeline as a list.
get_loads
def get_loads(c: DashboardConfiguration,
              pipeline: dlt.Pipeline,
              limit: int = 100) -> Tuple[Any, str, str]
Get the loads of a pipeline.
get_schema_by_version
@functools.cache
def get_schema_by_version(pipeline: dlt.Pipeline, version_hash: str) -> Schema
Get the schema of a pipeline for a given version hash.
trace_overview
def trace_overview(c: DashboardConfiguration,
                   trace: PipelineTrace) -> List[Dict[str, Any]]
Get the overview of a trace.
trace_execution_context
def trace_execution_context(c: DashboardConfiguration,
                            trace: PipelineTrace) -> List[Dict[str, Any]]
Get the execution context of a trace.
trace_steps_overview
def trace_steps_overview(c: DashboardConfiguration,
                         trace: PipelineTrace) -> List[Dict[str, Any]]
Get the steps overview of a trace.
trace_resolved_config_values
def trace_resolved_config_values(c: DashboardConfiguration,
                                 trace: PipelineTrace) -> List[Dict[str, Any]]
Get the resolved config values of a trace.
trace_step_details
def trace_step_details(c: DashboardConfiguration, trace: PipelineTrace,
                       step_id: str) -> List[Any]
Get the details of a step.
style_cell
def style_cell(row_id: str, name: str, __: Any) -> Dict[str, str]
Style a cell in a table.
Arguments:
- row_id (str) - The id of the row.
- name (str) - The name of the column.
- __ (Any) - The value of the cell.
Returns:
Dict[str, str]: The css style of the cell.
open_local_folder
def open_local_folder(folder: str) -> None
Open a folder in the file explorer
get_local_data_path
def get_local_data_path(pipeline: dlt.Pipeline) -> str
Get the local data path of a pipeline
build_pipeline_link_list
def build_pipeline_link_list(config: DashboardConfiguration,
                             pipelines: List[Dict[str, Any]]) -> str
Build a list of links to the pipeline.
sanitize_trace_for_display
def sanitize_trace_for_display(trace: PipelineTrace) -> Dict[str, Any]
Sanitize a trace for display by cleaning up non-primitive keys (we use tuples as keys in nested hints)
build_exception_section
def build_exception_section(p: dlt.Pipeline) -> List[Any]
Build an exception section for a pipeline