Paging Daniel Standish — he’s got an old AIP (AIP-30, I think) that was targeting 
this problem, but the Airflow “model” has moved on a bit since, so it might need 
re-working for the Airflow 3 world.


-ash

> On 10 Jun 2025, at 19:08, Ryan Hatter <ryan.hat...@astronomer.io.INVALID> 
> wrote:
> 
> Like... it makes sense to ship XComs to object storage, since it can be
> necessary to share large amounts of data between tasks. But something that
> tracks trigger state for event-driven scheduling should consistently be
> small?
> 
> On Tue, Jun 10, 2025 at 1:58 PM Ryan Hatter <ryan.hat...@astronomer.io>
> wrote:
> 
>> I don't think we should try to take on Kafka, but better supporting
>> event-driven scheduling is one of the oft-repeated highlights of Airflow 3.
>> 
>> IMO, it doesn't make sense to manage state using object storage. A simple
>> model in Airflow would be suitable.
>> 
>> On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>> 
>>> Loose thoughts to spark the discussion:
>>> 
>>> I think in the past, the basic architectural assumptions of Airflow made
>>> us say "no" to such requests - including keeping state - because we had
>>> strong assumptions about data intervals and "idempotency" (you can look up
>>> our long past discussions on the devlist about what idempotency means in
>>> the context of Airflow).
>>> 
>>> But in Airflow 3, where we have more ways of looking at DAGs and
>>> scheduling them, I agree some kind of common abstraction for "how do I
>>> store my state" would be useful.
>>> 
>>> I think we should consider who the "owner" of such state is - i.e. what
>>> the state is attached to and the scenarios in which it should be used -
>>> because that will impact its characteristics re. scalability and
>>> performance. I.e. if we are trying to have "watermarks" to allow
>>> streaming, Kafka-like performance and scalability, that is very different
>>> from much more coarse-grained state retrieval and access.
>>> 
>>> Because if our expectations re: latency etc. are not very "high" - I
>>> think we almost have a "medium-weight" state storage via a pretty generic
>>> mechanism - "airflow.io". Users can pick an object storage of their
>>> choice as state storage (even LocalFileStorage if they run a small
>>> Airflow instance with the Local Executor). In a number of cases this can
>>> be accessed asynchronously in triggers via the various fsspec
>>> implementations (I guess we should check how much of the "async" is
>>> actually implemented in the implementations we already have).
>>> 
>>> So **maybe** this could be done via "best practices" - telling people,
>>> testing, and making sure we have the right async abstractions working for
>>> triggers to be able to access common.io.
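To make the object-storage idea above concrete, here is a minimal sketch of a trigger-side state store. It uses the local filesystem via `pathlib` as a stand-in for an airflow.io / fsspec backend (the `FileBackedState` class and its method names are hypothetical, not an existing Airflow API); with a real backend, the same read/write pattern would target e.g. an `s3://` path instead.

```python
import json
import tempfile
from pathlib import Path


class FileBackedState:
    """Hypothetical key-value state persisted as JSON at a storage path.

    Path here stands in for an object-storage path (the LocalFileStorage
    case Jarek mentions); the pattern, not the class, is the point.
    """

    def __init__(self, root: Path, namespace: str):
        # One JSON document per namespace, e.g. per trigger.
        self._path = root / f"{namespace}.json"

    def get(self, key, default=None):
        if not self._path.exists():
            return default
        return json.loads(self._path.read_text()).get(key, default)

    def set(self, key, value):
        # Read-modify-write of the whole document; fine for small state.
        state = json.loads(self._path.read_text()) if self._path.exists() else {}
        state[key] = value
        self._path.write_text(json.dumps(state))


root = Path(tempfile.mkdtemp())
state = FileBackedState(root, "my_trigger")
state.set("last_modified", "2025-06-04T00:00:00Z")
print(state.get("last_modified"))  # 2025-06-04T00:00:00Z
```

For the async access Jarek raises, the same reads and writes would need to go through an async filesystem interface so they don't block the Triggerer's event loop.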
>>> 
>>> Of course - maybe we have other performance/latency expectations for
>>> such state, so maybe object storage is not enough? But then I am not
>>> sure if another Task SDK model and API is the right way. With Airflow 3
>>> it will have to go through the API server anyway, so it already
>>> centralises some processing and adds some latency - so possibly
>>> distributed object storage is a "better" way to store state?
>>> 
>>> J.
>>> 
>>> 
>>> 
>>> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid>
>>> wrote:
>>> 
>>>> *Problem*
>>>> 
>>>> 
>>>> Currently, there is no way to persist state within a Trigger, or within
>>>> any Airflow object, for that matter.
>>>> 
>>>> This presents a challenge to Airflow users, especially those looking to
>>>> “watch” Assets. AssetWatchers leverage Triggers (inheriting from
>>>> BaseEventTrigger) to create Asset events and trigger DAG execution for
>>>> event-driven DAGs. So far, the only implementations have been
>>>> queue/stream-based. However, there are a number of other “incremental”
>>>> events that users may want to invoke DAGs upon, such as:
>>>> 
>>>> 
>>>>   - New files land or are updated in S3.
>>>>   - New records are added to a Postgres table.
>>>> 
>>>> 
>>>> In order to implement this type of logic in a Trigger, a sort of
>>>> “watermark” is required. This could be in the form of a “last file
>>>> processed” marker or a “last modified time”. For example, a Trigger
>>>> should only “look back” at files that have landed in S3 since the
>>>> timestamp of the last file that resulted in a TriggerEvent being
>>>> emitted. Otherwise, a sort of infinite loop may be created:
>>>> https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling
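The watermark logic described above can be sketched in a few lines. This is an illustrative stand-in (the function name and the `(key, last_modified)` listing shape are assumptions, not an Airflow API): only objects modified strictly after the stored watermark count as new, which is what prevents re-emitting events for already-seen files and causing the infinite-scheduling loop.

```python
from datetime import datetime, timezone


def new_files_since(listing, watermark):
    """Return (new_files, advanced_watermark).

    listing: iterable of (key, last_modified) pairs, e.g. from an S3 list call.
    watermark: datetime of the last file already processed, or None on first run.
    """
    # Strictly "after" the watermark, so the file that set it is excluded.
    new = [(k, ts) for k, ts in listing if watermark is None or ts > watermark]
    if new:
        # Advance the watermark to the newest file we are about to emit for.
        watermark = max(ts for _, ts in new)
    return new, watermark


def t(hour):
    return datetime(2025, 6, 4, hour, tzinfo=timezone.utc)


listing = [("a.csv", t(1)), ("b.csv", t(2)), ("c.csv", t(3))]
new, wm = new_files_since(listing, t(1))
print([k for k, _ in new])  # ['b.csv', 'c.csv']
```

Note that running it again with the advanced watermark yields nothing new, which is exactly the loop-avoidance property: without persisting `wm` between Trigger runs, every run would re-emit for the same files.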
>>>> 
>>>> Since there is no supported way to reliably store state in a Trigger,
>>>> this “watermark” cannot be persisted and retrieved by other Trigger
>>>> runs. *This makes creating a Trigger for incremental events quite
>>>> difficult.*
>>>> 
>>>> 
>>>> Previous Work
>>>> 
>>>> 
>>>> Daniel Standish created AIP-30
>>>> (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence)
>>>> to address this issue. The use cases that he mentions in this AIP
>>>> closely align with the problems that Airflow users will try to solve
>>>> with event-driven scheduling. It was not implemented, but remains
>>>> relevant.
>>>> 
>>>> Airflow users and contributors have also started to implement their own
>>>> “workarounds” to power event-driven scheduling. However, there has not
>>>> been a robust solution implemented yet.
>>>> 
>>>> 
>>>> Proposed Solution
>>>> 
>>>> 
>>>> Prior to arriving at the proposed solution below, I tested and observed
>>>> different approaches from the community. The following were considered
>>>> and rejected:
>>>> 
>>>>   1. Using Airflow Variables to set and retrieve key-value pairs.
>>>>   Unfortunately, Variable.set(...) cannot be run by code on the
>>>>   Triggerer. This approach also had other downsides, such as the risk
>>>>   of unintentional modifications to a Variable.
>>>>   2. Storing key-value pairs in some external store. This would require
>>>>   additional setup for Airflow users, and would make event-driven
>>>>   scheduling feel less “native”.
>>>>   3. Using the /assets endpoint to find the last_asset_event.timestamp.
>>>>   Since a Trigger is defined independently of an Asset, there is no
>>>>   straightforward way to retrieve this timestamp. Plus, this was not a
>>>>   flexible approach for non-temporal watermarks.
>>>> 
>>>> 
>>>> Heavily borrowing from the ideas in AIP-30, I’m proposing that a new
>>>> model be added to Airflow. This model would be called process_state (or
>>>> something similar) and have the fields namespace, name, key, and value.
>>>> A process namespace and name would be provided when defining a Trigger,
>>>> allowing a key-value pair to be stored and retrieved within the Trigger
>>>> itself. The process_state model could also be used by Sensors
>>>> implementing Trigger-like logic.
>>>> 
>>>> For nearly all “incremental” processes, this provides a way to maintain
>>>> state and unblock the development of event Triggers.
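As a rough sketch of the proposed process_state model, the following uses an in-memory SQLite table with the four fields named in the proposal (namespace, name, key, value). The table schema, helper functions, and storage backend here are all illustrative assumptions, not a committed design; in Airflow this would presumably be a metadata-database model accessed through the Task SDK / API server.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical process_state table: fields taken from the proposal.
# (namespace, name) identifies the process, e.g. a particular Trigger.
conn.execute(
    """CREATE TABLE process_state (
           namespace TEXT NOT NULL,
           name      TEXT NOT NULL,
           key       TEXT NOT NULL,
           value     TEXT,
           PRIMARY KEY (namespace, name, key)
       )"""
)


def set_state(namespace, name, key, value):
    # Upsert: later runs of the same process overwrite the prior value.
    conn.execute(
        "INSERT OR REPLACE INTO process_state VALUES (?, ?, ?, ?)",
        (namespace, name, key, value),
    )


def get_state(namespace, name, key):
    row = conn.execute(
        "SELECT value FROM process_state WHERE namespace=? AND name=? AND key=?",
        (namespace, name, key),
    ).fetchone()
    return row[0] if row else None


# e.g. a Trigger persisting its watermark between runs
set_state("s3_watch", "landing_bucket", "last_modified", "2025-06-04T00:00:00Z")
print(get_state("s3_watch", "landing_bucket", "last_modified"))
```

The composite primary key is what scopes state to a process: two Triggers with different (namespace, name) pairs can use the same key without colliding.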
>>>> 
>>>> If the community decides to move forward with this approach, I’d like
>>>> to spearhead the effort w.r.t. the development, testing, and
>>>> documentation of this functionality.
>>>> 
>>>> *Jake Roach*
>>>> Field Data Engineer
>>>> (he/him/his)
>>>> 
>>>> 
>>>> Email: jake.ro...@astronomer.io
>>>> 
>>>> Mobile: (716) 364-4203
>>>> 
>>> 
>> 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
For additional commands, e-mail: dev-h...@airflow.apache.org
