Indeed, I think it's not very obvious whether metadata DB storage or external storage is better for state. I think we might be underestimating the speed and latency of modern distributed object storage vs. our centralized API server (which has essentially the same per-request HTTP overhead as S3 storage, plus a DB read/write). Writing to and reading from the same DB that is already busy handling scheduling, with many workers accessing it, might actually be slower and have more latency (that's why I was writing about the characteristics needed).
I think the key here is whether we can use an async interface for such watermark writing - more than whether it's a DB or object storage or Redis or whatever else. While for sure for small installations the DB will have lower latency, the scalability of object storage quite certainly has higher limits than whatever we can come up with using a single relational DB (and our DBs do not handle active/active replication). I would simply be pretty cautious about adding "more" stress on our DB - that's why I think there is a kind of beauty in using potentially "other" common storage for key/value (that is also better distributed). Ideally we could just use a dedicated Redis/Valkey cluster for it (but we likely do not want to add yet-another-component to Airflow).

For a long time, when we discussed "asset watchers", I saw them as a way for Airflow to provide a "streamish" experience - where, while we do not have Kafka-like streaming capabilities of handling many, many messages per second, we could potentially get much more "throughput" of handled messages/started dag runs and far less latency (which is also key for any kind of inference workflows). And in this case I think if we can leverage the async interface (which quite likely comes for free with most object storages) - this is where savings might be big. If we make our Task SDK + DB write async - we can likely achieve small latency here as well - possibly.

Especially since, when it comes to the asset watcher, what we are really talking about is:

1) read the watermark when started, to recover (and keep it in memory while the asset watcher is running)
2) (potentially) asynchronously write the watermark every time the trigger is awoken - paired with creating a DagRun in our DB (via Task SDK/trigger DAG).

So we have a single read at the start and a sequence of writes, which (depending on what guarantees we want to deliver - at-most-once, at-least-once, exactly-once (please no!) etc.) might be asynchronous.
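To make the read-once / write-async pattern above a bit more concrete, here is a rough Python sketch. Everything in it is an assumption made up for illustration: `InMemoryWatermarkStore` stands in for whatever async key/value backend we might pick, and `create_dag_run` stands in for the API/DB call - neither is an existing Airflow API.

```python
import asyncio


class InMemoryWatermarkStore:
    """Hypothetical stand-in for an async key/value backend (object storage, Redis, ...)."""

    def __init__(self):
        self._data = {}

    async def get(self, key, default=None):
        await asyncio.sleep(0)  # placeholder for a network round trip
        return self._data.get(key, default)

    async def put(self, key, value):
        await asyncio.sleep(0)  # placeholder for a network round trip
        self._data[key] = value


async def create_dag_run(created, event):
    """Hypothetical stand-in for the call that creates a DagRun."""
    await asyncio.sleep(0)
    created.append(event)


async def watch(store, key, events, created):
    # 1) Single read at start-up to recover; afterwards the watermark lives in memory.
    watermark = await store.get(key, default=0)
    pending = set()
    for event in events:
        if event <= watermark:
            continue  # already handled before the restart
        # Create the DagRun first, then persist: a crash in between replays
        # the event after restart (at-least-once).
        await create_dag_run(created, event)
        watermark = event
        # 2) Persist the watermark in the background instead of awaiting it here.
        # A real backend could reorder concurrent writes; this sketch ignores that.
        task = asyncio.create_task(store.put(key, watermark))
        pending.add(task)
        task.add_done_callback(pending.discard)
    if pending:
        # Flush outstanding writes before the watcher stops.
        await asyncio.gather(*pending)
    return watermark
```

Running `asyncio.run(watch(store, "s3-watcher", events, created))` twice against the same store shows the recovery behaviour: the second run skips every event at or below the persisted watermark.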
If we have small messages and sync APIs, 2) is almost by definition 2x slower if both the watermark and DagRun creation use the same API/DB (two small-ish DB calls) - compared to async watermark writing to a different (distributed) system - and the limits of scalability are for sure much lower.

But yeah - "thinking of storage" is just a wild idea to consider and maybe try. Async API server + async DB might also work (and does not require external storage).

I also think that maybe "generic" state storage is not the best way to approach it - I think (if we want to make it "airflow native") we should carefully look at the use cases where it will be used, and maybe it will turn out that we can have different solutions for different cases. The asset watcher is one clear case (which again - might be solved without adding a new DB model), but in the document by Daniel there was a proposal for ProcessState, TaskState, and TaskInstanceState - each of them with very different usage characteristics and cardinality of stored state - and they might be implemented differently.

J.

On Wed, Jun 11, 2025 at 10:23 PM Ash Berlin-Taylor <a...@apache.org> wrote:

> Paging Daniel Standish - he’s got an old AIP (30 I think) that was
> targeting this problem but the airflow “model” has moved on a bit since so
> it might need re-working for the Airflow 3 world.
>
> -ash
>
> > On 10 Jun 2025, at 19:08, Ryan Hatter <ryan.hat...@astronomer.io.INVALID> wrote:
> >
> > Like... XComs it makes sense to ship to object storage since it can be
> > necessary to share large amounts of data between tasks. But something to
> > track trigger state for event-driven scheduling should consistently be
> > small?
> >
> > On Tue, Jun 10, 2025 at 1:58 PM Ryan Hatter <ryan.hat...@astronomer.io> wrote:
> >
> >> I don't think we should try to take on Kafka, but better supporting
> >> event-driven scheduling is one of the oft-repeated highlights of Airflow 3.
> >>
> >> IMO, it doesn't make sense to manage state using object storage. A simple
> >> model in Airflow would be suitable.
> >>
> >> On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:
> >>
> >>> Loose thoughts to spark the discussion:
> >>>
> >>> I think in the past, the basic architectural assumptions of Airflow made us
> >>> say "no" to such requests - and keeping state because we had strong
> >>> assumptions about data intervals and "idempotency" (You can look up devlist
> >>> for our long past discussions what idempotency is in the context of
> >>> Airflow).
> >>>
> >>> But in the Airflow 3 where we have more ways of looking at DAGs and
> >>> scheduling them, I agree some kind of common abstraction on "how do I store
> >>> my state" would be useful.
> >>>
> >>> I think it should be considered who is the "owner" of such a state - i.e.
> >>> where the state is attached to and scenarios it should be used - because it
> >>> will impact the characteristics - re. scalability and performance. I.e. If
> >>> we are trying to have "watermarks" to allow streaming kafka-like
> >>> performance and scalability, that is very different from much more coarse
> >>> grained state retrieval and access.
> >>>
> >>> Because if our expectations are not very "high", re: latency etc. - I think
> >>> we almost have a "medium-weight" state storage via a pretty
> >>> generic mechanism - "airflow.io". Users can pick an object storage of their
> >>> choice as state storage (even LocalFileStorage if they run a small airflow
> >>> instance with Local Executor). In a number of cases this can be accessed
> >>> asynchronously in triggers in a number of fsspec implementations (I guess
> >>> we should check how much of the "async" is implemented in the various
> >>> implementations we already have).
> >>>
> >>> So **maybe** this could be done by "best practices" about telling people,
> >>> testing and making sure we have the right async abstractions working for
> >>> triggers to be able to access the common.io.
> >>>
> >>> Of course - maybe we think about other performance/latency characteristics
> >>> for such a state, so maybe object storage is not enough? But then I am not
> >>> sure if Another Task SDK Model and API is the right way. With Airflow 3 it
> >>> will have to go through API server anyway, so it already centralises some
> >>> processing and has some latency, and possibly distributed object storage is
> >>> a "better" way to store state?
> >>>
> >>> J.
> >>>
> >>> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid> wrote:
> >>>
> >>>> *Problem*
> >>>>
> >>>> Currently, there is no way to persist state within a Trigger, or within any
> >>>> Airflow object, for that matter.
> >>>>
> >>>> This presents a challenge to Airflow users, especially those looking to
> >>>> “watch” Assets. AssetWatchers leverage Triggers (inheriting from the
> >>>> BaseEventTrigger) to create Asset events and trigger DAG execution for
> >>>> event-driven DAGs. So far, the only implementations have been
> >>>> queue/stream-based. However, there are a number of other “incremental”
> >>>> events that users may want to invoke DAGs upon, such as:
> >>>>
> >>>> - New files land or are updated in S3
> >>>> - New records are added to a Postgres table
> >>>>
> >>>> In order to implement this type of logic in a trigger, a sort of
> >>>> “watermark” is required. This could be in the form of a “last file
> >>>> processed” or “last modified time”. For example, a Trigger should only
> >>>> “look back” at files that have landed in S3 since the timestamp of the last
> >>>> file that resulted in a TriggerEvent being emitted.
> >>>> Otherwise, a sort of infinite loop may be created
> >>>> <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>.
> >>>>
> >>>> Since there is not a supported way to reliably store state in a Trigger,
> >>>> this “watermark” cannot be persisted and retrieved by other Trigger runs.
> >>>> *This makes creating a Trigger for incremental events quite difficult.*
> >>>>
> >>>> Previous Work
> >>>>
> >>>> Daniel Standish created AIP-30
> >>>> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
> >>>> to address this issue. The use-cases that he mentions in this AIP closely
> >>>> align with the problems that Airflow users will try to solve with
> >>>> event-driven scheduling. This was not implemented, but remains relevant.
> >>>>
> >>>> Airflow users and contributors have also started to implement their own
> >>>> “workarounds” to power event-driven scheduling. However, there has not been
> >>>> a robust solution implemented yet.
> >>>>
> >>>> Proposed Solution
> >>>>
> >>>> Prior to the proposed solution below, I tested and observed different
> >>>> approaches from the community. The below were considered before being
> >>>> rejected.
> >>>>
> >>>> 1. Using Airflow Variables to set and retrieve key-value pairs.
> >>>>    Unfortunately, Variable.set(...) cannot be run by code on the Triggerer.
> >>>>    This approach also had other downsides, such as the risk of
> >>>>    unintentional modifications to a Variable.
> >>>> 2. Storing key-value pairs in some external store. This would require
> >>>>    additional set up for Airflow users, and would make event-driven
> >>>>    scheduling feel less “native”.
> >>>> 3. Using the /assets endpoint to find the last_asset_event.timestamp.
> >>>>    Since a Trigger is defined independent of an Asset, there is no
> >>>>    straightforward way to retrieve this timestamp. Plus, this was not a
> >>>>    flexible approach for non-temporal watermarks.
> >>>>
> >>>> Heavily borrowing from the ideas in AIP-30, I’m proposing that a new model
> >>>> is added to Airflow. This model would be called process_state (or something
> >>>> similar) and have the fields namespace, name, key, value. A process
> >>>> namespace and name would be provided when defining a Trigger, allowing for
> >>>> a key-value pair to be stored and retrieved within a Trigger itself. The
> >>>> process_state model could also be used by Sensors implementing Trigger-like
> >>>> logic.
> >>>>
> >>>> For nearly all “incremental” processes, this provides a way to maintain
> >>>> state and unblock the development of event Triggers.
> >>>>
> >>>> If the community decides to move forward with this approach, I’d like to
> >>>> spearhead the effort w.r.t. the development, testing, and documentation of
> >>>> this functionality.
> >>>>
> >>>> *Jake Roach*
> >>>> Field Data Engineer
> >>>> (he/him/his)
> >>>>
> >>>> Email: jake.ro...@astronomer.io