Hi Dennis,

Thanks for writing this up -- result caching is something I have been
thinking about as a following to AIP-103 (and hence listed as follow-up AIP
in AIP-103 doc
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)>)
in for the same: ML and reproducible-pipeline workloads. A few thoughts on
shape, scope, and how this fits with what's already in flight.

*Yes, this needs an AIP*

I'd push back on "this doesn't require an AIP". The proposal touches many
critical paths, especially the scheduler's execution path (skipping
queue/executor on a cache hit).

That's a public-surface, cross-cutting change with behaviour that's hard to
walk back once shipped. The AIP doesn't have to be heavy -- but a written
design that the community can vote on (and that ties down the edge cases
below) will save us from "we shipped it experimental and now four people
interpret cache semantics differently". The discussion thread alone won't
capture the decisions we need to make.

*Relationship to AIP-103 (Task State Management)*

This is the part I think needs the most thought before you start
implementing.

AIP-103 is already landing a pluggable, scoped, JWT-authorized state
backend with:


   - A `BaseTaskStateBackend` interface with sync + async
   get/set/delete/clear
   - A `StateScope` abstraction (`TaskScope`, `AssetScope` today)
   - Retention/GC policy at deployment + operator level
   - A UI panel for inspecting/clearing entries
   - Execution API endpoints under `/state/...`


Result caching needs essentially the same infrastructure: a pluggable
backend, retention (TTL is just a different retention policy), UI for
inspecting/clearing entries, multi-team isolation, garbage collection. If
we ship a parallel `BaseTaskCacheBackend` with its own UI, its own config
section, its own retention story, we end up with two near-identical
subsystems that diverge over time.

My strong preference: extend AIP-103's scope model with a `CacheScope` (or
similar) keyed by `(team, dag_id, task_id, source_hash, input_hash)`, and
reuse the backend abstraction. Result caching then becomes "AIP-103 + a
scheduler-side lookup + a TTL semantics layer". Hence, caching was already
listed as a follow-up future AIP in AIP-103 doc
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)>
.

*Specific concerns worth pinning down in the AIP*

A few things I'd want explicitly answered before voting:

   - *Observability on cache hits*. If a TI is marked SUCCESS without a
   worker run, what does the UI show? `start_date` / `end_date` / `duration`?
   Logs? Does `dag.task.duration` stat get emitted? "Skipped because cached"
   needs to be visually distinct from a real success -- otherwise debugging
   "why is my pipeline producing stale data" gets very hard. I'd argue for a
   new TI state (`CACHED` or a sub-state) rather than overloading SUCCESS.
   - *Task source hash definition*. You say "computed from the serialized
   DAG JSON". The serialized DAG includes `retries`, `pool`, `queue`,
   `default_args`, owner, etc. -- none of which affect output. If a user bumps
   `retries` from 2 to 3, the cache invalidates. The hash needs a narrower
   definition: the callable's bytecode/source + `op_args` + `op_kwargs` only.
   Worth specifying exactly what's in the hash.
   - *XCom backend interaction.* If the XCom backend is S3 and we're
   caching the XCom value, a cache hit pushes the *same* pointer to the new
   TI's XCom row. Is that pointer still alive? Do we re-validate on hit? Do we
   copy the underlying object? This matters when the XCom backend has its own
   retention separate from the cache TTL.
   - *Mapped tasks.* Is the cache key per `map_index`? What happens on
   partial hits (3 of 10 mapped tasks hit cache, 7 miss)? Does the executor
   still get N slots reserved? This needs to be in the design, not "we'll
   figure it out".
   - *Deferred / async tasks.* Cache the final result, or skip the deferral
   entirely? What about sensors a user has explicitly opted into caching --
   that's a foot-gun and I'd consider blocking it outright with a `cache`
   param on `BaseSensorOperator` that raises if set to `True`, rather than
   relying on "free" inheritance.
   - *Determinism assumption.* This is opt-in, so technically user-owned.
   But a `@task` calling `datetime.utcnow()` or hitting a non-idempotent API
   will silently produce wrong results across runs once cached. Worth a
   prominent doc warning and maybe a static-analysis lint in `airflow dags
   reserialize` that flags obviously-non-deterministic callables.
   - *Scheduler critical-path latency.* 500ms fail-open timeout * N
   cache-eligible tasks per scheduler loop = a noticeable scheduler stall if
   the backend degrades. The lookup needs to be async/batched, and we should
   be explicit about what happens to scheduling throughput when the cache
   backend is slow but not failing. Otherwise the "we avoid 100ms subprocess
   overhead" win gets eaten by lookup overhead in the steady state. And can
   cause worse effect that we want to avoid -- especially if we have a
   pluggable backend that runs on scheduler -- we end up running user code
   there.
   - *Manual invalidation.* "I'm not sure on exactly how that would work"
   -- this is a hard requirement, not a follow-up. Operators will need to
   clear the cache for a task/dag after a bug fix, schema change, or upstream
   data correction. Needs a CLI command (`airflow tasks clear-cache ...`) and
   a UI button, both authz'd. Without it, a stale cached result is
   unrecoverable except by waiting for TTL or changing the source.


