Paging Daniel Standish: he's got an old AIP (AIP-30, I think) that was targeting this problem, but the Airflow "model" has moved on a bit since then, so it might need re-working for the Airflow 3 world.
-ash

> On 10 Jun 2025, at 19:08, Ryan Hatter <ryan.hat...@astronomer.io.INVALID> wrote:
>
> Like... XComs make sense to ship to object storage, since it can be
> necessary to share large amounts of data between tasks. But something
> that tracks trigger state for event-driven scheduling should
> consistently be small?
>
> On Tue, Jun 10, 2025 at 1:58 PM Ryan Hatter <ryan.hat...@astronomer.io>
> wrote:
>
>> I don't think we should try to take on Kafka, but better support for
>> event-driven scheduling is one of the oft-repeated highlights of
>> Airflow 3.
>>
>> IMO, it doesn't make sense to manage state using object storage. A
>> simple model in Airflow would be suitable.
>>
>> On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>> Loose thoughts to spark the discussion:
>>>
>>> I think in the past, the basic architectural assumptions of Airflow
>>> made us say "no" to such requests, and to keeping state, because we
>>> had strong assumptions about data intervals and "idempotency" (you
>>> can look up the devlist for our long-past discussions about what
>>> idempotency means in the context of Airflow).
>>>
>>> But in Airflow 3, where we have more ways of looking at DAGs and
>>> scheduling them, I agree some kind of common abstraction for "how do
>>> I store my state" would be useful.
>>>
>>> I think we should consider who the "owner" of such state is, i.e.
>>> what the state is attached to and the scenarios it will be used in,
>>> because that will impact its characteristics re. scalability and
>>> performance. If we are trying to have "watermarks" that allow
>>> streaming, Kafka-like performance and scalability, that is very
>>> different from much more coarse-grained state retrieval and access.
>>>
>>> If our expectations re: latency etc. are not very "high", I think we
>>> almost have "medium-weight" state storage via a pretty generic
>>> mechanism: "airflow.io". Users can pick an object storage of their
>>> choice as state storage (even LocalFileStorage if they run a small
>>> Airflow instance with the Local Executor). In a number of cases this
>>> can be accessed asynchronously in triggers, depending on the fsspec
>>> implementation (I guess we should check how much of the "async" is
>>> actually implemented in the various implementations we already have).
>>>
>>> So **maybe** this could be done through "best practices": documenting
>>> the approach, testing it, and making sure we have the right async
>>> abstractions so that triggers can access common.io.
>>>
>>> Of course, maybe we want different performance/latency
>>> characteristics for such state, and object storage is not enough? But
>>> then I am not sure another Task SDK model and API is the right way
>>> either. With Airflow 3 it would have to go through the API server
>>> anyway, which already centralises some processing and adds some
>>> latency, so possibly distributed object storage is a "better" way to
>>> store state?
>>>
>>> J.
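For what it's worth, a minimal sketch of the object-storage approach Jarek
describes, using ObjectStoragePath from airflow.io. The bucket, key, and
connection id below are made up, and since ObjectStoragePath I/O is
synchronous, a trigger would need to keep it off the event loop (e.g. with
asyncio.to_thread) unless/until we confirm async support in the underlying
fsspec implementations:

    import asyncio
    from typing import Optional

    from airflow.io.path import ObjectStoragePath

    # Hypothetical location for one watcher's watermark; any fsspec-backed
    # scheme (even file://) would work the same way.
    STATE = ObjectStoragePath(
        "s3://my-bucket/airflow-state/my_watcher/watermark",
        conn_id="aws_default",
    )

    def _read_watermark() -> Optional[str]:
        # Return the last persisted watermark, or None on the first run.
        return STATE.read_text() if STATE.exists() else None

    def _write_watermark(value: str) -> None:
        STATE.write_text(value)

    async def get_watermark() -> Optional[str]:
        # ObjectStoragePath is synchronous, so run it in a thread rather
        # than blocking the triggerer's event loop.
        return await asyncio.to_thread(_read_watermark)

    async def set_watermark(value: str) -> None:
        await asyncio.to_thread(_write_watermark, value)

A trigger would read the watermark at the start of each poll and write it
back after emitting a TriggerEvent. The obvious gap is that none of this is
atomic, so two triggerer instances could race on the same key.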
>>> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid>
>>> wrote:
>>>
>>>> *Problem*
>>>>
>>>> Currently, there is no way to persist state within a Trigger, or
>>>> within any Airflow object, for that matter.
>>>>
>>>> This presents a challenge to Airflow users, especially those looking
>>>> to "watch" Assets. AssetWatchers leverage Triggers (inheriting from
>>>> BaseEventTrigger) to create Asset events and trigger DAG execution
>>>> for event-driven DAGs. So far, the only implementations have been
>>>> queue/stream-based. However, there are a number of other
>>>> "incremental" events that users may want to invoke DAGs upon, such
>>>> as:
>>>>
>>>> - New files land or are updated in S3.
>>>> - New records are added to a Postgres table.
>>>>
>>>> To implement this type of logic in a Trigger, a sort of "watermark"
>>>> is required. This could take the form of a "last file processed" or
>>>> "last modified time". For example, a Trigger should only "look back"
>>>> at files that have landed in S3 since the timestamp of the last file
>>>> that resulted in a TriggerEvent being emitted. Otherwise, a sort of
>>>> infinite loop may be created
>>>> <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>.
>>>>
>>>> Since there is no supported way to reliably store state in a
>>>> Trigger, this "watermark" cannot be persisted and retrieved by later
>>>> Trigger runs. *This makes creating a Trigger for incremental events
>>>> quite difficult.*
>>>>
>>>> *Previous Work*
>>>>
>>>> Daniel Standish created AIP-30
>>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>>>> to address this issue. The use cases that he mentions in the AIP
>>>> closely align with the problems that Airflow users will try to solve
>>>> with event-driven scheduling. It was never implemented, but remains
>>>> relevant.
>>>>
>>>> Airflow users and contributors have also started to implement their
>>>> own "workarounds" to power event-driven scheduling. However, no
>>>> robust solution has been implemented yet.
>>>>
>>>> *Proposed Solution*
>>>>
>>>> Before settling on the proposal below, I tested and observed
>>>> different approaches from the community. The following were
>>>> considered and rejected:
>>>>
>>>> 1. Using Airflow Variables to set and retrieve key-value pairs.
>>>>    Unfortunately, Variable.set(...) cannot be run by code on the
>>>>    Triggerer. This approach also had other downsides, such as the
>>>>    risk of unintentional modifications to a Variable.
>>>> 2. Storing key-value pairs in some external store. This would
>>>>    require additional setup for Airflow users and would make
>>>>    event-driven scheduling feel less "native".
>>>> 3. Using the /assets endpoint to find last_asset_event.timestamp.
>>>>    Since a Trigger is defined independently of an Asset, there is no
>>>>    straightforward way to retrieve this timestamp. Plus, this was
>>>>    not a flexible approach for non-temporal watermarks.
>>>>
>>>> Heavily borrowing from the ideas in AIP-30, I'm proposing that a new
>>>> model be added to Airflow. This model would be called process_state
>>>> (or something similar) and have the fields namespace, name, key, and
>>>> value. A process namespace and name would be provided when defining
>>>> a Trigger, allowing a key-value pair to be stored and retrieved
>>>> within the Trigger itself. The process_state model could also be
>>>> used by Sensors implementing Trigger-like logic.
>>>>
>>>> For nearly all "incremental" processes, this provides a way to
>>>> maintain state and unblock the development of event Triggers.
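To make the shape of that concrete, here is roughly how I could imagine a
Trigger using the proposed model. Everything here that isn't in Jake's
proposal is hypothetical: ProcessState and its get/set helpers don't exist,
list_new_objects() stands in for "list S3 keys modified after the
watermark", and in Airflow 3 the state calls would presumably have to go
through the API server:

    import asyncio
    from typing import Any, AsyncIterator, Optional

    from airflow.triggers.base import BaseEventTrigger, TriggerEvent

    # Hypothetical imports; neither of these exists today:
    # from airflow.sdk import ProcessState
    # from my_provider.s3 import list_new_objects

    class NewS3FileTrigger(BaseEventTrigger):
        """Hypothetical trigger: emits one event per batch of files that
        landed after the persisted watermark, then advances the watermark."""

        def __init__(self, bucket: str) -> None:
            super().__init__()
            self.bucket = bucket

        def serialize(self) -> tuple[str, dict[str, Any]]:
            return ("my_provider.triggers.NewS3FileTrigger", {"bucket": self.bucket})

        async def run(self) -> AsyncIterator[TriggerEvent]:
            while True:
                # Look up this watcher's rows in the proposed process_state
                # model (namespace, name, key, value).
                watermark: Optional[str] = await ProcessState.get(
                    namespace="s3_watcher", name=self.bucket, key="last_modified"
                )
                # (key, last_modified) pairs newer than the watermark.
                new_objects = await list_new_objects(self.bucket, since=watermark)
                if new_objects:
                    await ProcessState.set(
                        namespace="s3_watcher",
                        name=self.bucket,
                        key="last_modified",
                        value=max(ts for _, ts in new_objects),
                    )
                    # Only strictly-newer files are reported, which avoids the
                    # infinite-scheduling loop the docs link above warns about.
                    yield TriggerEvent({"keys": [key for key, _ in new_objects]})
                await asyncio.sleep(30)

The namespace/name pair scopes the rows, so multiple watchers could keep
independent watermarks in the same table.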
>>>> If the community decides to move forward with this approach, I'd
>>>> like to spearhead the effort w.r.t. the development, testing, and
>>>> documentation of this functionality.
>>>>
>>>> *Jake Roach*
>>>> Field Data Engineer
>>>> (he/him/his)
>>>>
>>>> Email: jake.ro...@astronomer.io
>>>> Mobile: (716) 364-4203