dlt._workspace.deployment._interval_store_freshness
Interval-store and matching-intervals freshness checks.
sort_and_coalesce
def sort_and_coalesce(
intervals: Sequence[TTimeInterval]) -> List[TTimeInterval]
Sort intervals by start and merge adjacent/overlapping into contiguous ranges.
iter_intervals
def iter_intervals(cron_expr: str,
overall: TTimeInterval,
tz: str = "UTC") -> Iterator[TTimeInterval]
Yield discrete [tick_n, tick_n+1) intervals from cron within overall range.
Cron is evaluated in tz (so DST-sensitive expressions tick on local walls),
but yielded datetimes are always UTC. overall may be in any timezone;
it is converted to tz for iteration.
get_eligible_intervals
def get_eligible_intervals(cron_expr: str,
overall: TTimeInterval,
completed: Sequence[TTimeInterval],
tz: str = "UTC") -> List[TTimeInterval]
Return all incomplete intervals within overall, earliest first.
Arguments:
cron_expr- Cron expression defining interval boundaries.overall- The full time range to consider.completed- Sorted, coalesced list of completed intervals.tz- IANA timezone for cron evaluation. Yielded intervals are UTC.
next_eligible_interval
def next_eligible_interval(cron_expr: str,
overall: TTimeInterval,
completed: Sequence[TTimeInterval],
tz: str = "UTC") -> Optional[TTimeInterval]
First incomplete interval, or None if all done.
Arguments:
cron_expr- Cron expression defining interval boundaries.overall- The full time range to consider.completed- Sorted, coalesced list of completed intervals.tz- IANA timezone for cron evaluation. Returned interval is UTC.
TIntervalFreshnessCheck Objects
class TIntervalFreshnessCheck(NamedTuple)
A resolved interval completion query for freshness checking.
effective_interval
The interval to check completion for.
reason_if_not_completed
Reason string to use if the interval is not completed.
resolve_interval_freshness_checks
def resolve_interval_freshness_checks(
downstream_interval: TTimeInterval, downstream_overall: TTimeInterval,
freshness_constraints: List[TFreshnessConstraint],
all_jobs: Dict[str, TJobDefinition]
) -> Tuple[List[TIntervalFreshnessCheck], List[str]]
Resolve interval freshness constraints into completion queries.
Returns:
checks- Completion queries to run against the interval store.immediate_reasons- Errors that don't need a store lookup.
check_all_upstream_interval_fresh
def check_all_upstream_interval_fresh(
checks: List[TIntervalFreshnessCheck],
completions: Dict[Tuple[str, TTimeInterval], bool],
immediate_reasons: Optional[List[str]] = None
) -> Tuple[bool, List[str]]
Evaluate interval freshness given pre-fetched completion data.
Arguments:
checks- Fromresolve_interval_freshness_checks.completions- Maps(upstream_ref, effective_interval)tois_completed.immediate_reasons- Pre-computed error reasons (passed through).
resolve_run_freshness_refs
def resolve_run_freshness_refs(
freshness_constraints: List[TFreshnessConstraint],
all_jobs: Dict[str, TJobDefinition]) -> Tuple[List[str], List[str]]
Resolve which upstream refs need last-run data for run-based freshness.
Returns (refs_to_query, immediate_reasons). immediate_reasons are errors that don't need a run lookup (interactive upstream, wrong constraint type).