re prior art: Flyte's design is a reasonable reference but their execution
model is materially different -- their tasks are containers with declared
inputs/outputs afaik, so source-hash and input-hash are first-class. We
have neither, which is why source-hash definition (above) matters so much.

The other thing I have been playing around is caching parts of Task
Execution and not the entire task results. Not certain yet but playing
around.

Cheers,
Kaxil

On Fri, 22 May 2026 at 18:14, Ferruzzi, Dennis <[email protected]> wrote:

> Hi all,
>
> I've heard a few people discussing the idea and with the
> Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on Task
> Result Caching as my next project.  I'm looking for feedback before I start
> implementation.
>
> The short version: for ML and other repetitive workloads, tasks often run
> with identical inputs and produce identical outputs across Dag runs.
> Currently we spawn a subprocess every time regardless.  I'd like to add an
> optional, per-task caching mechanism that lets the scheduler short-circuit
> execution when a valid cached result already exists.  No subprocess
> spawned, no executor slot consumed.
>
> I don't feel like this requires an AIP, but I'm happy to write one if the
> community feels otherwise.
>
> THE BASICS
>
> Users opt in per-task:
>
>       @task(cache=True)
>       def train_model(data_path: str): ...
>
>       @task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
>       def fetch_data(account_id: str, date_range: str): ...
>
>       # Classic operators
>       PythonOperator(..., cache=True)
>
> I propose a new "cache" parameter to BaseOperator:
>       cache: CacheConfig | bool | None = None
>
>       cache=True is sugar for CacheConfig() with all defaults.  Tasks
> without cache set are completely unaffected; the scheduler skips the check
> entirely.
>
>
> HOW IT WORKS
>
>   1. Scheduler is about to queue a cache-enabled task.
>   2. Computes a cache key from the task's identity and inputs.
>   3. Checks the cache backend (with a fail-open timeout).
>   4. Cache hit: mark SUCCESS, push cached result to XCom, done.
>   5. Cache miss or timeout: execute normally, store result afterward.
>
> The cache check happens in the scheduler before spawning the subprocess.
> This is the whole point; we avoid the 100ms+ (estimated, I don't have hard
> numbers on this) subprocess overhead entirely on cache hits.  If the cache
> backend is slow or unreachable, we timeout and execute normally.
>
>
> CACHE KEY STRUCTURE
>
> The cache key is a hash of:
>
>   - Always included (system-managed): team_name, dag_id, task_id, task
> source hash
>   - Included by default: all kwargs passed to the task
>   - User-subtractable via exclude_inputs: kwargs irrelevant to the result
>   - User-addable via include_inputs: context fields like dag_run.conf keys
>
> The task source hash is computed on the fly from the serialized Dag JSON
> (already loaded in the scheduler) and calculating it shouldn't take more
> than a couple microseconds.  If the task's code changes, the hash changes,
> and the cache is invalid.
>
>
> CACHE INVALIDATION
>
>   - include_inputs / exclude_inputs determine identity ("Are these the
> same inputs?")
>   - TTL determines validity ("Are these results still fresh?")
>
> A cache hit requires both.  TTL is configurable with a global default and
> per-task override.  Manual invalidation may also be available or may be a
> future addition, I'm not sure on exactly how that would work.
>
>
> STORAGE BACKEND
>
> Pluggable interface with a metadata DB default.  Same pattern as XCom
> backends, secrets backends, etc.  Providers can implement alternatives (S3,
> Redis, dedicated DB).
>
>
> MULTI-TEAM ISOLATION
>
> team_name is pulled from the Dag's bundle-to-team association, not
> user-provided.  It will be baked into the cache key hash and stored as a DB
> column for query filtering.  When multi_team is enabled, no cross-team
> cache hits are possible.
>
>
> TASK TYPE SUPPORT
>
>   - Operators and Transfers: cacheable, opt-in
>   - Sensors: BaseSensorOperator defaults to cache=False since the point of
> a sensor is to check the world right now, not return a stored answer.
> Users could explicitly re-enable it on a per-sensor basis if they want to
> for some reason since BaseSensorOperator inherits BaseOperator, but that's
> a "free" feature and not something extra I intend to put time into
> supporting or explicitly blocking.
>   - Waiters and Notifiers: not applicable (don't inherit BaseOperator)
>
> ADMIN CONTROLS
>
>   - Global kill switch: [core] task_caching_enabled
>   - Global TTL default in airflow.cfg
>   - Cache backend class path: [core] task_cache_backend
>   - Fail-open timeout (default 500ms?)
>
> PRIOR ART
>
> I could not find any prior discussion of this in the Airflow community but
> I checked Prefect and Flyte; both of them have task caching.  This design
> is closest to Flyte's (opt-in, source-code-aware, project/domain isolation)
> with stronger multi-team isolation and TTL support.
>
>
> LAUNCH PLAN
>
> This would launch as an experimental feature behind a feature flag.
>
>
> OPEN QUESTIONS
>
>   - Does this need an AIP?
>   - Any concerns with adding a "cache" param to BaseOperator?
>   - Thoughts on the cache key structure (include/exclude approach)?
>   - Anything I'm missing?
>
>   Looking forward to your thoughts.
>
>   - ferruzzi
>

Reply via email to