Loose thoughts to spark the discussion:

I think in the past, the basic architectural assumptions of Airflow made us
say "no" to such requests - and to keeping state - because we had strong
assumptions about data intervals and "idempotency" (you can search the
devlist for our long-past discussions about what idempotency means in the
context of Airflow).

But in Airflow 3, where we have more ways of looking at DAGs and
scheduling them, I agree some kind of common abstraction for "how do I
store my state" would be useful.

I think we should consider who the "owner" of such state is - i.e. what
the state is attached to and the scenarios in which it will be used -
because that will impact its characteristics with respect to scalability
and performance. For example, if we are trying to use "watermarks" to
allow streaming, Kafka-like performance and scalability, that is very
different from much more coarse-grained state retrieval and access.

Because if our expectations are not very "high" re: latency etc., I think
we almost already have a "medium-weight" state storage via a pretty
generic mechanism - "airflow.io". Users can pick an object storage of
their choice as state storage (even local file storage if they run a small
Airflow instance with the Local Executor). In a number of cases this can
be accessed asynchronously in triggers via the various fsspec
implementations (I guess we should check how much of the "async" support
is actually implemented in the implementations we already have).
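
To illustrate - a rough sketch only, not a recommendation: the bucket,
path and conn_id below are made up, and I'm assuming we offload the sync
fsspec calls to a thread until proper async support is verified:

    from __future__ import annotations

    import asyncio

    from airflow.io.path import ObjectStoragePath

    # Hypothetical location - any fsspec-backed store configured via a
    # connection works, including local files for a LocalExecutor setup.
    STATE = ObjectStoragePath(
        "s3://my-bucket/state/my_trigger_watermark.txt", conn_id="aws_default"
    )

    async def read_watermark() -> str | None:
        # Offload sync fsspec I/O so the triggerer event loop isn't blocked.
        if await asyncio.to_thread(STATE.exists):
            return await asyncio.to_thread(STATE.read_text)
        return None

    async def write_watermark(value: str) -> None:
        await asyncio.to_thread(STATE.write_text, value)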

So **maybe** this could be handled with "best practices" - documenting it
for users, testing it, and making sure we have the right async
abstractions working so that triggers can access the common.io storage.

Of course, maybe we have other performance/latency characteristics in mind
for such state, so maybe object storage is not enough? But then I am not
sure that another Task SDK model and API is the right way. With Airflow 3
it will have to go through the API server anyway, which already
centralises some processing and adds some latency - so possibly
distributed object storage is a "better" way to store state?

J.



On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid>
wrote:

> *Problem*
>
>
> Currently, there is no way to persist state within a Trigger, or within any
> Airflow object, for that matter.
>
> This presents a challenge to Airflow users, especially those looking to
> “watch” Assets. AssetWatchers leverage Triggers (inheriting from
> BaseEventTrigger) to create Asset events and trigger DAG execution for
> event-driven DAGs. So far, the only implementations have been
> queue/stream-based. However, there are a number of other “incremental”
> events that users may want to invoke DAGs upon, such as:
>
>
>    - New files land or are updated in S3.
>    - New records are added to a Postgres table.
>
> In order to implement this type of logic in a trigger, a sort of
> “watermark” is required. This could be in the form of a “last file
> processed” or a “last modified time”. For example, a Trigger should only
> “look back” at files that have landed in S3 since the timestamp of the
> last file that resulted in a TriggerEvent being emitted. Otherwise, a
> sort of infinite loop may be created:
> https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling
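>
> To make this concrete, a Trigger’s poll loop might look roughly like the
> sketch below (the trigger class, the listing helper, and the classpath
> are all made up for illustration):
>
>     from __future__ import annotations
>
>     import asyncio
>     from typing import Any, AsyncIterator
>
>     from airflow.triggers.base import BaseEventTrigger, TriggerEvent
>
>     async def list_new_files(since: str | None) -> list[tuple[str, str]]:
>         """Stand-in for an async S3 listing of (key, last_modified) pairs."""
>         return []  # a real implementation would query S3 here
>
>     class S3NewFileTrigger(BaseEventTrigger):
>         """Emits an event for each file newer than the current watermark."""
>
>         def __init__(self, watermark: str | None = None) -> None:
>             super().__init__()
>             self.watermark = watermark
>
>         def serialize(self) -> tuple[str, dict[str, Any]]:
>             return (
>                 "my_provider.triggers.S3NewFileTrigger",
>                 {"watermark": self.watermark},
>             )
>
>         async def run(self) -> AsyncIterator[TriggerEvent]:
>             while True:
>                 for key, last_modified in await list_new_files(self.watermark):
>                     if self.watermark is None or last_modified > self.watermark:
>                         # The watermark only advances in memory; it is lost
>                         # whenever the trigger is re-created - exactly the
>                         # gap described in this thread.
>                         self.watermark = last_modified
>                         yield TriggerEvent({"key": key})
>                 await asyncio.sleep(60)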
>
> Since there is no supported way to reliably store state in a Trigger,
> this “watermark” cannot be persisted and retrieved by other Trigger runs.
> *This makes creating a Trigger for incremental events quite difficult.*
>
>
> *Previous Work*
>
>
> Daniel Standish created AIP-30 to address this issue:
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence
> The use-cases that he mentions in this AIP closely align with the
> problems that Airflow users will try to solve with event-driven
> scheduling. AIP-30 was never implemented, but it remains relevant.
>
> Airflow users and contributors have also started to build their own
> “workarounds” to power event-driven scheduling. However, no robust
> solution has been implemented yet.
>
>
> *Proposed Solution*
>
>
> Before arriving at the proposed solution below, I tested and observed
> different approaches from the community. The following approaches were
> considered and rejected.
>
>    1. Using Airflow Variables to set and retrieve key-value pairs.
>    Unfortunately, Variable.set(...) cannot be run by code on the
>    Triggerer. This approach also had other downsides, such as the risk
>    of unintentional modifications to a Variable.
>    2. Storing key-value pairs in some external store. This would require
>    additional setup for Airflow users, and would make event-driven
>    scheduling feel less “native”.
>    3. Using the /assets endpoint to find the last_asset_event.timestamp.
>    Since a Trigger is defined independently of an Asset, there is no
>    straightforward way to retrieve this timestamp. Plus, this was not a
>    flexible approach for non-temporal watermarks.
>
>
> Heavily borrowing from the ideas in AIP-30, I’m proposing that a new
> model be added to Airflow. This model would be called process_state (or
> something similar) and would have the fields namespace, name, key, and
> value. A process namespace and name would be provided when defining a
> Trigger, allowing a key-value pair to be stored and retrieved within the
> Trigger itself. The process_state model could also be used by Sensors
> implementing Trigger-like logic.
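>
> As a rough sketch of what the Trigger-facing API could look like (the
> class name, import path, and async get/set methods below are all
> hypothetical - they just illustrate the proposed fields):
>
>     # Hypothetical sketch - process_state does not exist yet; all names
>     # and paths below are made up to show namespace/name/key/value.
>     from airflow.models.process_state import ProcessState
>
>     # namespace + name scope the state to one "process" (e.g. a Trigger).
>     state = ProcessState(namespace="s3_watcher", name="raw_bucket")
>
>     async def advance_watermark(state: ProcessState, new_value: str) -> None:
>         # key/value: read the watermark persisted by a previous Trigger
>         # run (None the first time), then persist the new one.
>         last_seen = await state.get("last_modified")
>         if last_seen is None or new_value > last_seen:
>             await state.set("last_modified", new_value)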
>
> For nearly all “incremental” processes, this provides a way to maintain
> state and unblock the development of event Triggers.
>
> If the community decides to move forward with this approach, I’d like to
> spearhead the effort w.r.t. the development, testing, and documentation of
> this functionality.
>
> *Jake Roach*
> Field Data Engineer
> (he/him/his)
>
>
> Email: jake.ro...@astronomer.io
>
> Mobile: (716) 364-4203
>
