Hi Dennis,
Thanks for writing this up -- result caching is something I have been
thinking about as a follow-up to AIP-103 (which is why it is listed as a
follow-up AIP in the AIP-103 doc
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)>),
and for the same use cases: ML and reproducible-pipeline workloads. A few
thoughts on shape, scope, and how this fits with what's already in flight.
*Yes, this needs an AIP*
I'd push back on "this doesn't require an AIP". The proposal touches many
critical paths, especially the scheduler's execution path (skipping
queue/executor on a cache hit).
That's a public-surface, cross-cutting change with behaviour that's hard to
walk back once shipped. The AIP doesn't have to be heavy -- but a written
design that the community can vote on (and that ties down the edge cases
below) will save us from "we shipped it experimental and now four people
interpret cache semantics differently". The discussion thread alone won't
capture the decisions we need to make.
*Relationship to AIP-103 (Task State Management)*
This is the part I think needs the most thought before you start
implementing.
AIP-103 is already landing a pluggable, scoped, JWT-authorized state
backend with:
- A `BaseTaskStateBackend` interface with sync + async
get/set/delete/clear
- A `StateScope` abstraction (`TaskScope`, `AssetScope` today)
- Retention/GC policy at deployment + operator level
- A UI panel for inspecting/clearing entries
- Execution API endpoints under `/state/...`
Result caching needs essentially the same infrastructure: a pluggable
backend, retention (TTL is just a different retention policy), UI for
inspecting/clearing entries, multi-team isolation, garbage collection. If
we ship a parallel `BaseTaskCacheBackend` with its own UI, its own config
section, its own retention story, we end up with two near-identical
subsystems that diverge over time.
My strong preference: extend AIP-103's scope model with a `CacheScope` (or
similar) keyed by `(team, dag_id, task_id, source_hash, input_hash)`, and
reuse the backend abstraction. Result caching then becomes "AIP-103 + a
scheduler-side lookup + a TTL semantics layer". That is why caching was
already listed as a follow-up AIP in the AIP-103 doc
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-103%3A+Task+State+Management#:~:text=AIP%2Dxx%20(Task%20Result%20Caching)>.
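As a minimal sketch of the scope idea above: the class name `CacheScope` and its five fields come from the paragraph, while `storage_key` and everything else here are hypothetical and not part of AIP-103 today.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class CacheScope:
    """Hypothetical scope for cached task results (illustrative only)."""

    team: str
    dag_id: str
    task_id: str
    source_hash: str
    input_hash: str

    def storage_key(self) -> str:
        # Join the fields with the ASCII unit separator, which is unlikely to
        # appear in any of them, then hash to get a fixed-length backend key.
        raw = "\x1f".join(
            (self.team, self.dag_id, self.task_id, self.source_hash, self.input_hash)
        )
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

Because `team` is part of the scope, two teams with otherwise identical tasks can never resolve to the same key.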
*Specific concerns worth pinning down in the AIP*
A few things I'd want explicitly answered before voting:
- *Observability on cache hits*. If a TI is marked SUCCESS without a
worker run, what does the UI show? `start_date` / `end_date` /
`duration`? Logs? Does the `dag.task.duration` stat get emitted? "Skipped
because cached" needs to be visually distinct from a real success --
otherwise debugging "why is my pipeline producing stale data" gets very
hard. I'd argue for a new TI state (`CACHED` or a sub-state) rather than
overloading SUCCESS.
- *Task source hash definition*. You say "computed from the serialized
DAG JSON". The serialized DAG includes `retries`, `pool`, `queue`,
`default_args`, owner, etc. -- none of which affect output. If a user
bumps `retries` from 2 to 3, the cache invalidates. The hash needs a
narrower definition: the callable's bytecode/source + `op_args` +
`op_kwargs` only. Worth specifying exactly what's in the hash.
- *XCom backend interaction.* If the XCom backend is S3 and we're
caching the XCom value, a cache hit pushes the *same* pointer to the new
TI's XCom row. Is that pointer still alive? Do we re-validate on hit?
Do we copy the underlying object? This matters when the XCom backend has
its own retention separate from the cache TTL.
- *Mapped tasks.* Is the cache key per `map_index`? What happens on
partial hits (3 of 10 mapped tasks hit cache, 7 miss)? Does the executor
still get N slots reserved? This needs to be in the design, not "we'll
figure it out".
- *Deferred / async tasks.* Cache the final result, or skip the deferral
entirely? What about sensors a user has explicitly opted into caching --
that's a foot-gun and I'd consider blocking it outright with a `cache`
param on `BaseSensorOperator` that raises if set to `True`, rather than
relying on "free" inheritance.
- *Determinism assumption.* This is opt-in, so technically user-owned.
But a `@task` calling `datetime.utcnow()` or hitting a non-idempotent
API will silently produce wrong results across runs once cached. Worth a
prominent doc warning and maybe a static-analysis lint in `airflow dags
reserialize` that flags obviously-non-deterministic callables.
- *Scheduler critical-path latency.* 500ms fail-open timeout * N
cache-eligible tasks per scheduler loop = a noticeable scheduler stall if
the backend degrades. The lookup needs to be async/batched, and we should
be explicit about what happens to scheduling throughput when the cache
backend is slow but not failing. Otherwise the "we avoid 100ms subprocess
overhead" win gets eaten by lookup overhead in the steady state. It can
also cause something worse that we specifically want to avoid: if a
pluggable backend runs on the scheduler, we end up running user code
there.
- *Manual invalidation.* "I'm not sure on exactly how that would work"
-- this is a hard requirement, not a follow-up. Operators will need to
clear the cache for a task/dag after a bug fix, schema change, or
upstream data correction. Needs a CLI command (`airflow tasks clear-cache
...`) and a UI button, both authz'd. Without it, a stale cached result is
unrecoverable except by waiting for TTL or changing the source.
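To make the narrower source-hash definition from the list above concrete, here is a rough sketch that hashes only the callable and its `op_args` / `op_kwargs`. The function name `narrow_task_hash` is hypothetical, and using bytecode as a stand-in for "source" is an assumption: bytecode differs between Python versions, so a real design might prefer normalized source text.

```python
import hashlib
import json
from typing import Any, Callable


def narrow_task_hash(fn: Callable[..., Any], op_args: tuple, op_kwargs: dict) -> str:
    """Hash only what can change the output: the callable and its arguments."""
    code = fn.__code__
    h = hashlib.sha256()
    # Bytecode, referenced names, and non-code constants approximate the source.
    h.update(code.co_code)
    h.update(repr(code.co_names).encode("utf-8"))
    consts = [c for c in code.co_consts if not hasattr(c, "co_code")]
    h.update(repr(consts).encode("utf-8"))
    # sort_keys makes kwarg order irrelevant; default=repr is a crude fallback
    # for argument values that are not JSON-serializable.
    args_blob = json.dumps([list(op_args), op_kwargs], sort_keys=True, default=repr)
    h.update(args_blob.encode("utf-8"))
    return h.hexdigest()
```

Settings such as `retries`, `pool`, or `queue` are never passed in, so changing them cannot invalidate the cache under this definition.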
re prior art: Flyte's design is a reasonable reference but their execution
model is materially different -- their tasks are containers with declared
inputs/outputs afaik, so source-hash and input-hash are first-class. We
have neither, which is why source-hash definition (above) matters so much.
The other thing I have been playing around with is caching parts of task
execution rather than the entire task result. Nothing certain yet, still
experimenting.
Cheers,
Kaxil
On Fri, 22 May 2026 at 18:14, Ferruzzi, Dennis <[email protected]>
wrote:
Hi all,
I've heard a few people discussing the idea and with the
Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on
Task Result Caching as my next project. I'm looking for feedback before I
start implementation.
The short version: for ML and other repetitive workloads, tasks often run
with identical inputs and produce identical outputs across Dag runs.
Currently we spawn a subprocess every time regardless. I'd like to add an
optional, per-task caching mechanism that lets the scheduler short-circuit
execution when a valid cached result already exists. No subprocess
spawned, no executor slot consumed.
I don't feel like this requires an AIP, but I'm happy to write one if the
community feels otherwise.
THE BASICS
Users opt in per-task:
    @task(cache=True)
    def train_model(data_path: str): ...
    @task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
    def fetch_data(account_id: str, date_range: str): ...
    # Classic operators
    PythonOperator(..., cache=True)
I propose a new "cache" parameter to BaseOperator:
    cache: CacheConfig | bool | None = None
cache=True is sugar for CacheConfig() with all defaults. Tasks
without cache set are completely unaffected; the scheduler skips the
check entirely.
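One possible shape for the parameter handling described above, as a sketch only. `CacheConfig` and its `ttl` / `include_inputs` / `exclude_inputs` fields follow the names used in this message; the defaults and the helper `resolve_cache` are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Tuple, Union


@dataclass(frozen=True)
class CacheConfig:
    # Defaults are placeholders, not proposed values.
    ttl: Optional[int] = None  # seconds; None means "use the global default"
    include_inputs: Tuple[str, ...] = ()
    exclude_inputs: Tuple[str, ...] = ()


def resolve_cache(cache: Union[CacheConfig, bool, None]) -> Optional[CacheConfig]:
    """Normalize the operator argument; a None result means caching is off."""
    if cache is None or cache is False:
        return None
    if cache is True:
        # cache=True is sugar for CacheConfig() with all defaults.
        return CacheConfig()
    return cache
```

With this normalization the scheduler only needs one check, `resolve_cache(task.cache) is None`, to skip uncached tasks.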
HOW IT WORKS
1. Scheduler is about to queue a cache-enabled task.
2. Computes a cache key from the task's identity and inputs.
3. Checks the cache backend (with a fail-open timeout).
4. Cache hit: mark SUCCESS, push cached result to XCom, done.
5. Cache miss or timeout: execute normally, store result afterward.
The cache check happens in the scheduler before spawning the subprocess.
This is the whole point; we avoid the 100ms+ (estimated, I don't have
hard numbers on this) subprocess overhead entirely on cache hits. If the
cache backend is slow or unreachable, we time out and execute normally.
CACHE KEY STRUCTURE
The cache key is a hash of:
- Always included (system-managed): team_name, dag_id, task_id, task
source hash
- Included by default: all kwargs passed to the task
- User-subtractable via exclude_inputs: kwargs irrelevant to the result
- User-addable via include_inputs: context fields like dag_run.conf keys
The task source hash is computed on the fly from the serialized Dag JSON
(already loaded in the scheduler) and calculating it shouldn't take more
than a couple of microseconds. If the task's code changes, the hash
changes, and the cache is invalid.
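A minimal sketch of how such a key could be assembled, assuming JSON serialization and SHA-256. The function name `cache_key`, the `context` mapping, and the payload layout are illustrative assumptions; only the field names come from the list above.

```python
import hashlib
import json
from typing import Any, Iterable, Mapping


def cache_key(
    team_name: str,
    dag_id: str,
    task_id: str,
    source_hash: str,
    kwargs: Mapping[str, Any],
    context: Mapping[str, Any],
    include_inputs: Iterable[str] = (),
    exclude_inputs: Iterable[str] = (),
) -> str:
    excluded = set(exclude_inputs)
    # All kwargs are included by default; exclude_inputs subtracts from them.
    inputs = {k: v for k, v in kwargs.items() if k not in excluded}
    # include_inputs adds named context fields on top of the kwargs.
    extra = {name: context.get(name) for name in include_inputs}
    payload = {
        "system": [team_name, dag_id, task_id, source_hash],
        "inputs": inputs,
        "context": extra,
    }
    # sort_keys gives a stable encoding; default=repr is a crude fallback
    # for values that are not JSON-serializable.
    blob = json.dumps(payload, sort_keys=True, default=repr)
    return hashlib.sha256(blob.encode("utf-8")).hexdigest()
```

For the `fetch_data` example, excluding `account_id` means two accounts requesting the same `date_range` share one cache entry, while a different team or a changed source hash always produces a different key.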
CACHE INVALIDATION
- include_inputs / exclude_inputs determine identity ("Are these the
same inputs?")
- TTL determines validity ("Are these results still fresh?")
A cache hit requires both. TTL is configurable with a global default and
per-task override. Manual invalidation may also be available or may be a
future addition, I'm not sure on exactly how that would work.
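The "both conditions" rule can be written down in a few lines. This is a sketch under assumptions: `CacheEntry`, `effective_ttl`, and `is_valid_hit` are hypothetical names, and treating a missing TTL as "never expires" is a choice made for the example, not part of the proposal.

```python
import time
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class CacheEntry:
    value: Any
    created_at: float  # epoch seconds at which the result was stored


def effective_ttl(task_ttl: Optional[float], global_default: Optional[float]) -> Optional[float]:
    """A per-task TTL overrides the global default when it is set."""
    return task_ttl if task_ttl is not None else global_default


def is_valid_hit(
    entry: Optional[CacheEntry], ttl_s: Optional[float], now: Optional[float] = None
) -> bool:
    if entry is None:
        return False  # identity failed: nothing is stored under this key
    if ttl_s is None:
        return True  # assumption: no TTL configured means no expiry
    current = time.time() if now is None else now
    return (current - entry.created_at) <= ttl_s  # validity: still fresh
```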
STORAGE BACKEND
Pluggable interface with a metadata DB default. Same pattern as XCom
backends, secrets backends, etc. Providers can implement alternatives
(S3, Redis, dedicated DB).
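A rough idea of what the pluggable interface might look like. The class name `BaseTaskCacheBackend` and the `get` / `set` / `delete` method names are hypothetical, and the in-memory implementation exists only to show the contract; the proposed default would be the metadata DB.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class BaseTaskCacheBackend(ABC):
    """Hypothetical interface; providers would subclass it for S3, Redis, etc."""

    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        """Return the stored value, or None when the key is absent."""

    @abstractmethod
    def set(self, key: str, value: Any) -> None:
        """Store a value under the key, replacing any previous entry."""

    @abstractmethod
    def delete(self, key: str) -> None:
        """Remove the key if present; a missing key is not an error."""


class InMemoryCacheBackend(BaseTaskCacheBackend):
    """Toy implementation for illustration and tests."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self._data.get(key)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def delete(self, key: str) -> None:
        self._data.pop(key, None)
```

Returning None for a miss cannot distinguish "absent" from "cached None", so a real interface would probably need a sentinel or a wrapper type.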
MULTI-TEAM ISOLATION
team_name is pulled from the Dag's bundle-to-team association, not
user-provided. It will be baked into the cache key hash and stored as a
DB column for query filtering. When multi_team is enabled, no cross-team
cache hits are possible.
TASK TYPE SUPPORT
- Operators and Transfers: cacheable, opt-in
- Sensors: BaseSensorOperator defaults to cache=False since the point of
a sensor is to check the world right now, not return a stored answer.
Users could explicitly re-enable it on a per-sensor basis if they want to
for some reason since BaseSensorOperator inherits BaseOperator, but
that's a "free" feature and not something extra I intend to put time into
supporting or explicitly blocking.
- Waiters and Notifiers: not applicable (don't inherit BaseOperator)
ADMIN CONTROLS
- Global kill switch: [core] task_caching_enabled
- Global TTL default in airflow.cfg
- Cache backend class path: [core] task_cache_backend
- Fail-open timeout (default 500ms?)
PRIOR ART
I could not find any prior discussion of this in the Airflow community
but I checked Prefect and Flyte; both of them have task caching. This
design is closest to Flyte's (opt-in, source-code-aware, project/domain
isolation) with stronger multi-team isolation and TTL support.
LAUNCH PLAN
This would launch as an experimental feature behind a feature flag.
OPEN QUESTIONS
- Does this need an AIP?
- Any concerns with adding a "cache" param to BaseOperator?
- Thoughts on the cache key structure (include/exclude approach)?
- Anything I'm missing?
Looking forward to your thoughts.
- ferruzzi