dlt._workspace.helpers.dashboard.utils.queries
SQL query execution, row counts, and load history retrieval.
clear_query_cache
def clear_query_cache() -> None
Clear the cached query results.
update_query_history
def update_query_history(current_history: Dict[str, int], query: str,
row_count: int) -> Dict[str, int]
Update query history dict, placing most recent query first.
get_default_query_for_table
def get_default_query_for_table(pipeline: dlt.Pipeline, schema_name: str,
table_name: str,
limit: bool) -> Tuple[str, str, str]
Build a default SELECT query for a table. Returns (sql_query, error_message, traceback).
get_example_query_for_dataset
def get_example_query_for_dataset(pipeline: dlt.Pipeline,
schema_name: str) -> Tuple[str, str, str]
Return an example query for the first data table in the schema.
Returns (sql_query, error_message, traceback).
get_query_result
def get_query_result(pipeline: dlt.Pipeline,
query: str) -> Tuple[pyarrow.Table, str, str]
Get the result of a query.
Parses the query to ensure it is valid SQL before sending it to the destination.
get_row_counts
def get_row_counts(pipeline: dlt.Pipeline,
selected_schema_name: str = None,
load_id: str = None) -> Dict[str, int]
Get the row counts for a pipeline as a dict of {table_name: count}.
get_row_counts_list
def get_row_counts_list(
pipeline: dlt.Pipeline,
selected_schema_name: str = None,
load_id: str = None) -> List[Dict[str, Union[str, int]]]
Get the row counts for a pipeline as a list of {name, row_count} dicts.
get_loads
def get_loads(c: DashboardConfiguration,
pipeline: dlt.Pipeline,
limit: int = 100
) -> Tuple[List[TLoadItem], Optional[str], Optional[str]]
Get the loads of a pipeline. Returns (loads_list, error_message, traceback).
build_load_details
def build_load_details(pipeline: dlt.Pipeline,
load: TLoadItem) -> List[mo.Html]
Build the load detail widgets: row counts and schema version accordion.