I don't think we should try to take on Kafka, but better supporting event-driven scheduling is one of the oft-repeated highlights of Airflow 3.
IMO, it doesn't make sense to manage state using object storage. A simple model in Airflow would be suitable.

On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Loose thoughts to spark the discussion:
>
> I think in the past, the basic architectural assumptions of Airflow made us say "no" to such requests - and to keeping state - because we had strong assumptions about data intervals and "idempotency" (you can search the devlist for our long past discussions of what idempotency means in the context of Airflow).
>
> But in Airflow 3, where we have more ways of looking at DAGs and scheduling them, I agree some kind of common abstraction for "how do I store my state" would be useful.
>
> I think we should consider who the "owner" of such state is - i.e. what the state is attached to and the scenarios it should be used in - because that will impact its characteristics re. scalability and performance. If we are trying to have "watermarks" to allow streaming, Kafka-like performance and scalability, that is very different from much more coarse-grained state retrieval and access.
>
> Because if our expectations are not very "high" re: latency etc., I think we almost have a "medium-weight" state storage via a pretty generic mechanism - "airflow.io". Users can pick an object storage of their choice as state storage (even LocalFileStorage if they run a small Airflow instance with the Local Executor). In a number of cases this can be accessed asynchronously in triggers via a number of fsspec implementations (I guess we should check how much of the "async" is actually implemented in the various implementations we already have).
>
> So **maybe** this could be done through "best practices" - telling people, testing, and making sure we have the right async abstractions working for triggers to be able to access common.io.
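As a rough illustration of the "medium-weight" state storage idea above, here is a minimal sketch. It uses a local JSON file as a stand-in for an airflow.io / fsspec-backed object storage path; the path, function names, and stored value are all invented for illustration, not an existing Airflow API.

```python
import json
from pathlib import Path

# Stand-in for an object storage location (s3://, gcs://, or a local
# file) that the user would choose as their state backend. In real
# Airflow this could be an airflow.io path with a similar read/write
# interface; a plain local file keeps this sketch self-contained.
STATE_PATH = Path("/tmp/trigger_state_demo.json")

def load_state() -> dict:
    """Return the persisted state, or an empty dict on the first run."""
    if STATE_PATH.exists():
        return json.loads(STATE_PATH.read_text())
    return {}

def save_state(state: dict) -> None:
    """Persist the state; fine for a single-writer trigger process."""
    STATE_PATH.write_text(json.dumps(state))

# A trigger run reads whatever the previous run stored, updates its
# watermark, and writes it back for the next run to pick up.
state = load_state()
state["last_seen"] = "2025-06-09T08:54:00Z"
save_state(state)
```

The trade-off Jarek raises applies directly here: an object-store round trip per poll is acceptable for coarse-grained state, but not for streaming-grade latency.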
> Of course - maybe we have other performance/latency characteristics in mind for such state, so maybe object storage is not enough? But then I am not sure that another Task SDK model and API is the right way. With Airflow 3 it will have to go through the API server anyway, so it already centralises some processing and adds some latency - and possibly distributed object storage is a "better" way to store state?
>
> J.
>
> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid> wrote:
>
> > *Problem*
> >
> > Currently, there is no way to persist state within a Trigger - or within any Airflow object, for that matter.
> >
> > This presents a challenge to Airflow users, especially those looking to "watch" Assets. AssetWatchers leverage Triggers (inheriting from BaseEventTrigger) to create Asset events and trigger DAG execution for event-driven DAGs. So far, the only implementations have been queue/stream-based. However, there are a number of other "incremental" events that users may want to invoke DAGs upon, such as:
> >
> > - New files land or are updated in S3.
> > - New records are added to a Postgres table.
> >
> > To implement this type of logic in a Trigger, a sort of "watermark" is required. This could be in the form of a "last file processed" or "last modified time". For example, a Trigger should only "look back" at files that have landed in S3 since the timestamp of the last file that resulted in a TriggerEvent being emitted. Otherwise, a sort of infinite loop may be created
> > <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>.
> >
> > Since there is no supported way to reliably store state in a Trigger, this "watermark" cannot be persisted and retrieved by other Trigger runs.
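The watermark logic described above can be sketched in a few lines. The file names and timestamps are invented for illustration; in a real Trigger the listing would come from S3, and the advanced watermark is exactly the piece of state that currently cannot be persisted between Trigger runs.

```python
from datetime import datetime, timezone

def poll_new_files(listing: dict, watermark: datetime):
    """Return (new_files, advanced_watermark) for one polling cycle.

    Only files strictly newer than the watermark are emitted, and the
    watermark is advanced past everything emitted, so re-polling the
    same listing yields nothing - which is what breaks the infinite
    scheduling loop.
    """
    new_files = {k: v for k, v in listing.items() if v > watermark}
    if new_files:
        watermark = max(new_files.values())
    return new_files, watermark

listing = {
    "a.csv": datetime(2025, 6, 1, tzinfo=timezone.utc),
    "b.csv": datetime(2025, 6, 5, tzinfo=timezone.utc),
}
watermark = datetime(2025, 6, 3, tzinfo=timezone.utc)

new, watermark = poll_new_files(listing, watermark)
assert list(new) == ["b.csv"]          # only b.csv is newer than the watermark
new_again, _ = poll_new_files(listing, watermark)
assert new_again == {}                 # second poll emits nothing
```

Without persistence, `watermark` resets whenever the Trigger is re-created, and every previously seen file is re-emitted.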
> > *This makes creating a Trigger for incremental events quite difficult.*
> >
> > *Previous Work*
> >
> > Daniel Standish created AIP-30
> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
> > to address this issue. The use cases he mentions in that AIP closely align with the problems that Airflow users will try to solve with event-driven scheduling. It was never implemented, but remains relevant.
> >
> > Airflow users and contributors have also started to implement their own "workarounds" to power event-driven scheduling. However, no robust solution has been implemented yet.
> >
> > *Proposed Solution*
> >
> > Prior to the proposed solution below, I tested and observed different approaches from the community. The following were considered and rejected:
> >
> > 1. Using Airflow Variables to set and retrieve key-value pairs. Unfortunately, Variable.set(...) cannot be run by code on the Triggerer. This approach also had other downsides, such as the risk of unintentional modifications to a Variable.
> > 2. Storing key-value pairs in some external store. This would require additional setup for Airflow users, and would make event-driven scheduling feel less "native".
> > 3. Using the /assets endpoint to find the last_asset_event.timestamp. Since a Trigger is defined independently of an Asset, there is no straightforward way to retrieve this timestamp. Plus, this was not a flexible approach for non-temporal watermarks.
> >
> > Heavily borrowing from the ideas in AIP-30, I'm proposing that a new model be added to Airflow. This model would be called process_state (or something similar) and have the fields namespace, name, key, and value.
> > A process namespace and name would be provided when defining a Trigger, allowing a key-value pair to be stored and retrieved within the Trigger itself. The process_state model could also be used by Sensors implementing Trigger-like logic.
> >
> > For nearly all "incremental" processes, this provides a way to maintain state and unblock the development of event Triggers.
> >
> > If the community decides to move forward with this approach, I'd like to spearhead the effort w.r.t. the development, testing, and documentation of this functionality.
> >
> > *Jake Roach*
> > Field Data Engineer
> > (he/him/his)
> >
> > Email: jake.ro...@astronomer.io
> > Mobile: (716) 364-4203
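For concreteness, the proposed process_state model could look roughly like the sketch below. The class names, the `(namespace, name, key)` scoping, and the example values are illustrative only; the in-memory dict stands in for the metadata database table the real model would live in, and nothing here is a committed API.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ProcessStateKey:
    """Composite key matching the proposed fields: namespace, name, key."""
    namespace: str  # e.g. a provider/integration scope, "aws" (invented)
    name: str       # e.g. the specific process, "s3_new_files" (invented)
    key: str        # the state entry, e.g. "last_modified_watermark"

class ProcessStateStore:
    """In-memory stand-in for the proposed process_state model."""

    def __init__(self):
        self._rows: dict = {}

    def set(self, namespace: str, name: str, key: str, value: str) -> None:
        self._rows[ProcessStateKey(namespace, name, key)] = value

    def get(self, namespace: str, name: str, key: str, default=None):
        return self._rows.get(ProcessStateKey(namespace, name, key), default)

store = ProcessStateStore()
store.set("aws", "s3_new_files", "last_modified_watermark",
          "2025-06-04T21:13:00Z")
# State is scoped per (namespace, name), so two Triggers with different
# names never read each other's watermarks.
assert store.get("aws", "s3_new_files", "last_modified_watermark") \
    == "2025-06-04T21:13:00Z"
assert store.get("aws", "other_trigger", "last_modified_watermark") is None
```

Backing this with a model in the metadata database (rather than object storage) is exactly the trade-off debated earlier in the thread: reads and writes go through the API server, gaining centralisation at the cost of some latency.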