+1 on the idea and +1 that we have it as a core feature. With all the details listed below and reasonable things to consider, AIP is needed. But we have many users who would benefir and... as long as opt-in I think it can also start simpler and then grow over time.

Jens

P.S.: Not sure why but the AWS emails always land in my SPAM filter, both private as well as at-work. Not sure what is wrong with AWS email setup but it seems certainly the same level of computer science problems like cache implementations.

On 23.05.26 01:56, Jarek Potiuk wrote:
* Certainly need an AIP
* There are two difficult things in Computer Science... You know the drill
(cache invalidation). And yes - caching is hard. Very. Very. Very.
Difficult. Especially in a distributed context, with backfillable Dags,
versioned dags, storing task state etc.
* I would even be stronger and push back on even thinking and discussing
about caching before task state is done available and used.

As Kaxil mentioned, there are many common properties between caching and
task state—but Task State addresses them much more explicitly. and it's
lower level. The "magical" caching is deceptively simple. With task state,
Operator author or task flow task author has to deliberately think on how
task state should be used - avoiding certain traps, where "cache=True"
means that we will have to have all those cases solved for users who will
expect that everything will work as they think and all the edge cases will
be solved for them. I guess we will only start learning when Task State
(which is a lower-level primitive) starts being used. With such lower-level
solutions, edge cases can be solved temporarily by "if x - do it
differently". When we have "cache=True," there are no workarounds.

So my proposal is "yes. But let's build it on AIP-103, and only after we
learn some edge-cases.

J.


On Fri, May 22, 2026 at 11:47 PM Kaxil Naik <[email protected]> wrote:

Hi Dennis,

Thanks for writing this up -- result caching is something I have been
thinking about as a following to AIP-103 (and hence listed as follow-up AIP
in AIP-103 doc
<
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)
)
in for the same: ML and reproducible-pipeline workloads. A few thoughts on
shape, scope, and how this fits with what's already in flight.

*Yes, this needs an AIP*

I'd push back on "this doesn't require an AIP". The proposal touches many
critical paths, especially the scheduler's execution path (skipping
queue/executor on a cache hit).

That's a public-surface, cross-cutting change with behaviour that's hard to
walk back once shipped. The AIP doesn't have to be heavy -- but a written
design that the community can vote on (and that ties down the edge cases
below) will save us from "we shipped it experimental and now four people
interpret cache semantics differently". The discussion thread alone won't
capture the decisions we need to make.

*Relationship to AIP-103 (Task State Management)*

This is the part I think needs the most thought before you start
implementing.

AIP-103 is already landing a pluggable, scoped, JWT-authorized state
backend with:


    - A `BaseTaskStateBackend` interface with sync + async
    get/set/delete/clear
    - A `StateScope` abstraction (`TaskScope`, `AssetScope` today)
    - Retention/GC policy at deployment + operator level
    - A UI panel for inspecting/clearing entries
    - Execution API endpoints under `/state/...`


Result caching needs essentially the same infrastructure: a pluggable
backend, retention (TTL is just a different retention policy), UI for
inspecting/clearing entries, multi-team isolation, garbage collection. If
we ship a parallel `BaseTaskCacheBackend` with its own UI, its own config
section, its own retention story, we end up with two near-identical
subsystems that diverge over time.

My strong preference: extend AIP-103's scope model with a `CacheScope` (or
similar) keyed by `(team, dag_id, task_id, source_hash, input_hash)`, and
reuse the backend abstraction. Result caching then becomes "AIP-103 + a
scheduler-side lookup + a TTL semantics layer". Hence, caching was already
listed as a follow-up future AIP in AIP-103 doc
<
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)
.

*Specific concerns worth pinning down in the AIP*

A few things I'd want explicitly answered before voting:

    - *Observability on cache hits*. If a TI is marked SUCCESS without a
    worker run, what does the UI show? `start_date` / `end_date` /
