Like... XComs make sense to ship to object storage, since tasks can need to share large amounts of data between them. But shouldn't something that tracks trigger state for event-driven scheduling consistently be small?
On Tue, Jun 10, 2025 at 1:58 PM Ryan Hatter <ryan.hat...@astronomer.io> wrote:

> I don't think we should try to take on Kafka, but better supporting
> event-driven scheduling is one of the oft-repeated highlights of Airflow 3.
>
> IMO, it doesn't make sense to manage state using object storage. A simple
> model in Airflow would be suitable.
>
> On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Loose thoughts to spark the discussion:
>>
>> I think in the past, the basic architectural assumptions of Airflow made
>> us say "no" to such requests and to keeping state, because we had strong
>> assumptions about data intervals and "idempotency" (you can look up the
>> devlist for our long past discussions on what idempotency means in the
>> context of Airflow).
>>
>> But in Airflow 3, where we have more ways of looking at DAGs and
>> scheduling them, I agree some kind of common abstraction for "how do I
>> store my state" would be useful.
>>
>> We should consider who the "owner" of such state is - i.e. what the
>> state is attached to and the scenarios it should be used in - because
>> that will impact its characteristics re. scalability and performance.
>> If we are trying to have "watermarks" to allow streaming, Kafka-like
>> performance and scalability, that is very different from much more
>> coarse-grained state retrieval and access.
>>
>> Because if our expectations re: latency etc. are not very "high", I
>> think we almost have a "medium-weight" state storage via a pretty
>> generic mechanism - "airflow.io". Users can pick an object storage of
>> their choice as state storage (even LocalFileStorage if they run a
>> small Airflow instance with the Local Executor). In a number of cases
>> this can be accessed asynchronously in triggers via the various fsspec
>> implementations (I guess we should check how much of the "async" is
>> actually implemented in the implementations we already have).
>>
>> So **maybe** this could be done with "best practices": telling people,
>> testing, and making sure we have the right async abstractions working
>> for triggers to be able to access common.io.
>>
>> Of course, maybe we have other performance/latency characteristics in
>> mind for such state, so maybe object storage is not enough? But then I
>> am not sure if Another Task SDK Model and API is the right way. With
>> Airflow 3 it will have to go through the API server anyway, so it
>> already centralises some processing and has some latency, and possibly
>> distributed object storage is a "better" way to store state?
>>
>> J.
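To make Jarek's airflow.io idea concrete, here is a minimal sketch of a trigger-side watermark kept on object storage. The bucket/key layout and function names are mine, purely illustrative:

    from airflow.io.path import ObjectStoragePath

    # Illustrative location - any fsspec-backed scheme would work,
    # including file:// for a small Local Executor instance.
    WATERMARK = ObjectStoragePath(
        "s3://my-state-bucket/triggers/s3_watcher/watermark.txt"
    )

    def read_watermark(default: str = "1970-01-01T00:00:00+00:00") -> str:
        # First run: no state exists yet, so fall back to a sentinel.
        return WATERMARK.read_text() if WATERMARK.exists() else default

    def write_watermark(value: str) -> None:
        # Last-writer-wins; fine for a single trigger instance, but
        # concurrent writers would need more care.
        WATERMARK.write_text(value)

Note that these calls are blocking, so a trigger would have to wrap them (e.g. with asyncio.to_thread) unless the backing filesystem exposes an async interface - which is exactly the "how much async is implemented" question above.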
>> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid> wrote:
>>
>> > *Problem*
>> >
>> > Currently, there is no way to persist state within a Trigger, or
>> > within any Airflow object, for that matter.
>> >
>> > This presents a challenge to Airflow users, especially those looking
>> > to “watch” Assets. AssetWatchers leverage Triggers (inheriting from
>> > BaseEventTrigger) to create Asset events and trigger DAG execution
>> > for event-driven DAGs. So far, the only implementations have been
>> > queue/stream-based. However, there are a number of other
>> > “incremental” events that users may want to invoke DAGs upon, such as:
>> >
>> > - New files land in, or are updated in, S3.
>> > - New records are added to a Postgres table.
>> >
>> > In order to implement this type of logic in a trigger, a sort of
>> > “watermark” is required. This could be in the form of a “last file
>> > processed” or “last modified time”. For example, a Trigger should
>> > only “look back” at files that have landed in S3 since the timestamp
>> > of the last file that resulted in a TriggerEvent being emitted.
>> > Otherwise, a sort of infinite loop may be created
>> > <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>.
>> >
>> > Since there is not a supported way to reliably store state in a
>> > Trigger, this “watermark” cannot be persisted and retrieved by other
>> > Trigger runs. *This makes creating a Trigger for incremental events
>> > quite difficult.*
>> >
>> > Previous Work
>> >
>> > Daniel Standish created AIP-30
>> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
>> > to address this issue. The use cases he mentions in that AIP closely
>> > align with the problems that Airflow users will try to solve with
>> > event-driven scheduling. It was not implemented, but remains relevant.
>> >
>> > Airflow users and contributors have also started to implement their
>> > own “workarounds” to power event-driven scheduling. However, no
>> > robust solution has been implemented yet.
>> >
>> > Proposed Solution
>> >
>> > Prior to the proposed solution below, I tested and observed different
>> > approaches from the community. The following were considered before
>> > being rejected:
>> >
>> > 1. Using Airflow Variables to set and retrieve key-value pairs.
>> > Unfortunately, Variable.set(...) cannot be run by code on the
>> > Triggerer. This approach also had other downsides, such as the risk
>> > of unintentional modifications to a Variable.
>> > 2. Storing key-value pairs in some external store. This would require
>> > additional setup for Airflow users, and would make event-driven
>> > scheduling feel less “native”.
>> > 3. Using the /assets endpoint to find the last_asset_event.timestamp.
>> > Since a Trigger is defined independently of an Asset, there is no
>> > straightforward way to retrieve this timestamp. Plus, this was not a
>> > flexible approach for non-temporal watermarks.
>> >
>> > Heavily borrowing from the ideas in AIP-30, I’m proposing that a new
>> > model be added to Airflow. This model would be called process_state
>> > (or something similar) and have the fields namespace, name, key, and
>> > value. A process namespace and name would be provided when defining a
>> > Trigger, allowing for a key-value pair to be stored and retrieved
>> > within a Trigger itself. The process_state model could also be used
>> > by Sensors implementing Trigger-like logic.
>> >
>> > For nearly all “incremental” processes, this provides a way to
>> > maintain state and unblock the development of event Triggers.
>> >
>> > If the community decides to move forward with this approach, I’d like
>> > to spearhead the effort w.r.t. the development, testing, and
>> > documentation of this functionality.
>> >
>> > *Jake Roach*
>> > Field Data Engineer
>> > (he/him/his)
>> >
>> > Email: jake.ro...@astronomer.io
>> > Mobile: (716) 364-4203
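For what it's worth, the trigger shape Jake describes would look roughly like the below. StateBackend and _list_files_modified_after are hypothetical stand-ins for whichever persistence mechanism and listing helper we end up with:

    import asyncio

    from airflow.triggers.base import BaseEventTrigger, TriggerEvent

    class S3NewFileTrigger(BaseEventTrigger):
        """Fires when files newer than the stored watermark land under a prefix."""

        def __init__(self, bucket: str, prefix: str, poll_interval: float = 30.0):
            super().__init__()
            self.bucket = bucket
            self.prefix = prefix
            self.poll_interval = poll_interval

        def serialize(self):
            return (
                "my_provider.triggers.S3NewFileTrigger",  # illustrative module path
                {
                    "bucket": self.bucket,
                    "prefix": self.prefix,
                    "poll_interval": self.poll_interval,
                },
            )

        async def run(self):
            # StateBackend is hypothetical - it is the thing this thread
            # is trying to decide on.
            state = StateBackend(
                namespace="s3_watcher", name=f"{self.bucket}/{self.prefix}"
            )
            while True:
                # None on the very first run; the (hypothetical) listing
                # helper would treat that as "list everything".
                watermark = await state.get("last_modified")
                new_files = await self._list_files_modified_after(watermark)
                if new_files:
                    # Persist the watermark *before* emitting, so later runs
                    # only look back past this point and the same files never
                    # re-fire - avoiding the infinite-scheduling loop above.
                    await state.set(
                        "last_modified",
                        max(f.last_modified for f in new_files),
                    )
                    yield TriggerEvent({"keys": [f.key for f in new_files]})
                await asyncio.sleep(self.poll_interval)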
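And one possible shape for the process_state model itself. The field names come straight from the proposal; the column types, lengths, and composite primary key are my assumptions:

    from sqlalchemy import Column, String, Text

    from airflow.models.base import Base

    class ProcessState(Base):
        """Key-value state scoped to a (namespace, name) pair."""

        __tablename__ = "process_state"

        namespace = Column(String(250), primary_key=True)
        name = Column(String(250), primary_key=True)
        key = Column(String(250), primary_key=True)
        value = Column(Text, nullable=True)

The composite key makes each (namespace, name) pair behave like a small private dictionary, which matches the "one watermark per trigger" use case. In Airflow 3 a trigger would presumably reach this through the API server / Task SDK rather than the metadata DB directly, which circles back to Jarek's latency point.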