* Certainly need an AIP
* There are two difficult things in Computer Science... You know the drill
(cache invalidation). And yes - caching is hard. Very. Very. Very.
Difficult. Especially in a distributed context, with backfillable Dags,
versioned dags, storing task state etc.
* I would even be stronger and push back on even thinking and discussing
about caching before task state is done available and used.

As Kaxil mentioned, there are many common properties between caching and
task state—but Task State addresses them much more explicitly. and it's
lower level. The "magical" caching is deceptively simple. With task state,
Operator author or task flow task author has to deliberately think on how
task state should be used - avoiding certain traps, where "cache=True"
means that we will have to have all those cases solved for users who will
expect that everything will work as they think and all the edge cases will
be solved for them. I guess we will only start learning when Task State
(which is a lower-level primitive) starts being used. With such lower-level
solutions, edge cases can be solved temporarily by "if x - do it
differently". When we have "cache=True," there are no workarounds.

So my proposal is "yes. But let's build it on AIP-103, and only after we
learn some edge-cases.

J.


On Fri, May 22, 2026 at 11:47 PM Kaxil Naik <[email protected]> wrote:

> Hi Dennis,
>
> Thanks for writing this up -- result caching is something I have been
> thinking about as a following to AIP-103 (and hence listed as follow-up AIP
> in AIP-103 doc
> <
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)
> >)
> in for the same: ML and reproducible-pipeline workloads. A few thoughts on
> shape, scope, and how this fits with what's already in flight.
>
> *Yes, this needs an AIP*
>
> I'd push back on "this doesn't require an AIP". The proposal touches many
> critical paths, especially the scheduler's execution path (skipping
> queue/executor on a cache hit).
>
> That's a public-surface, cross-cutting change with behaviour that's hard to
> walk back once shipped. The AIP doesn't have to be heavy -- but a written
> design that the community can vote on (and that ties down the edge cases
> below) will save us from "we shipped it experimental and now four people
> interpret cache semantics differently". The discussion thread alone won't
> capture the decisions we need to make.
>
> *Relationship to AIP-103 (Task State Management)*
>
> This is the part I think needs the most thought before you start
> implementing.
>
> AIP-103 is already landing a pluggable, scoped, JWT-authorized state
> backend with:
>
>
>    - A `BaseTaskStateBackend` interface with sync + async
>    get/set/delete/clear
>    - A `StateScope` abstraction (`TaskScope`, `AssetScope` today)
>    - Retention/GC policy at deployment + operator level
>    - A UI panel for inspecting/clearing entries
>    - Execution API endpoints under `/state/...`
>
>
> Result caching needs essentially the same infrastructure: a pluggable
> backend, retention (TTL is just a different retention policy), UI for
> inspecting/clearing entries, multi-team isolation, garbage collection. If
> we ship a parallel `BaseTaskCacheBackend` with its own UI, its own config
> section, its own retention story, we end up with two near-identical
> subsystems that diverge over time.
>
> My strong preference: extend AIP-103's scope model with a `CacheScope` (or
> similar) keyed by `(team, dag_id, task_id, source_hash, input_hash)`, and
> reuse the backend abstraction. Result caching then becomes "AIP-103 + a
> scheduler-side lookup + a TTL semantics layer". Hence, caching was already
> listed as a follow-up future AIP in AIP-103 doc
> <
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)
> >
> .
>
> *Specific concerns worth pinning down in the AIP*
>
> A few things I'd want explicitly answered before voting:
>
>    - *Observability on cache hits*. If a TI is marked SUCCESS without a
>    worker run, what does the UI show? `start_date` / `end_date` /
> `duration`?
>    Logs? Does `dag.task.duration` stat get emitted? "Skipped because
> cached"
>    needs to be visually distinct from a real success -- otherwise debugging
>    "why is my pipeline producing stale data" gets very hard. I'd argue for
> a
>    new TI state (`CACHED` or a sub-state) rather than overloading SUCCESS.
>    - *Task source hash definition*. You say "computed from the serialized
>    DAG JSON". The serialized DAG includes `retries`, `pool`, `queue`,
>    `default_args`, owner, etc. -- none of which affect output. If a user
> bumps
>    `retries` from 2 to 3, the cache invalidates. The hash needs a narrower
>    definition: the callable's bytecode/source + `op_args` + `op_kwargs`
> only.
>    Worth specifying exactly what's in the hash.
>    - *XCom backend interaction.* If the XCom backend is S3 and we're
>    caching the XCom value, a cache hit pushes the *same* pointer to the new
>    TI's XCom row. Is that pointer still alive? Do we re-validate on hit?
> Do we
>    copy the underlying object? This matters when the XCom backend has its
> own
>    retention separate from the cache TTL.
>    - *Mapped tasks.* Is the cache key per `map_index`? What happens on
>    partial hits (3 of 10 mapped tasks hit cache, 7 miss)? Does the executor
>    still get N slots reserved? This needs to be in the design, not "we'll
>    figure it out".
>    - *Deferred / async tasks.* Cache the final result, or skip the deferral
>    entirely? What about sensors a user has explicitly opted into caching --
>    that's a foot-gun and I'd consider blocking it outright with a `cache`
>    param on `BaseSensorOperator` that raises if set to `True`, rather than
>    relying on "free" inheritance.
>    - *Determinism assumption.* This is opt-in, so technically user-owned.
>    But a `@task` calling `datetime.utcnow()` or hitting a non-idempotent
> API
>    will silently produce wrong results across runs once cached. Worth a
>    prominent doc warning and maybe a static-analysis lint in `airflow dags
>    reserialize` that flags obviously-non-deterministic callables.
>    - *Scheduler critical-path latency.* 500ms fail-open timeout * N
>    cache-eligible tasks per scheduler loop = a noticeable scheduler stall
> if
>    the backend degrades. The lookup needs to be async/batched, and we
> should
>    be explicit about what happens to scheduling throughput when the cache
>    backend is slow but not failing. Otherwise the "we avoid 100ms
> subprocess
>    overhead" win gets eaten by lookup overhead in the steady state. And can
>    cause worse effect that we want to avoid -- especially if we have a
>    pluggable backend that runs on scheduler -- we end up running user code
>    there.
>    - *Manual invalidation.* "I'm not sure on exactly how that would work"
>    -- this is a hard requirement, not a follow-up. Operators will need to
>    clear the cache for a task/dag after a bug fix, schema change, or
> upstream
>    data correction. Needs a CLI command (`airflow tasks clear-cache ...`)
> and
>    a UI button, both authz'd. Without it, a stale cached result is
>    unrecoverable except by waiting for TTL or changing the source.
>
>
> re prior art: Flyte's design is a reasonable reference but their execution
> model is materially different -- their tasks are containers with declared
> inputs/outputs afaik, so source-hash and input-hash are first-class. We
> have neither, which is why source-hash definition (above) matters so much.
>
> The other thing I have been playing around is caching parts of Task
> Execution and not the entire task results. Not certain yet but playing
> around.
>
> Cheers,
> Kaxil
>
> On Fri, 22 May 2026 at 18:14, Ferruzzi, Dennis <[email protected]>
> wrote:
>
> > Hi all,
> >
> > I've heard a few people discussing the idea and with the
> > Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on
> Task
> > Result Caching as my next project.  I'm looking for feedback before I
> start
> > implementation.
> >
> > The short version: for ML and other repetitive workloads, tasks often run
> > with identical inputs and produce identical outputs across Dag runs.
> > Currently we spawn a subprocess every time regardless.  I'd like to add
> an
> > optional, per-task caching mechanism that lets the scheduler
> short-circuit
> > execution when a valid cached result already exists.  No subprocess
> > spawned, no executor slot consumed.
> >
> > I don't feel like this requires an AIP, but I'm happy to write one if the
> > community feels otherwise.
> >
> > THE BASICS
> >
> > Users opt in per-task:
> >
> >       @task(cache=True)
> >       def train_model(data_path: str): ...
> >
> >       @task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
> >       def fetch_data(account_id: str, date_range: str): ...
> >
> >       # Classic operators
> >       PythonOperator(..., cache=True)
> >
> > I propose a new "cache" parameter to BaseOperator:
> >       cache: CacheConfig | bool | None = None
> >
> >       cache=True is sugar for CacheConfig() with all defaults.  Tasks
> > without cache set are completely unaffected; the scheduler skips the
> check
> > entirely.
> >
> >
> > HOW IT WORKS
> >
> >   1. Scheduler is about to queue a cache-enabled task.
> >   2. Computes a cache key from the task's identity and inputs.
> >   3. Checks the cache backend (with a fail-open timeout).
> >   4. Cache hit: mark SUCCESS, push cached result to XCom, done.
> >   5. Cache miss or timeout: execute normally, store result afterward.
> >
> > The cache check happens in the scheduler before spawning the subprocess.
> > This is the whole point; we avoid the 100ms+ (estimated, I don't have
> hard
> > numbers on this) subprocess overhead entirely on cache hits.  If the
> cache
> > backend is slow or unreachable, we timeout and execute normally.
> >
> >
> > CACHE KEY STRUCTURE
> >
> > The cache key is a hash of:
> >
> >   - Always included (system-managed): team_name, dag_id, task_id, task
> > source hash
> >   - Included by default: all kwargs passed to the task
> >   - User-subtractable via exclude_inputs: kwargs irrelevant to the result
> >   - User-addable via include_inputs: context fields like dag_run.conf
> keys
> >
> > The task source hash is computed on the fly from the serialized Dag JSON
> > (already loaded in the scheduler) and calculating it shouldn't take more
> > than a couple microseconds.  If the task's code changes, the hash
> changes,
> > and the cache is invalid.
> >
> >
> > CACHE INVALIDATION
> >
> >   - include_inputs / exclude_inputs determine identity ("Are these the
> > same inputs?")
> >   - TTL determines validity ("Are these results still fresh?")
> >
> > A cache hit requires both.  TTL is configurable with a global default and
> > per-task override.  Manual invalidation may also be available or may be a
> > future addition, I'm not sure on exactly how that would work.
> >
> >
> > STORAGE BACKEND
> >
> > Pluggable interface with a metadata DB default.  Same pattern as XCom
> > backends, secrets backends, etc.  Providers can implement alternatives
> (S3,
> > Redis, dedicated DB).
> >
> >
> > MULTI-TEAM ISOLATION
> >
> > team_name is pulled from the Dag's bundle-to-team association, not
> > user-provided.  It will be baked into the cache key hash and stored as a
> DB
> > column for query filtering.  When multi_team is enabled, no cross-team
> > cache hits are possible.
> >
> >
> > TASK TYPE SUPPORT
> >
> >   - Operators and Transfers: cacheable, opt-in
> >   - Sensors: BaseSensorOperator defaults to cache=False since the point
> of
> > a sensor is to check the world right now, not return a stored answer.
> > Users could explicitly re-enable it on a per-sensor basis if they want to
> > for some reason since BaseSensorOperator inherits BaseOperator, but
> that's
> > a "free" feature and not something extra I intend to put time into
> > supporting or explicitly blocking.
> >   - Waiters and Notifiers: not applicable (don't inherit BaseOperator)
> >
> > ADMIN CONTROLS
> >
> >   - Global kill switch: [core] task_caching_enabled
> >   - Global TTL default in airflow.cfg
> >   - Cache backend class path: [core] task_cache_backend
> >   - Fail-open timeout (default 500ms?)
> >
> > PRIOR ART
> >
> > I could not find any prior discussion of this in the Airflow community
> but
> > I checked Prefect and Flyte; both of them have task caching.  This design
> > is closest to Flyte's (opt-in, source-code-aware, project/domain
> isolation)
> > with stronger multi-team isolation and TTL support.
> >
> >
> > LAUNCH PLAN
> >
> > This would launch as an experimental feature behind a feature flag.
> >
> >
> > OPEN QUESTIONS
> >
> >   - Does this need an AIP?
> >   - Any concerns with adding a "cache" param to BaseOperator?
> >   - Thoughts on the cache key structure (include/exclude approach)?
> >   - Anything I'm missing?
> >
> >   Looking forward to your thoughts.
> >
> >   - ferruzzi
> >
>

Reply via email to