`duration`?
    Logs? Does `dag.task.duration` stat get emitted? "Skipped because
cached"
    needs to be visually distinct from a real success -- otherwise debugging
    "why is my pipeline producing stale data" gets very hard. I'd argue for
a
    new TI state (`CACHED` or a sub-state) rather than overloading SUCCESS.
    - *Task source hash definition*. You say "computed from the serialized
    DAG JSON". The serialized DAG includes `retries`, `pool`, `queue`,
    `default_args`, owner, etc. -- none of which affect output. If a user
bumps
    `retries` from 2 to 3, the cache invalidates. The hash needs a narrower
    definition: the callable's bytecode/source + `op_args` + `op_kwargs`
only.
    Worth specifying exactly what's in the hash.
    - *XCom backend interaction.* If the XCom backend is S3 and we're
    caching the XCom value, a cache hit pushes the *same* pointer to the new
    TI's XCom row. Is that pointer still alive? Do we re-validate on hit?
Do we
    copy the underlying object? This matters when the XCom backend has its
own
    retention separate from the cache TTL.
    - *Mapped tasks.* Is the cache key per `map_index`? What happens on
    partial hits (3 of 10 mapped tasks hit cache, 7 miss)? Does the executor
    still get N slots reserved? This needs to be in the design, not "we'll
    figure it out".
    - *Deferred / async tasks.* Cache the final result, or skip the deferral
    entirely? What about sensors a user has explicitly opted into caching --
    that's a foot-gun and I'd consider blocking it outright with a `cache`
    param on `BaseSensorOperator` that raises if set to `True`, rather than
    relying on "free" inheritance.
    - *Determinism assumption.* This is opt-in, so technically user-owned.
    But a `@task` calling `datetime.utcnow()` or hitting a non-idempotent
API
    will silently produce wrong results across runs once cached. Worth a
    prominent doc warning and maybe a static-analysis lint in `airflow dags
    reserialize` that flags obviously-non-deterministic callables.
    - *Scheduler critical-path latency.* 500ms fail-open timeout * N
    cache-eligible tasks per scheduler loop = a noticeable scheduler stall
if
    the backend degrades. The lookup needs to be async/batched, and we
should
    be explicit about what happens to scheduling throughput when the cache
    backend is slow but not failing. Otherwise the "we avoid 100ms
subprocess
    overhead" win gets eaten by lookup overhead in the steady state. And can
    cause worse effect that we want to avoid -- especially if we have a
    pluggable backend that runs on scheduler -- we end up running user code
    there.
    - *Manual invalidation.* "I'm not sure on exactly how that would work"
    -- this is a hard requirement, not a follow-up. Operators will need to
    clear the cache for a task/dag after a bug fix, schema change, or
upstream
    data correction. Needs a CLI command (`airflow tasks clear-cache ...`)
and
    a UI button, both authz'd. Without it, a stale cached result is
    unrecoverable except by waiting for TTL or changing the source.


re prior art: Flyte's design is a reasonable reference but their execution
model is materially different -- their tasks are containers with declared
inputs/outputs afaik, so source-hash and input-hash are first-class. We
have neither, which is why source-hash definition (above) matters so much.

The other thing I have been playing around is caching parts of Task
Execution and not the entire task results. Not certain yet but playing
around.

Cheers,
Kaxil

On Fri, 22 May 2026 at 18:14, Ferruzzi, Dennis <[email protected]>
wrote:

Hi all,

I've heard a few people discussing the idea and with the
Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on
Task
Result Caching as my next project.  I'm looking for feedback before I
start
implementation.

The short version: for ML and other repetitive workloads, tasks often run
with identical inputs and produce identical outputs across Dag runs.
Currently we spawn a subprocess every time regardless.  I'd like to add
an
optional, per-task caching mechanism that lets the scheduler
short-circuit
execution when a valid cached result already exists.  No subprocess
spawned, no executor slot consumed.

