Hi all,
I've heard a few people discussing this idea, and with the
Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on Task
Result Caching as my next project. I'm looking for feedback before I start
implementation.
The short version: for ML and other repetitive workloads, tasks often run with
identical inputs and produce identical outputs across Dag runs. Currently we
spawn a subprocess every time regardless. I'd like to add an optional,
per-task caching mechanism that lets the scheduler short-circuit execution when
a valid cached result already exists. No subprocess spawned, no executor slot
consumed.
I don't feel like this requires an AIP, but I'm happy to write one if the
community feels otherwise.
THE BASICS
Users opt in per-task:
```python
# TaskFlow
@task(cache=True)
def train_model(data_path: str): ...

@task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
def fetch_data(account_id: str, date_range: str): ...

# Classic operators
PythonOperator(..., cache=True)
```
I propose adding a new "cache" parameter to BaseOperator:
cache: CacheConfig | bool | None = None
cache=True is sugar for CacheConfig() with all defaults. Tasks without
cache set are completely unaffected; the scheduler skips the check entirely.
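For illustration, here is a minimal sketch of how the scheduler could normalize the "cache" argument. The field names come from the examples above; the dataclass layout, the tuple types, the resolve_cache_config helper, and treating cache=False the same as None are my assumptions, not settled API.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class CacheConfig:
    """Hypothetical per-task cache settings; field names mirror the examples above."""

    ttl: int | None = None  # seconds; None means "use the global default"
    include_inputs: tuple[str, ...] = ()
    exclude_inputs: tuple[str, ...] = ()


def resolve_cache_config(cache: CacheConfig | bool | None) -> CacheConfig | None:
    """Normalize the operator's `cache` argument; None means the scheduler skips the check."""
    if cache is None or cache is False:
        return None
    if cache is True:
        # cache=True is sugar for CacheConfig() with all defaults
        return CacheConfig()
    if isinstance(cache, CacheConfig):
        return cache
    raise TypeError(f"cache must be CacheConfig, bool, or None, got {type(cache).__name__}")
```

Returning None for both "unset" and "off" keeps the scheduler's fast path to a single check.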
HOW IT WORKS
1. The scheduler is about to queue a cache-enabled task.
2. It computes a cache key from the task's identity and inputs.
3. It checks the cache backend (with a fail-open timeout).
4. Cache hit: mark the task SUCCESS, push the cached result to XCom, done.
5. Cache miss or timeout: execute normally and store the result afterward.
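A rough sketch of steps 4 and 5 as a single function, assuming the cache key has already been computed. run_with_cache is a hypothetical name, the dicts stand in for the real cache backend and XCom, and the worker-side execution is collapsed into one "execute" callable.

```python
from __future__ import annotations

from typing import Any, Callable


def run_with_cache(
    cache_key: str,
    backend: dict[str, Any],
    xcom: dict[str, Any],
    execute: Callable[[], Any],
) -> tuple[str, bool]:
    """Return (final_state, served_from_cache) for one cache-enabled task.

    `backend` stands in for the cache backend and `xcom` for the task's XCom;
    both are plain dicts purely for illustration.
    """
    if cache_key in backend:
        # Cache hit: push the stored result to XCom and mark SUCCESS, no subprocess
        xcom["return_value"] = backend[cache_key]
        return "success", True
    # Cache miss: run the task normally (in reality, in a worker subprocess)
    result = execute()
    xcom["return_value"] = result
    # Store the result afterward so the next identical run can skip execution
    backend[cache_key] = result
    return "success", False
```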
The cache check happens in the scheduler before the subprocess is spawned. This
is the whole point: on a cache hit we skip the subprocess overhead entirely
(I estimate 100ms+, but I don't have hard numbers on this). If the cache
backend is slow or unreachable, we time out and execute normally.
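The fail-open behavior could look roughly like this: the lookup runs on a small thread pool, and a timeout, a miss, or any backend error all fall back to normal execution. fail_open_lookup, MISS, and the convention that the backend's get raises KeyError on a miss are assumptions for this sketch; the 0.5 s default mirrors the 500ms floated under ADMIN CONTROLS.

```python
from __future__ import annotations

import concurrent.futures
from typing import Any, Callable

# One shared pool, so a slow lookup never blocks the caller while it waits to shut down.
_LOOKUP_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Sentinel so that a cached value of None still counts as a hit.
MISS = object()


def fail_open_lookup(get: Callable[[str], Any], key: str, timeout_s: float = 0.5) -> Any:
    """Return the cached value, or MISS on a miss, a timeout, or a backend error."""
    future = _LOOKUP_POOL.submit(get, key)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        return MISS  # backend too slow: execute the task normally
    except Exception:
        return MISS  # KeyError (miss) or backend unreachable: also fail open
```

A timed-out lookup keeps running in its background thread; the scheduler simply stops waiting for it.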
CACHE KEY STRUCTURE
The cache key is a hash of:
- Always included (system-managed): team_name, dag_id, task_id, task source
hash
- Included by default: all kwargs passed to the task
- User-subtractable via exclude_inputs: kwargs irrelevant to the result
- User-addable via include_inputs: context fields like dag_run.conf keys
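Here is one way the key could be assembled, as a sketch: canonical JSON with sorted keys, hashed with SHA-256. The function name, the flat "context" mapping with dotted names like dag_run.conf.region, and the default=str fallback for non-JSON values are assumptions; how include_inputs would actually address nested context fields is still open.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any, Mapping, Sequence


def compute_cache_key(
    *,
    team_name: str | None,
    dag_id: str,
    task_id: str,
    task_source_hash: str,
    kwargs: Mapping[str, Any],
    context: Mapping[str, Any],
    exclude_inputs: Sequence[str] = (),
    include_inputs: Sequence[str] = (),
) -> str:
    """Hash the system-managed identity plus the user-selected inputs (a sketch)."""
    payload = {
        # Always included; users cannot remove these
        "team_name": team_name,
        "dag_id": dag_id,
        "task_id": task_id,
        "task_source_hash": task_source_hash,
        # All kwargs by default, minus exclude_inputs
        "inputs": {k: v for k, v in kwargs.items() if k not in exclude_inputs},
        # Extra context fields explicitly opted in via include_inputs
        "context": {name: context.get(name) for name in include_inputs},
    }
    # sort_keys makes the hash independent of kwarg ordering; default=str is a
    # crude fallback for values JSON can't encode (an assumption of this sketch).
    canonical = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

With exclude_inputs=["account_id"], fetch_data calls that differ only in account_id map to the same key, while a different date_range produces a new one.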
The task source hash is computed on the fly from the serialized Dag JSON
(already loaded in the scheduler), and calculating it should take no more than
a couple of microseconds. If the task's code changes, the hash changes, so the
cache key changes and existing cache entries no longer match.
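A minimal sketch of the source hash, assuming the task's entry in the serialized Dag can be treated as a plain dict. The dict shape is an assumption; this does not model the real serialization format or which fields it contains.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any, Mapping


def task_source_hash(serialized_task: Mapping[str, Any]) -> str:
    """Fingerprint one task from its serialized representation (a sketch).

    Canonical JSON (sorted keys, no whitespace) keeps the hash stable across
    re-serializations that only reorder keys.
    """
    canonical = json.dumps(serialized_task, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Sorting keys means two serializations that differ only in key order hash the same, while any change to a field value (such as the task's code, if it is captured there) produces a new hash.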
CACHE INVALIDATION
- include_inputs / exclude_inputs determine identity ("Are these the same
inputs?")
- TTL determines validity ("Are these results still fresh?")
A cache hit requires both. TTL is configurable with a global default and
per-task override. Manual invalidation may also be included, or it may be a
future addition; I'm not sure yet exactly how that would work.
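The TTL side of the check could be expressed as follows, assuming TTLs are whole seconds and a per-task value of None means "use the global default". Both helper names are hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def effective_ttl(task_ttl: int | None, global_default_ttl: int) -> int:
    """A per-task TTL overrides the global default (both in seconds)."""
    return global_default_ttl if task_ttl is None else task_ttl


def is_fresh(created_at: datetime, ttl_seconds: int, now: datetime | None = None) -> bool:
    """An entry found under a matching key is only a hit while it is younger than the TTL."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Strict comparison: an entry exactly ttl_seconds old is already expired
    return now - created_at < timedelta(seconds=ttl_seconds)
```

The key lookup answers the identity question; is_fresh answers the validity question, and a hit needs both to pass.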
STORAGE BACKEND
Pluggable interface with a metadata DB default. Same pattern as XCom backends,
secrets backends, etc. Providers can implement alternatives (S3, Redis,
dedicated DB).
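A sketch of what the pluggable interface might look like. BaseTaskCacheBackend and its get/set signatures are hypothetical, and SQLite stands in for the metadata DB purely to keep the example self-contained.

```python
from __future__ import annotations

import json
import sqlite3
import time
from abc import ABC, abstractmethod
from typing import Any


class BaseTaskCacheBackend(ABC):
    """Hypothetical pluggable interface, in the same spirit as custom XCom backends."""

    @abstractmethod
    def get(self, key: str) -> tuple[Any, float] | None:
        """Return (value, created_at as epoch seconds), or None on a miss."""

    @abstractmethod
    def set(self, key: str, value: Any) -> None:
        """Store a JSON-serializable task result under `key`."""


class SqliteTaskCacheBackend(BaseTaskCacheBackend):
    """Stand-in for the metadata DB default, backed by an in-memory SQLite table."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS task_cache ("
            "cache_key TEXT PRIMARY KEY, value TEXT NOT NULL, created_at REAL NOT NULL)"
        )

    def get(self, key: str) -> tuple[Any, float] | None:
        row = self._conn.execute(
            "SELECT value, created_at FROM task_cache WHERE cache_key = ?", (key,)
        ).fetchone()
        # Returning a tuple on a hit keeps a cached None distinguishable from a miss
        return None if row is None else (json.loads(row[0]), row[1])

    def set(self, key: str, value: Any) -> None:
        # INSERT OR REPLACE overwrites a stale entry stored under the same key
        self._conn.execute(
            "INSERT OR REPLACE INTO task_cache (cache_key, value, created_at) VALUES (?, ?, ?)",
            (key, json.dumps(value), time.time()),
        )
        self._conn.commit()
```

A Redis or S3 provider would only need to implement the same two methods.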
MULTI-TEAM ISOLATION
team_name is pulled from the Dag's bundle-to-team association, not
user-provided. It will be baked into the cache key hash and stored as a DB
column for query filtering. When multi_team is enabled, no cross-team cache
hits are possible.
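To make the isolation rule concrete, here is a sketch with hypothetical helpers: the team comes only from a bundle-to-team mapping (never from task kwargs) and is mixed into the hash input, so identical inputs from two teams produce different keys. Returning None when multi_team is off, and raising KeyError for an unmapped bundle, are placeholder assumptions.

```python
from __future__ import annotations

import hashlib
from typing import Mapping


def resolve_team_name(bundle_name: str, bundle_to_team: Mapping[str, str], multi_team: bool) -> str | None:
    """The team is looked up from the Dag bundle's association, never taken from task arguments."""
    if not multi_team:
        return None
    return bundle_to_team[bundle_name]  # unmapped bundle -> KeyError (placeholder behavior)


def team_scoped_key(team_name: str | None, inner_key: str) -> str:
    """Mix the team into the hash input so the same inputs never share a key across teams."""
    # !r distinguishes None from a team literally named "None"
    return hashlib.sha256(f"{team_name!r}\x00{inner_key}".encode("utf-8")).hexdigest()
```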
TASK TYPE SUPPORT
- Operators and Transfers: cacheable, opt-in
- Sensors: BaseSensorOperator defaults to cache=False, since the point of a
sensor is to check the world right now, not return a stored answer. Because
BaseSensorOperator inherits from BaseOperator, users could still explicitly
enable caching on a per-sensor basis if they have a reason to, but that's a
"free" side effect, not something I intend to put extra time into supporting
or explicitly blocking.
- Waiters and Notifiers: not applicable (they don't inherit from BaseOperator)
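The sensor default amounts to an overridden constructor default. The two classes below are toy stand-ins, not Airflow's real BaseOperator or BaseSensorOperator, and model only the proposed "cache" parameter.

```python
from __future__ import annotations

from typing import Any


class SketchBaseOperator:
    """Toy stand-in for BaseOperator: caching is off unless `cache` is set."""

    def __init__(self, *, task_id: str, cache: Any = None, **kwargs: Any) -> None:
        self.task_id = task_id
        self.cache = cache


class SketchSensorOperator(SketchBaseOperator):
    """Toy stand-in for BaseSensorOperator: defaults to cache=False, still overridable."""

    def __init__(self, *, cache: Any = False, **kwargs: Any) -> None:
        super().__init__(cache=cache, **kwargs)
```

Because the override is just a different default, an explicit cache=True on a sensor passes straight through, which is the "free" behavior described above.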
ADMIN CONTROLS
- Global kill switch: [core] task_caching_enabled
- Global TTL default in airflow.cfg
- Cache backend class path: [core] task_cache_backend
- Fail-open timeout (default 500ms?)
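A sketch of how these settings could be read with the standard library's configparser. Only task_caching_enabled and task_cache_backend come from the list above; task_cache_default_ttl, task_cache_timeout_ms, the fallback values (including None meaning "use the built-in metadata DB backend"), and the import_backend_class helper are hypothetical. Resolving a dotted class path is similar to how [core] xcom_backend takes a class path.

```python
from __future__ import annotations

import configparser
import importlib
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskCacheSettings:
    enabled: bool
    default_ttl: int | None  # seconds; None if not configured
    backend_path: str | None  # None -> built-in metadata DB backend (assumption)
    timeout_ms: int


def load_settings(cfg: configparser.ConfigParser) -> TaskCacheSettings:
    core = cfg["core"]
    return TaskCacheSettings(
        enabled=core.getboolean("task_caching_enabled", fallback=False),
        default_ttl=core.getint("task_cache_default_ttl", fallback=None),
        backend_path=core.get("task_cache_backend", fallback=None),
        timeout_ms=core.getint("task_cache_timeout_ms", fallback=500),
    )


def import_backend_class(path: str) -> type:
    """Turn 'package.module.ClassName' into the class object."""
    module_name, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)
```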
PRIOR ART
I couldn't find any prior discussion of this in the Airflow community, but I
checked Prefect and Flyte, and both have task caching. This design is closest
to Flyte's (opt-in, source-code-aware, project/domain isolation), with stronger
multi-team isolation and TTL support.
LAUNCH PLAN
This would launch as an experimental feature behind a feature flag.
OPEN QUESTIONS
- Does this need an AIP?
- Any concerns with adding a "cache" param to BaseOperator?
- Thoughts on the cache key structure (include/exclude approach)?
- Anything I'm missing?
Looking forward to your thoughts.
- ferruzzi