Like... it makes sense to ship XComs to object storage, since it can be
necessary to share large amounts of data between tasks. But something that
tracks trigger state for event-driven scheduling should consistently be
small?

On Tue, Jun 10, 2025 at 1:58 PM Ryan Hatter <ryan.hat...@astronomer.io>
wrote:

> I don't think we should try to take on Kafka, but better supporting
> event-driven scheduling is one of the oft-repeated highlights of Airflow 3.
>
> IMO, it doesn't make sense to manage state using object storage. A simple
> model in Airflow would be suitable.
>
> On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Loose thoughts to spark the discussion:
>>
>> I think in the past, the basic architectural assumptions of Airflow made
>> us say "no" to such requests - including keeping state - because we had
>> strong assumptions about data intervals and "idempotency" (you can search
>> the devlist for our long past discussions about what idempotency means in
>> the context of Airflow).
>>
>> But in Airflow 3, where we have more ways of looking at DAGs and
>> scheduling them, I agree some kind of common abstraction for "how do I
>> store my state" would be useful.
>>
>> We should also consider who the "owner" of such state is - i.e. what the
>> state is attached to and the scenarios it should be used in - because
>> that will impact its characteristics re: scalability and performance. If
>> we are trying to have "watermarks" that allow streaming, Kafka-like
>> performance and scalability, that is very different from much more
>> coarse-grained state retrieval and access.
>>
>> Because if our expectations are not very "high" re: latency etc. - I
>> think we almost have a "medium-weight" state storage via a pretty generic
>> mechanism - "airflow.io". Users can pick an object storage of their
>> choice as state storage (even LocalFileStorage if they run a small
>> Airflow instance with the Local Executor). In a number of cases this can
>> be accessed asynchronously in triggers via the various fsspec
>> implementations (I guess we should check how much of the "async" is
>> actually implemented in the implementations we already have).
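>>
>> To illustrate what I mean (a minimal sketch, not a recommendation yet):
>> a trigger could persist a watermark via airflow.io roughly like below.
>> The bucket, paths and helper names are my assumptions, and the
>> synchronous calls shown here would need async counterparts to be safe on
>> the triggerer:
>>
>>     from airflow.io.path import ObjectStoragePath
>>
>>     # Hypothetical location for per-trigger state.
>>     STATE = ObjectStoragePath(
>>         "s3://my-airflow-state/triggers/s3_watcher/watermark",
>>         conn_id="aws_default",
>>     )
>>
>>     def load_watermark(default: str = "1970-01-01T00:00:00+00:00") -> str:
>>         # Fall back to a default when no state has been persisted yet.
>>         return STATE.read_text() if STATE.exists() else default
>>
>>     def save_watermark(value: str) -> None:
>>         # One small object per trigger keeps reads and writes simple.
>>         STATE.write_text(value)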
>>
>> So **maybe** this could be done via "best practices" - telling people,
>> testing, and making sure we have the right async abstractions working so
>> that triggers are able to access common.io.
>>
>> Of course - maybe we have other performance/latency expectations for
>> such state, so maybe object storage is not enough? But then I am not
>> sure that another Task SDK model and API is the right way. With Airflow
>> 3 it will have to go through the API server anyway, so it already
>> centralises some processing and has some latency - and possibly
>> distributed object storage is a "better" way to store state?
>>
>> J.
>>
>>
>>
>> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach
>> <jake.ro...@astronomer.io.invalid> wrote:
>>
>> > *Problem*
>> >
>> >
>> > Currently, there is no way to persist state within a Trigger, or within
>> any
>> > Airflow object, for that matter.
>> >
>> > This presents a challenge to Airflow users, especially those looking to
>> > “watch” Assets. AssetWatchers leverage Triggers (inheriting from
>> > BaseEventTrigger) to create Asset events and trigger DAG execution for
>> > event-driven DAGs. So far, the only implementations have been
>> > queue/stream-based. However, there are a number of other “incremental”
>> > events that users may want to invoke DAGs upon, such as:
>> >
>> >
>> >    - New files land or are updated in S3.
>> >    - New records are added to a Postgres table.
>> >
>> >
>> > In order to implement this type of logic in a trigger, a sort of
>> > “watermark” is required. This could be in the form of a “last file
>> > processed” or “last modified time”. For example, a Trigger should only
>> > “look back” at files that have landed in S3 since the timestamp of the
>> > last file that resulted in a TriggerEvent being emitted. Otherwise, a
>> > sort of infinite loop may be created (
>> > https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling
>> > ).
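>> >
>> > As a rough sketch of what I mean (all helper methods here are
>> > hypothetical, not an existing Airflow API), the run loop of such a
>> > Trigger might look like:
>> >
>> >     import asyncio
>> >     from airflow.triggers.base import TriggerEvent
>> >
>> >     async def run(self):
>> >         # Start from the persisted watermark, e.g. the last-modified
>> >         # time of the newest file that already produced an event.
>> >         watermark = await self._load_watermark()
>> >         while True:
>> >             files = await self._list_s3_files()
>> >             new_files = [f for f in files if f["last_modified"] > watermark]
>> >             if new_files:
>> >                 watermark = max(f["last_modified"] for f in new_files)
>> >                 # Persist before emitting, so a restarted Trigger does
>> >                 # not re-emit the same files (the infinite-loop risk).
>> >                 await self._save_watermark(watermark)
>> >                 yield TriggerEvent({"files": [f["key"] for f in new_files]})
>> >             await asyncio.sleep(60)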
>> >
>> > Since there is no supported way to reliably store state in a Trigger,
>> > this “watermark” cannot be persisted and retrieved by other Trigger
>> > runs. *This makes creating a Trigger for incremental events quite
>> > difficult.*
>> >
>> >
>> > Previous Work
>> >
>> >
>> > Daniel Standish created AIP-30 (
>> > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
>> > ) to address this issue. The use cases that he mentions in that AIP
>> > closely align with the problems that Airflow users will try to solve
>> > with event-driven scheduling. The AIP was not implemented, but remains
>> > relevant.
>> >
>> > Airflow users and contributors have also started to implement their own
>> > “workarounds” to power event-driven scheduling. However, there has not
>> > been a robust solution implemented yet.
>> >
>> >
>> > Proposed Solution
>> >
>> >
>> > Prior to arriving at the proposed solution below, I tested and observed
>> > different approaches from the community. The following were considered
>> > and rejected:
>> >
>> >    1. Using Airflow Variables to set and retrieve key-value pairs.
>> >    Unfortunately, Variable.set(...) cannot be run by code on the
>> >    Triggerer. This approach also had other downsides, such as the risk
>> >    of unintentional modifications to a Variable.
>> >    2. Storing key-value pairs in some external store. This would
>> >    require additional setup for Airflow users, and would make
>> >    event-driven scheduling feel less “native”.
>> >    3. Using the /assets endpoint to find the last_asset_event.timestamp.
>> >    Since a Trigger is defined independently of an Asset, there is no
>> >    straightforward way to retrieve this timestamp. Plus, this was not a
>> >    flexible approach for non-temporal watermarks.
>> >
>> >
>> > Heavily borrowing from the ideas in AIP-30, I’m proposing that a new
>> > model be added to Airflow. This model would be called process_state (or
>> > something similar) and have the fields namespace, name, key, and value.
>> > A process namespace and name would be provided when defining a Trigger,
>> > allowing a key-value pair to be stored and retrieved within the Trigger
>> > itself. The process_state model could also be used by Sensors
>> > implementing Trigger-like logic.
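>> >
>> > For illustration only, here is a rough sketch of what such a model
>> > might look like as a SQLAlchemy table (names and column sizes are
>> > placeholders, not a final design):
>> >
>> >     from sqlalchemy import Column, String, Text
>> >     from airflow.models.base import Base
>> >
>> >     class ProcessState(Base):
>> >         __tablename__ = "process_state"
>> >
>> >         # (namespace, name, key) identifies one piece of state, e.g.
>> >         # ("s3_watcher", "raw-bucket", "last_modified") -> a timestamp.
>> >         namespace = Column(String(250), primary_key=True)
>> >         name = Column(String(250), primary_key=True)
>> >         key = Column(String(250), primary_key=True)
>> >         value = Column(Text)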
>> >
>> > For nearly all “incremental” processes, this provides a way to maintain
>> > state and unblock the development of event Triggers.
>> >
>> > If the community decides to move forward with this approach, I’d like to
>> > spearhead the effort w.r.t. the development, testing, and documentation
>> > of this functionality.
>> >
>> > *Jake Roach*
>> > Field Data Engineer
>> > (he/him/his)
>> >
>> >
>> > Email: jake.ro...@astronomer.io
>> >
>> > Mobile: (716) 364-4203
>> >
>>
>
