Version: devel

Triggers and scheduling

A trigger declares when a job runs. Triggers are attached to a decorated job via the trigger= argument and are the source of truth for scheduling on the dltHub platform — there is no separate CLI for adding or removing schedules. Change the decorator, redeploy.

from dlt.hub import run
from dlt.hub.run import trigger

@run.pipeline("github_pipeline", trigger=trigger.every("5m"))
def load_commits():
    ...

This page covers all the trigger types and the related scheduling features.

Basic triggers

| Trigger | Meaning |
| --- | --- |
| trigger.every("5m") | Recurring interval ("5m", "6h", or seconds as a float) |
| trigger.schedule("0 * * * *") | Cron expression |
| trigger.once("2026-12-31T23:59:59Z") | One-shot at a timestamp |
| "*/5 * * * *" | Bare cron string, auto-detected |
| upstream_job.success | Follow-up: fires when an upstream job completes successfully |
| upstream_job.fail | Follow-up: fires when an upstream job fails |
| upstream_job.completed | Follow-up: fires on success or failure |
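
To make the notation in the table concrete, here is a minimal sketch of how an interval string such as "5m" or "6h" could be converted to seconds, and how a bare cron string could be told apart from one. Both `parse_every` and `looks_like_cron` are hypothetical helpers written for illustration. They are not part of dlt.hub.run, and the platform's own parsing rules (including the unit suffixes it accepts) may differ.

```python
import re

# Assumed unit suffixes; the table above only shows "m" and "h" explicitly.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_every(value):
    """Convert "5m" / "6h" / a plain number of seconds into float seconds."""
    if isinstance(value, (int, float)):
        return float(value)
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([smhd])", value.strip())
    if match is None:
        raise ValueError(f"unrecognised interval: {value!r}")
    amount, unit = match.groups()
    return float(amount) * _UNIT_SECONDS[unit]


def looks_like_cron(value):
    """Heuristic: a classic cron expression has five whitespace-separated fields."""
    return isinstance(value, str) and len(value.split()) == 5


print(parse_every("5m"))
print(looks_like_cron("*/5 * * * *"))
```

The five-field check is only a heuristic for this sketch; it does not validate the individual cron fields.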

Multiple triggers

A job can have any number of triggers. Pass a list and inspect run_context["trigger"] to discover which one fired:

from dlt.hub.run import TJobRunContext

@run.job(
    trigger=[
        trigger.schedule("0 * * * *"),
        upstream_ingest.success,
    ],
)
def transform(run_context: TJobRunContext):
    if run_context["trigger"] == "schedule":
        ...
    elif run_context["trigger"] == "followup":
        ...

TJobRunContext is a dict injected by the launcher. It carries run_id, trigger, refresh, and the scheduler-supplied interval_start / interval_end (see Scheduler-driven intervals below).
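
Because the run context is a plain dict, a job body can be exercised locally by passing a hand-built one. The sketch below uses a stand-in `FakeRunContext` type rather than the real TJobRunContext; the field types, the `run_id` value, and the return strings are assumptions made for illustration.

```python
from datetime import datetime, timezone
from typing import Optional, TypedDict


class FakeRunContext(TypedDict):
    """Local stand-in for TJobRunContext; field types are assumptions."""

    run_id: str
    trigger: str
    refresh: bool
    interval_start: Optional[datetime]
    interval_end: Optional[datetime]


def transform(run_context: FakeRunContext) -> str:
    # Branch on the trigger that fired, mirroring the job shown above.
    if run_context["trigger"] == "schedule":
        return "scheduled transform"
    elif run_context["trigger"] == "followup":
        return "transform after ingest"
    return "unknown trigger"


ctx: FakeRunContext = {
    "run_id": "local-test-1",  # made-up value
    "trigger": "followup",
    "refresh": False,
    "interval_start": datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc),
    "interval_end": datetime(2026, 1, 1, 1, 0, tzinfo=timezone.utc),
}
print(transform(ctx))
```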

Follow-up triggers

Every decorated job exposes .success, .fail, and .completed trigger properties. Use them to chain jobs into a dependency graph.

from dlt.hub.run import TJobRunContext

@run.pipeline("transform_pipeline", trigger=ingest_job.success)
def transform(run_context: TJobRunContext):
    ...

Follow-up triggers fire as soon as the upstream completes — no polling, no scheduler delay.
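
The relationship between the three follow-up properties can be pictured with a toy event model. This is only a sketch of the semantics described above, with a hypothetical `ToyGraph` class and made-up job names; it says nothing about how the platform implements dispatch.

```python
from collections import defaultdict

# Events raised by each outcome: "completed" fires on success or failure.
_EVENTS = {"success": ("success", "completed"), "fail": ("fail", "completed")}


class ToyGraph:
    """Toy registry mapping (upstream job, event) to downstream jobs."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def on(self, upstream, event, downstream):
        self._listeners[(upstream, event)].append(downstream)

    def finish(self, job, outcome):
        """Return the downstream jobs that would fire when `job` ends."""
        fired = []
        for event in _EVENTS[outcome]:
            fired.extend(self._listeners[(job, event)])
        return fired


graph = ToyGraph()
graph.on("ingest", "success", "transform")
graph.on("ingest", "fail", "alert")
graph.on("ingest", "completed", "audit")

print(graph.finish("ingest", "success"))
print(graph.finish("ingest", "fail"))
```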

Scheduler-driven intervals

For incremental pipelines, declare the overall time range with interval= and let the dltHub platform hand each run a [interval_start, interval_end] window:

@run.pipeline(
    my_pipeline,
    interval={"start": "2026-01-01T00:00:00Z"},
    trigger=trigger.schedule("*/3 * * * *"),
)
def daily_ingest(run_context: TJobRunContext):
    start = run_context["interval_start"]
    end = run_context["interval_end"]
    # pass start/end into your source so it is a pure function of inputs
    ...

Behaviour:

  • Each run gets the cron tick that just elapsed
  • Missed ticks are backfilled automatically — windows extend back continuously
  • On refresh, the dltHub platform resets the interval pointer to interval.start
  • Source code stays stateless — no cursor persistence, no state lookups
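
The backfill behaviour listed above can be illustrated with a small model: starting from an interval pointer, every fully elapsed tick yields one window, and consecutive windows share a boundary. The function `pending_windows` is hypothetical, and a fixed 3-minute step stands in for the cron expression "*/3 * * * *". The platform's actual scheduler logic is not shown on this page.

```python
from datetime import datetime, timedelta, timezone


def pending_windows(pointer, now, step):
    """List (start, end) windows from the pointer up to now.

    Only ticks that have fully elapsed produce a window.
    """
    windows = []
    start = pointer
    while start + step <= now:
        windows.append((start, start + step))
        start += step
    return windows


start = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc)
step = timedelta(minutes=3)  # stand-in for the cron "*/3 * * * *"
now = datetime(2026, 1, 1, 0, 10, tzinfo=timezone.utc)

windows = pending_windows(start, now, step)
for window_start, window_end in windows:
    print(window_start.isoformat(), "->", window_end.isoformat())
```

In this model, a refresh corresponds to calling `pending_windows` again with the pointer set back to the declared start.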

Freshness checks

freshness=[upstream.is_fresh] blocks a job until the upstream's most recent interval has fully completed:

@run.pipeline(
    "report_pipeline",
    trigger=trigger.schedule("0 * * * *"),
    freshness=[ingest_job.is_fresh],
)
def build_report(run_context: TJobRunContext):
    ...

Unlike a trigger, the job still runs on its own schedule — it just skips while upstream is mid-load. Use for transforms that must not observe partial data.
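
As a rough mental model of the skip behaviour, a freshness gate compares how far the upstream has finished loading against the end of its most recent interval. The helpers `is_fresh` and `on_tick` and their arguments are hypothetical names for this sketch; the platform's real check may track different state.

```python
from datetime import datetime, timezone


def is_fresh(latest_interval_end, completed_through):
    """True when upstream has finished loading its most recent interval."""
    return completed_through is not None and completed_through >= latest_interval_end


def on_tick(latest_interval_end, completed_through):
    # The job keeps its own schedule; a stale upstream only skips this tick.
    return "run" if is_fresh(latest_interval_end, completed_through) else "skip"


nine = datetime(2026, 1, 1, 9, 0, tzinfo=timezone.utc)
ten = datetime(2026, 1, 1, 10, 0, tzinfo=timezone.utc)

print(on_tick(ten, nine))  # upstream is still loading the interval ending 10:00
print(on_tick(ten, ten))   # upstream has finished that interval
```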

Refresh cascade

A backfill job with refresh="always" originates a refresh signal that propagates through all downstream jobs in the dependency graph. Downstream jobs receive run_context["refresh"] = True and react accordingly (for example pipeline.refresh = "drop_sources").

Refresh policies:

| Policy | Behaviour |
| --- | --- |
| "always" | Originate a refresh signal on every run |
| "auto" | Pass through any refresh signal received from upstream (default) |
| "block" | Stop refresh propagation here |

@run.job(expose={"tags": ["backfill"]}, refresh="always")
def backfill():
    """Cascade a refresh; does not load data."""

Then trigger it from the CLI:

dlthub job trigger "tag:backfill"
dlthub run backfill --refresh # explicit refresh on a single job
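
The three refresh policies can be simulated on a small dependency graph. The function `propagate_refresh` and the job names are hypothetical. The sketch also assumes that a job with "block" still receives the signal itself and only stops forwarding it, which the text above does not state explicitly.

```python
def propagate_refresh(graph, policies, order):
    """Compute which jobs receive a refresh signal from upstream.

    graph:    job -> list of downstream jobs
    policies: job -> "always" | "auto" | "block" (missing means "auto")
    order:    jobs in topological order, upstream first
    """
    received = {job: False for job in order}
    for job in order:
        policy = policies.get(job, "auto")
        if policy == "always":
            emits = True               # originate a signal on every run
        elif policy == "block":
            emits = False              # stop propagation here
        else:
            emits = received[job]      # "auto": pass through what arrived
        if emits:
            for downstream in graph.get(job, []):
                received[downstream] = True
    return received


graph = {"backfill": ["ingest"], "ingest": ["transform"], "transform": ["report"]}
policies = {"backfill": "always", "transform": "block"}
order = ["backfill", "ingest", "transform", "report"]

print(propagate_refresh(graph, policies, order))
```

Here `ingest` and `transform` receive the signal, while `report` does not because `transform` blocks it.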

Tags and bulk triggering

Tags are labels on jobs (set via expose={"tags": [...]}). They are used to:

  1. Group related jobs in the dashboard
  2. Run bulk operations from the CLI via selectors

# trigger every job tagged "ingest"
dlthub job trigger "tag:ingest"

# trigger every job that has a schedule
dlthub job trigger "schedule:*"

# preview without running
dlthub job trigger "tag:ingest" --dry-run
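
To picture what a selector resolves to, the sketch below matches the two selector forms shown above against a small in-memory registry, much like a dry run would list jobs without starting them. The `select` function, the registry layout, and the job names are all made up for illustration and do not reflect the CLI's implementation.

```python
# Hypothetical job registry; names, tags and schedules are made up.
JOBS = {
    "load_commits": {"tags": ["ingest"], "schedule": "*/5 * * * *"},
    "load_issues": {"tags": ["ingest"], "schedule": None},
    "build_report": {"tags": ["reporting"], "schedule": "0 * * * *"},
}


def select(jobs, selector):
    """Return the sorted job names matched by a "kind:value" selector."""
    kind, _, value = selector.partition(":")
    if kind == "tag":
        return sorted(name for name, job in jobs.items() if value in job["tags"])
    if kind == "schedule" and value == "*":
        return sorted(name for name, job in jobs.items() if job["schedule"])
    raise ValueError(f"unsupported selector in this sketch: {selector!r}")


print(select(JOBS, "tag:ingest"))
print(select(JOBS, "schedule:*"))
```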

Timezone

Cron expressions default to UTC. To interpret them in a specific IANA timezone, declare it on the job:

@run.pipeline(
    my_pipeline,
    trigger=trigger.schedule("0 9 * * *"),  # 9am
    require={"timezone": "Europe/Berlin"},  # ...in Berlin time
)
def morning_load():
    ...

Intervals in run_context remain UTC datetimes, but they align to tick boundaries in the declared timezone.
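
The effect of a declared timezone on UTC boundaries can be checked with the standard library alone. The sketch below converts 09:00 Berlin wall-clock time to UTC on a winter and a summer date; `nine_am_berlin_in_utc` is a helper written for this example and does not call any dltHub API.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

berlin = ZoneInfo("Europe/Berlin")


def nine_am_berlin_in_utc(year, month, day):
    """UTC instant of 09:00 Berlin wall-clock time on the given date."""
    local = datetime(year, month, day, 9, 0, tzinfo=berlin)
    return local.astimezone(timezone.utc)


winter = nine_am_berlin_in_utc(2026, 1, 15)
summer = nine_am_berlin_in_utc(2026, 7, 15)
print(winter.isoformat())  # 2026-01-15T08:00:00+00:00
print(summer.isoformat())  # 2026-07-15T07:00:00+00:00
```

The same cron tick therefore lands at 08:00 UTC in winter and 07:00 UTC in summer, because Berlin observes daylight saving time.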
