Hi all,

I've heard a few people discussing the idea, and with the 
Deadlines/Callbacks/Workloads project wrapping up, I'd like to work on Task 
Result Caching as my next project.  I'm looking for feedback before I start 
implementation.

The short version: for ML and other repetitive workloads, tasks often run with 
identical inputs and produce identical outputs across Dag runs.  Currently we 
spawn a subprocess every time regardless.  I'd like to add an optional, 
per-task caching mechanism that lets the scheduler short-circuit execution when 
a valid cached result already exists.  No subprocess spawned, no executor slot 
consumed.

I don't feel like this requires an AIP, but I'm happy to write one if the 
community feels otherwise.

THE BASICS

Users opt in per-task:

      @task(cache=True)
      def train_model(data_path: str): ...

      @task(cache=CacheConfig(ttl=3600, exclude_inputs=["account_id"]))
      def fetch_data(account_id: str, date_range: str): ...

      # Classic operators
      PythonOperator(..., cache=True)

I propose adding a new "cache" parameter to BaseOperator:
      cache: CacheConfig | bool | None = None

cache=True is sugar for CacheConfig() with all defaults.  Tasks without 
cache set are completely unaffected; the scheduler skips the check entirely.
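
To make the "cache=True is sugar" rule concrete, here is a minimal sketch of 
how the scheduler might normalize the parameter.  The CacheConfig field names 
ttl and exclude_inputs come from the examples above and include_inputs from 
the cache key section; the helper normalize_cache_arg is hypothetical, not 
existing Airflow code.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class CacheConfig:
    """Hypothetical shape of the proposed CacheConfig."""

    ttl: int | None = None  # seconds; None means "use the global default"
    exclude_inputs: list[str] = field(default_factory=list)
    include_inputs: list[str] = field(default_factory=list)


def normalize_cache_arg(cache: CacheConfig | bool | None) -> CacheConfig | None:
    """Turn BaseOperator's ``cache`` argument into a CacheConfig or None.

    None and False mean caching is off, so the scheduler can skip the check.
    True is sugar for CacheConfig() with all defaults.
    """
    if cache is None or cache is False:
        return None
    if cache is True:
        return CacheConfig()
    if isinstance(cache, CacheConfig):
        return cache
    raise TypeError(f"cache must be CacheConfig, bool or None, not {type(cache).__name__}")
```

Returning None for "off" lets the scheduler use a single cheap check to skip 
all caching logic for tasks that never opted in.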


HOW IT WORKS

  1. Scheduler is about to queue a cache-enabled task.
  2. Computes a cache key from the task's identity and inputs.
  3. Checks the cache backend (with a fail-open timeout).
  4. Cache hit: mark SUCCESS, push cached result to XCom, done.
  5. Cache miss or timeout: execute normally, store result afterward.
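
The five steps above can be sketched as a single decision function.  This is 
a simplification under stated assumptions: TaskInstanceStub and 
schedule_cached_task are hypothetical names, the cache is a plain dict, and the 
miss path runs and stores the result inline, whereas in the proposal it goes 
through the normal executor and the result is stored after the task finishes.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a TaskInstance; only what the sketch needs."""

    cache_key: str
    state: str = "scheduled"
    xcom_return_value: Any = None


def schedule_cached_task(
    ti: TaskInstanceStub,
    cache: dict[str, Any],
    execute: Callable[[], Any],
) -> bool:
    """Return True if the task was short-circuited from the cache."""
    if ti.cache_key in cache:
        # Cache hit: mark SUCCESS and push the stored result; execute() is never called.
        ti.state = "success"
        ti.xcom_return_value = cache[ti.cache_key]
        return True
    # Cache miss: run normally (inline here for brevity), then store the result.
    result = execute()
    ti.state = "success"
    ti.xcom_return_value = result
    cache[ti.cache_key] = result
    return False
```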

The cache check happens in the scheduler, before any subprocess is spawned.  
This is the whole point: on a cache hit we avoid the subprocess overhead 
entirely (I estimate 100ms+, but I don't have hard numbers on this).  If the 
cache backend is slow or unreachable, we time out and execute normally.


CACHE KEY STRUCTURE

The cache key is a hash of:

  - Always included (system-managed): team_name, dag_id, task_id, task source 
hash
  - Included by default: all kwargs passed to the task
  - User-subtractable via exclude_inputs: kwargs irrelevant to the result
  - User-addable via include_inputs: context fields like dag_run.conf keys
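
One way the include/exclude rules above could combine into a single key is to 
build a canonical JSON payload and hash it with SHA-256.  This is a sketch 
under assumptions: compute_cache_key is a hypothetical helper, context fields 
are passed as a flat mapping such as {"dag_run.conf.region": "eu"}, and all 
inputs must be JSON-serializable (json.dumps raises TypeError otherwise).

```python
from __future__ import annotations

import hashlib
import json
from typing import Any, Mapping, Sequence


def compute_cache_key(
    *,
    team_name: str | None,
    dag_id: str,
    task_id: str,
    source_hash: str,
    task_kwargs: Mapping[str, Any],
    context_values: Mapping[str, Any],
    exclude_inputs: Sequence[str] = (),
    include_inputs: Sequence[str] = (),
) -> str:
    excluded = set(exclude_inputs)
    payload = {
        # System-managed identity: always present, cannot be excluded by users.
        "team_name": team_name,
        "dag_id": dag_id,
        "task_id": task_id,
        "source_hash": source_hash,
        # All kwargs by default, minus the ones the user marked irrelevant.
        "inputs": {k: v for k, v in task_kwargs.items() if k not in excluded},
        # Context fields the user explicitly opted in (missing ones hash as None).
        "context": {name: context_values.get(name) for name in include_inputs},
    }
    # sort_keys makes the encoding independent of dict insertion order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

With exclude_inputs=["account_id"], as in the fetch_data example, two runs that 
differ only in account_id produce the same key, while a different team_name 
always produces a different one.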

The task source hash is computed on the fly from the serialized Dag JSON 
(already loaded in the scheduler), and calculating it shouldn't take more than 
a couple of microseconds.  If the task's code changes, the hash changes, and 
any previously cached results no longer match.
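
A sketch of the source hash, assuming we already have the task's entry from 
the serialized Dag JSON as a dict.  Whether that entry actually contains 
everything that defines the task's code depends on Airflow's serialization 
format, which this sketch does not model; task_source_hash and ignore_fields 
are hypothetical.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any, Mapping


def task_source_hash(
    serialized_task: Mapping[str, Any],
    ignore_fields: frozenset[str] = frozenset(),
) -> str:
    """Hash one task's serialized entry in a key-order-independent way.

    Fields named in ignore_fields (a hypothetical knob for values that change
    without the code changing) are dropped before hashing.
    """
    relevant = {k: v for k, v in serialized_task.items() if k not in ignore_fields}
    canonical = json.dumps(relevant, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```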


CACHE INVALIDATION

  - include_inputs / exclude_inputs determine identity ("Are these the same 
inputs?")
  - TTL determines validity ("Are these results still fresh?")

A cache hit requires both.  TTL is configurable with a global default and a 
per-task override.  Manual invalidation may also be available, or it may be a 
future addition; I'm not sure exactly how that would work.
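
The two-part rule (identity and freshness) fits in a few lines.  CacheEntry 
and is_cache_hit are hypothetical names, and the stored creation time is 
assumed to be a Unix timestamp in seconds; now is injectable to make the 
check testable.

```python
from __future__ import annotations

import time
from dataclasses import dataclass
from typing import Any


@dataclass
class CacheEntry:
    key: str
    value: Any
    created_at: float  # Unix timestamp, seconds


def is_cache_hit(
    entry: CacheEntry | None,
    key: str,
    ttl_s: int | None,
    global_ttl_s: int,
    now: float | None = None,
) -> bool:
    """A hit needs a matching key (identity) AND an unexpired entry (validity)."""
    if entry is None or entry.key != key:
        return False  # "Are these the same inputs?" -> no
    effective_ttl = ttl_s if ttl_s is not None else global_ttl_s  # per-task override wins
    now = time.time() if now is None else now
    return (now - entry.created_at) < effective_ttl  # "Is this still fresh?"
```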


STORAGE BACKEND

Pluggable interface with a metadata DB default.  Same pattern as XCom backends, 
secrets backends, etc.  Providers can implement alternatives (S3, Redis, 
dedicated DB).
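
A rough sketch of what the pluggable interface could look like, following the 
usual "abstract base class plus dotted class path in config" pattern.  
BaseTaskCacheBackend, InMemoryTaskCacheBackend and load_backend are all 
hypothetical names; the in-memory class is only a toy for illustration, not a 
proposed default.

```python
from __future__ import annotations

import importlib
from abc import ABC, abstractmethod
from typing import Any


class BaseTaskCacheBackend(ABC):
    """Hypothetical pluggable interface, in the spirit of XCom/secrets backends."""

    @abstractmethod
    def get(self, key: str, team_name: str | None, default: Any = None) -> Any:
        """Return the cached value for (team_name, key), or ``default`` on a miss."""

    @abstractmethod
    def set(self, key: str, team_name: str | None, value: Any, ttl_s: int | None) -> None:
        """Store a task result under (team_name, key)."""


class InMemoryTaskCacheBackend(BaseTaskCacheBackend):
    """Toy backend for illustration; the proposed default is the metadata DB."""

    def __init__(self) -> None:
        self._store: dict[tuple[str | None, str], Any] = {}

    def get(self, key: str, team_name: str | None, default: Any = None) -> Any:
        return self._store.get((team_name, key), default)

    def set(self, key: str, team_name: str | None, value: Any, ttl_s: int | None) -> None:
        # ttl_s is ignored by this toy backend.
        self._store[(team_name, key)] = value


def load_backend(class_path: str) -> BaseTaskCacheBackend:
    """Instantiate a backend from a dotted path, e.g. a [core] task_cache_backend value."""
    module_name, _, class_name = class_path.rpartition(".")
    backend_cls = getattr(importlib.import_module(module_name), class_name)
    if not (isinstance(backend_cls, type) and issubclass(backend_cls, BaseTaskCacheBackend)):
        raise TypeError(f"{class_path} is not a BaseTaskCacheBackend subclass")
    return backend_cls()
```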


MULTI-TEAM ISOLATION

team_name is pulled from the Dag's bundle-to-team association, not 
user-provided.  It will be baked into the cache key hash and stored as a DB 
column for query filtering.  When multi_team is enabled, no cross-team cache 
hits are possible.
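
To illustrate "baked into the hash and stored as a column", here is a sketch 
of a metadata-DB-style table using the standard library's sqlite3.  The table 
and column names are hypothetical, and "IS ?" is SQLite's NULL-safe equality; 
a real implementation would go through Airflow's SQLAlchemy models instead.

```python
from __future__ import annotations

import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    conn.executescript(
        """
        CREATE TABLE task_cache (
            cache_key  TEXT PRIMARY KEY,  -- hash already includes team_name
            team_name  TEXT,              -- NULL when multi_team is disabled
            value_json TEXT NOT NULL,
            created_at REAL NOT NULL
        );
        CREATE INDEX idx_task_cache_team ON task_cache (team_name);
        """
    )


def store(conn: sqlite3.Connection, cache_key: str, team_name: str | None,
          value_json: str, created_at: float) -> None:
    conn.execute(
        "INSERT OR REPLACE INTO task_cache VALUES (?, ?, ?, ?)",
        (cache_key, team_name, value_json, created_at),
    )


def lookup(conn: sqlite3.Connection, cache_key: str, team_name: str | None) -> str | None:
    # Filter on the team column as well as the key.  The key already encodes
    # the team, so this is a second, independent guard against cross-team hits.
    row = conn.execute(
        "SELECT value_json FROM task_cache WHERE cache_key = ? AND team_name IS ?",
        (cache_key, team_name),
    ).fetchone()
    return row[0] if row else None
```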


TASK TYPE SUPPORT

  - Operators and Transfers: cacheable, opt-in
  - Sensors: BaseSensorOperator defaults to cache=False, since the point of a 
sensor is to check the world right now, not return a stored answer.  Because 
BaseSensorOperator inherits from BaseOperator, users could explicitly re-enable 
caching on an individual sensor if they have a reason to.  That comes "for 
free"; I don't intend to put extra time into either supporting it or 
explicitly blocking it.
  - Waiters and Notifiers: not applicable (they don't inherit from BaseOperator)
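
The sensor default can be expressed with an ordinary keyword-default override 
in the subclass constructor, which is also why re-enabling comes for free.  
The classes below are hypothetical stand-ins, not the real BaseOperator or 
BaseSensorOperator.

```python
from __future__ import annotations


class BaseOperatorSketch:
    """Hypothetical stand-in for BaseOperator with the proposed ``cache`` param."""

    def __init__(self, *, task_id: str, cache: object = None) -> None:
        self.task_id = task_id
        self.cache = cache  # None: caching off unless the user opts in


class BaseSensorOperatorSketch(BaseOperatorSketch):
    """Sensors check the world right now, so caching defaults to False."""

    def __init__(self, *, cache: object = False, **kwargs: object) -> None:
        # An explicit cache=... from the user still overrides this default.
        super().__init__(cache=cache, **kwargs)
```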

ADMIN CONTROLS

  - Global kill switch: [core] task_caching_enabled
  - Global TTL default in airflow.cfg
  - Cache backend class path: [core] task_cache_backend
  - Fail-open timeout (default 500ms?)
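
A sketch of reading these settings, using the standard library's configparser 
to stay self-contained (Airflow itself reads config through its own conf 
object, which also handles environment-variable overrides).  task_caching_enabled 
and task_cache_backend are from the list above; task_cache_default_ttl and 
task_cache_timeout_ms are hypothetical option names, and the False/None 
fallbacks are assumptions, with 500 taken from the suggested timeout.

```python
from __future__ import annotations

import configparser
from dataclasses import dataclass


@dataclass
class TaskCacheSettings:
    enabled: bool
    backend: str | None        # None: use the built-in metadata DB backend
    default_ttl_s: int | None  # None: no global default has been settled yet
    timeout_ms: int


def read_task_cache_settings(cfg_text: str) -> TaskCacheSettings:
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback= also covers a missing [core] section.
    return TaskCacheSettings(
        enabled=parser.getboolean("core", "task_caching_enabled", fallback=False),
        backend=parser.get("core", "task_cache_backend", fallback=None),
        default_ttl_s=parser.getint("core", "task_cache_default_ttl", fallback=None),
        timeout_ms=parser.getint("core", "task_cache_timeout_ms", fallback=500),
    )
```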

PRIOR ART

I could not find any prior discussion of this in the Airflow community, but I 
checked Prefect and Flyte, and both of them have task caching.  This design is 
closest to Flyte's (opt-in, source-code-aware, project/domain isolation), with 
stronger multi-team isolation and TTL support.


LAUNCH PLAN

This would launch as an experimental feature behind a feature flag.


OPEN QUESTIONS

  - Does this need an AIP?
  - Any concerns with adding a "cache" param to BaseOperator?
  - Thoughts on the cache key structure (include/exclude approach)?
  - Anything I'm missing?

  Looking forward to your thoughts.

  - ferruzzi
