dlt._workspace.deployment.interval
Interval computation and run-based upstream freshness checks.
next_scheduled_run
def next_scheduled_run(
trigger: TTrigger,
now_reference: datetime,
tz: str = "UTC",
prev_scheduled_run: Optional[datetime] = None) -> datetime
Compute the next scheduled run for a timed trigger.
Returns the UTC datetime when the job should next run.
Arguments:
- trigger - A schedule:, every:, or once: trigger.
- now_reference - UTC reference time.
- tz - IANA timezone for cron evaluation.
- prev_scheduled_run - When the previous run was scheduled (for every:).
Raises:
- InvalidTrigger - If trigger is not a timed type.
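The tz argument matters because a tick is defined in local wall-clock time, while the result is reported in UTC. The sketch below illustrates only that conversion and uses the standard library alone. The helper next_daily_tick_utc and its fixed "daily at a given hour" rule are hypothetical stand-ins for a real cron expression, not part of dlt.

```python
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo


def next_daily_tick_utc(now_utc: datetime, hour: int, tz: str = "UTC") -> datetime:
    """Hypothetical helper: next 'daily at <hour>:00 local' tick, returned in UTC."""
    zone = ZoneInfo(tz)
    local_now = now_utc.astimezone(zone)
    # Today's tick in local wall-clock time.
    candidate = datetime.combine(local_now.date(), time(hour), tzinfo=zone)
    if candidate <= local_now:
        # Today's tick has already passed, so take tomorrow's.
        candidate = datetime.combine(
            local_now.date() + timedelta(days=1), time(hour), tzinfo=zone
        )
    return candidate.astimezone(timezone.utc)


# The same local schedule maps to different UTC hours in winter and summer.
winter = next_daily_tick_utc(
    datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc), 9, "Europe/Berlin"
)
summer = next_daily_tick_utc(
    datetime(2024, 7, 15, 12, 0, tzinfo=timezone.utc), 9, "Europe/Berlin"
)
```

Here winter falls at 08:00 UTC and summer at 07:00 UTC on the following day, which is why evaluating the schedule directly in UTC would drift by an hour across DST changes.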
compute_run_interval
def compute_run_interval(trigger: TTrigger,
now: datetime,
prev_interval_end: Optional[datetime],
tz: str = "UTC") -> TTimeInterval
Half-open [start, end) interval for a non-interval job run.
schedule: and every: triggers carry continuity: prev_interval_end
extends start backward to fill gaps (missed ticks, refresh cascade).
All other trigger types return a point-in-time interval regardless of
prev_interval_end — they model one-shot / event dispatches whose
work-window has no meaningful "since last run" semantic.
- schedule:<cron> → most recently ELAPSED cron interval: [cron_prev(cron_floor(now)), cron_floor(now)). prev_interval_end (if set) overrides start. In steady state those match.
- every:<period> → [prev_interval_end, now) if set, else [now - period, now).
- once:<datetime> → always [once, once).
- manual: / http: / webhook: / tag: / deployment: / job.success: / job.fail: / pipeline_name: → always [now, now).
Arguments:
- trigger - Any normalized trigger string.
- now - Reference upper bound (typically the dispatch / started_at time).
- prev_interval_end - Last successful work-window end, or None. Only applies to schedule: and every: triggers.
- tz - IANA timezone for cron evaluation. Used only for schedule:.
Returns:
- TTimeInterval - Half-open [start, end) tuple, both UTC stdlib datetime.
Raises:
- InvalidTrigger - If trigger cannot be parsed.
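The per-trigger rules for every:, once:, and event-style triggers can be sketched with the standard library alone. This is an illustration under stated assumptions, not the dlt implementation: sketch_run_interval is a hypothetical name, and it takes an already-parsed kind, period, and once_at instead of a trigger string, because the trigger string grammar is not described here. The schedule: case is left out because it needs a cron library.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

Interval = Tuple[datetime, datetime]  # half-open [start, end)


def sketch_run_interval(
    kind: str,
    now: datetime,
    prev_interval_end: Optional[datetime] = None,
    period: Optional[timedelta] = None,
    once_at: Optional[datetime] = None,
) -> Interval:
    """Hypothetical illustration of the documented rules (schedule: omitted)."""
    if kind == "every":
        if period is None:
            raise ValueError("every: needs a period")
        # Continuity: resume from the last work-window end when there is one.
        start = prev_interval_end if prev_interval_end is not None else now - period
        return (start, now)
    if kind == "once":
        if once_at is None:
            raise ValueError("once: needs a datetime")
        return (once_at, once_at)
    # manual:, http:, webhook:, ... are point-in-time; prev_interval_end is ignored.
    return (now, now)


utc = timezone.utc
now = datetime(2024, 5, 1, 12, 0, tzinfo=utc)
hour = timedelta(hours=1)

first_run = sketch_run_interval("every", now, None, period=hour)
gap_fill = sketch_run_interval(
    "every", now, datetime(2024, 5, 1, 9, 0, tzinfo=utc), period=hour
)
manual = sketch_run_interval("manual", now, datetime(2024, 5, 1, 9, 0, tzinfo=utc))
```

In this sketch first_run spans 11:00 to 12:00, gap_fill stretches back to 09:00 to cover the missed ticks, and manual collapses to the empty interval at now even though a previous end was supplied.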
resolve_interval_spec
def resolve_interval_spec(spec: TIntervalSpec,
cron_expr: str,
tz: str = "UTC") -> TTimeInterval
Resolve a TIntervalSpec into a concrete TTimeInterval in UTC.
start is required and snapped backward to the latest cron tick <= start.
end defaults to now, also snapped backward. Cron ticks are evaluated in
tz so DST-sensitive expressions work correctly; the returned datetimes
are always UTC.
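A minimal sketch of the snapping behaviour, assuming a spec shaped like a mapping with a required "start" and an optional "end" (the real TIntervalSpec layout is not shown here). The name sketch_resolve is hypothetical, and a fixed "top of every hour" rule stands in for real cron parsing.

```python
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple
from zoneinfo import ZoneInfo


def floor_to_hour(local_dt: datetime) -> datetime:
    """Stand-in for a cron floor: latest top-of-hour tick <= local_dt."""
    return local_dt.replace(minute=0, second=0, microsecond=0)


def sketch_resolve(
    spec: Dict[str, datetime],
    tz: str = "UTC",
    now: Optional[datetime] = None,
) -> Tuple[datetime, datetime]:
    """Hypothetical illustration: snap both bounds backward in tz, return UTC."""
    if "start" not in spec:
        raise ValueError("start is required")
    zone = ZoneInfo(tz)
    # end falls back to the supplied now, then to the current time.
    end = spec.get("end") or now or datetime.now(timezone.utc)
    # Evaluate ticks in local time, then convert the results back to UTC.
    start_utc = floor_to_hour(spec["start"].astimezone(zone)).astimezone(timezone.utc)
    end_utc = floor_to_hour(end.astimezone(zone)).astimezone(timezone.utc)
    return (start_utc, end_utc)


spec = {
    "start": datetime(2024, 5, 1, 10, 17, tzinfo=timezone.utc),
    "end": datetime(2024, 5, 1, 12, 45, tzinfo=timezone.utc),
}
in_utc = sketch_resolve(spec, "UTC")
in_kolkata = sketch_resolve(spec, "Asia/Kolkata")
```

With tz="UTC" the bounds snap to 10:00 and 12:00. With tz="Asia/Kolkata" (UTC+05:30) the local top-of-hour ticks land on 09:30 and 12:30 UTC, which shows why the timezone used for tick evaluation changes the result even though the output is always UTC.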
cron_floor
def cron_floor(cron_expr: str, dt: datetime) -> datetime
Latest cron tick <= dt, preserving dt's timezone.
Iterates cron in naive local time (stripping dt's tzinfo internally) to
get clean wall-clock semantics across DST transitions, then re-attaches
dt.tzinfo to the result.
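The strip-and-reattach pattern described above can be sketched as follows. The names are hypothetical, and a fixed quarter-hour rule stands in for a real cron iterator; only the handling of tzinfo is the point of the example.

```python
from datetime import datetime, timedelta, timezone


def floor_quarter_hour_naive(naive: datetime) -> datetime:
    """Stand-in for cron iteration: latest */15 tick <= naive (wall-clock time)."""
    return naive.replace(
        minute=naive.minute - naive.minute % 15, second=0, microsecond=0
    )


def sketch_floor(dt: datetime) -> datetime:
    """Hypothetical illustration of flooring in naive local time."""
    naive = dt.replace(tzinfo=None)          # drop tzinfo, keep wall-clock fields
    tick = floor_quarter_hour_naive(naive)   # floor on plain wall-clock values
    return tick.replace(tzinfo=dt.tzinfo)    # re-attach the caller's timezone


cet = timezone(timedelta(hours=1))
dt = datetime(2024, 3, 10, 8, 52, 30, tzinfo=cet)
floored = sketch_floor(dt)
```

Here floored is 08:45 in the same fixed-offset zone as the input, and flooring a value that already sits on a tick returns it unchanged, matching the "<= dt" contract.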
is_cron_expression
def is_cron_expression(s: str) -> bool
Check if string is a valid cron expression.
check_all_upstream_run_fresh
def check_all_upstream_run_fresh(
freshness_constraints: List[TFreshnessConstraint],
all_jobs: Dict[str, TJobDefinition],
prev_interval_ends: Dict[str, Optional[datetime]],
now_utc: Optional[datetime] = None) -> Tuple[bool, List[str]]
Check run-based freshness constraints against each upstream's last-completed
interval_end.
prev_interval_ends[upstream_ref] is the interval_end of the upstream's last
successful run (None if it has never completed or its freshness state was
reset). Dispatches per-upstream based on the upstream's default_trigger type:
- schedule: without interval → interval_end covers the most recently elapsed cron tick
- every: → interval_end is within the previous period
- event/manual → interval_end is set (any completion is enough)
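The per-upstream dispatch can be sketched as below. Several points are assumptions rather than documented behaviour: "covers the most recently elapsed cron tick" is read as interval_end >= floor(now), "within the previous period" as interval_end >= now - period, and the returned list is taken to hold the stale upstream references. All names are hypothetical, and an hourly floor stands in for a real cron expression.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple


def floor_to_hour(dt: datetime) -> datetime:
    """Stand-in for a cron floor on an hourly schedule."""
    return dt.replace(minute=0, second=0, microsecond=0)


def sketch_is_fresh(
    kind: str,
    interval_end: Optional[datetime],
    now: datetime,
    period: Optional[timedelta] = None,
) -> bool:
    if interval_end is None:
        return False  # never completed, or freshness state was reset
    if kind == "schedule":
        return interval_end >= floor_to_hour(now)  # assumed reading of "covers"
    if kind == "every":
        return period is not None and interval_end >= now - period
    return True  # event/manual: any completion is enough


def sketch_check_all(
    upstreams: Dict[str, Tuple[str, Optional[timedelta]]],
    prev_interval_ends: Dict[str, Optional[datetime]],
    now: datetime,
) -> Tuple[bool, List[str]]:
    stale = [
        ref
        for ref, (kind, period) in upstreams.items()
        if not sketch_is_fresh(kind, prev_interval_ends.get(ref), now, period)
    ]
    return (not stale, stale)


utc = timezone.utc
now = datetime(2024, 5, 1, 12, 20, tzinfo=utc)
upstreams = {
    "hourly_load": ("schedule", None),
    "poller": ("every", timedelta(minutes=30)),
    "webhook_job": ("event", None),
    "never_ran": ("manual", None),
}
ends = {
    "hourly_load": datetime(2024, 5, 1, 12, 0, tzinfo=utc),
    "poller": datetime(2024, 5, 1, 11, 30, tzinfo=utc),
    "webhook_job": datetime(2024, 4, 1, 0, 0, tzinfo=utc),
    "never_ran": None,
}
result = sketch_check_all(upstreams, ends, now)
```

In this example hourly_load and webhook_job pass, while poller (last end 50 minutes ago against a 30-minute period) and never_ran are reported as stale.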