I don't feel like this requires an AIP, but I'm happy to write one if the
community feels otherwise.

THE BASICS

Users opt in per-task:

       @task(cache=True)
       def train_model(data_path: str): ...

       @task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
       def fetch_data(account_id: str, date_range: str): ...

       # Classic operators
       PythonOperator(..., cache=True)

I propose a new "cache" parameter to BaseOperator:
       cache: CacheConfig | bool | None = None

       cache=True is sugar for CacheConfig() with all defaults.  Tasks
without cache set are completely unaffected; the scheduler skips the
check
entirely.


HOW IT WORKS

   1. Scheduler is about to queue a cache-enabled task.
   2. Computes a cache key from the task's identity and inputs.
   3. Checks the cache backend (with a fail-open timeout).
   4. Cache hit: mark SUCCESS, push cached result to XCom, done.
   5. Cache miss or timeout: execute normally, store result afterward.

The cache check happens in the scheduler before spawning the subprocess.
This is the whole point; we avoid the 100ms+ (estimated, I don't have
hard
numbers on this) subprocess overhead entirely on cache hits.  If the
cache
backend is slow or unreachable, we timeout and execute normally.


CACHE KEY STRUCTURE

The cache key is a hash of:

   - Always included (system-managed): team_name, dag_id, task_id, task
source hash
   - Included by default: all kwargs passed to the task
   - User-subtractable via exclude_inputs: kwargs irrelevant to the result
   - User-addable via include_inputs: context fields like dag_run.conf
keys
The task source hash is computed on the fly from the serialized Dag JSON
(already loaded in the scheduler) and calculating it shouldn't take more
than a couple microseconds.  If the task's code changes, the hash
changes,
and the cache is invalid.


CACHE INVALIDATION

   - include_inputs / exclude_inputs determine identity ("Are these the
same inputs?")
   - TTL determines validity ("Are these results still fresh?")

A cache hit requires both.  TTL is configurable with a global default and
per-task override.  Manual invalidation may also be available or may be a
future addition, I'm not sure on exactly how that would work.


STORAGE BACKEND

Pluggable interface with a metadata DB default.  Same pattern as XCom
backends, secrets backends, etc.  Providers can implement alternatives
(S3,
Redis, dedicated DB).


MULTI-TEAM ISOLATION

team_name is pulled from the Dag's bundle-to-team association, not
user-provided.  It will be baked into the cache key hash and stored as a
DB
column for query filtering.  When multi_team is enabled, no cross-team
cache hits are possible.


TASK TYPE SUPPORT

   - Operators and Transfers: cacheable, opt-in
   - Sensors: BaseSensorOperator defaults to cache=False since the point
of
a sensor is to check the world right now, not return a stored answer.
Users could explicitly re-enable it on a per-sensor basis if they want to
for some reason since BaseSensorOperator inherits BaseOperator, but
that's
a "free" feature and not something extra I intend to put time into
supporting or explicitly blocking.
   - Waiters and Notifiers: not applicable (don't inherit BaseOperator)

ADMIN CONTROLS

   - Global kill switch: [core] task_caching_enabled
   - Global TTL default in airflow.cfg
   - Cache backend class path: [core] task_cache_backend
   - Fail-open timeout (default 500ms?)

PRIOR ART

I could not find any prior discussion of this in the Airflow community
but
I checked Prefect and Flyte; both of them have task caching.  This design
is closest to Flyte's (opt-in, source-code-aware, project/domain
isolation)
with stronger multi-team isolation and TTL support.


LAUNCH PLAN

This would launch as an experimental feature behind a feature flag.


OPEN QUESTIONS

   - Does this need an AIP?
   - Any concerns with adding a "cache" param to BaseOperator?
   - Thoughts on the cache key structure (include/exclude approach)?
   - Anything I'm missing?

   Looking forward to your thoughts.

   - ferruzzi


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to