I don't think we should try to take on Kafka, but better support for
event-driven scheduling is one of the oft-repeated highlights of Airflow 3.

IMO, it doesn't make sense to manage state using object storage. A simple
model in Airflow would be suitable.

On Mon, Jun 9, 2025 at 8:54 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Loose thoughts to spark the discussion:
>
> I think in the past, the basic architectural assumptions of Airflow made us
> say "no" to such requests - and to keeping state - because we had strong
> assumptions about data intervals and "idempotency" (you can look up the
> devlist for our long-past discussions about what idempotency means in the
> context of Airflow).
>
> But in Airflow 3, where we have more ways of looking at DAGs and
> scheduling them, I agree some kind of common abstraction for "how do I
> store my state" would be useful.
>
> I think we should consider who the "owner" of such state is - i.e. what the
> state is attached to and the scenarios in which it would be used - because
> that will impact the characteristics re: scalability and performance. For
> example, if we want "watermarks" that allow streaming, Kafka-like
> performance and scalability, that is very different from much more
> coarse-grained state retrieval and access.
>
> Because if our expectations are not very "high" re: latency etc., I think
> we almost already have a "medium-weight" state storage via a pretty
> generic mechanism - "airflow.io". Users can pick an object storage of
> their choice as state storage (even LocalFileStorage if they run a small
> Airflow instance with the Local Executor). In a number of cases this can
> be accessed asynchronously in triggers via the various fsspec
> implementations (I guess we should check how much of the "async" is
> actually implemented in the implementations we already have).
>
> So **maybe** this could be done through "best practices" - telling people,
> testing, and making sure we have the right async abstractions working so
> that triggers can access common.io.
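>
> A minimal sketch of what that "medium-weight" option could look like for a
> trigger's watermark, assuming plain (blocking) fsspec calls - the bucket
> and key are placeholders, and a real trigger would want the async fsspec
> variants instead:
>
>     import json
>     from typing import Optional
>
>     import fsspec
>
>     # Hypothetical location for a trigger's watermark; any fsspec-supported
>     # URL works, including a local path for small LocalExecutor setups.
>     STATE_URL = "s3://my-airflow-state/triggers/s3_watcher.json"
>
>     def load_watermark() -> Optional[str]:
>         """Read the last-processed watermark from object storage, if present."""
>         fs, path = fsspec.core.url_to_fs(STATE_URL)
>         if not fs.exists(path):
>             return None
>         with fs.open(path, "r") as f:
>             return json.load(f).get("last_modified")
>
>     def save_watermark(last_modified: str) -> None:
>         """Persist the new watermark after a TriggerEvent has been emitted."""
>         fs, path = fsspec.core.url_to_fs(STATE_URL)
>         with fs.open(path, "w") as f:
>             json.dump({"last_modified": last_modified}, f)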
>
> Of course - maybe we have other performance/latency characteristics in mind
> for such state, so maybe object storage is not enough? But then I am not
> sure that another Task SDK model and API is the right way. With Airflow 3
> it will have to go through the API server anyway, so it already centralises
> some processing and adds some latency, and possibly distributed object
> storage is a "better" way to store state?
>
> J.
>
>
>
> On Wed, Jun 4, 2025 at 9:13 PM Jake Roach <jake.ro...@astronomer.io.invalid>
> wrote:
>
> > *Problem*
> >
> >
> > Currently, there is no way to persist state within a Trigger, or within
> > any Airflow object, for that matter.
> >
> > This presents a challenge to Airflow users, especially those looking to
> > “watch” Assets. AssetWatchers leverage Triggers (inheriting from
> > BaseEventTrigger) to create Asset events and trigger DAG execution for
> > event-driven DAGs. So far, the only implementations have been
> > queue/stream-based (the current wiring is sketched just after the list
> > below). However, there are a number of other “incremental” events that
> > users may want to trigger DAGs on, such as:
> >
> >
> >    - New files land or are updated in S3.
> >    - New records are added to a Postgres table.
> >
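> > For reference, the existing queue-based wiring looks roughly like this
> > (following the event-driven scheduling example in the Airflow 3 docs from
> > memory - the queue URL is a placeholder, and the import paths may differ
> > slightly depending on provider versions):
> >
> >     from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
> >     from airflow.sdk import DAG, Asset, AssetWatcher
> >
> >     # Queue/stream-based trigger: the only kind of implementation so far.
> >     trigger = MessageQueueTrigger(
> >         queue="https://sqs.us-east-1.amazonaws.com/0123456789/my-queue"
> >     )
> >
> >     # The watcher ties the trigger to an Asset; each TriggerEvent becomes
> >     # an AssetEvent that can schedule downstream DAGs.
> >     asset = Asset(
> >         "my_queue_asset",
> >         watchers=[AssetWatcher(name="my_watcher", trigger=trigger)],
> >     )
> >
> >     with DAG(dag_id="event_driven_dag", schedule=[asset]):
> >         ...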
> >
> > In order to implement this type of logic in a trigger, a sort of
> > “watermark” is required. This could take the form of a “last file
> > processed” key or a “last modified time”. For example, a Trigger should
> > only “look back” at files that have landed in S3 since the timestamp of
> > the last file that resulted in a TriggerEvent being emitted. Otherwise, a
> > sort of infinite loop may be created
> > <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>.
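> >
> > To make the gap concrete, here is a rough sketch of the shape of such a
> > trigger (the class name, the import paths, and the S3 listing helper are
> > all illustrative, not a real provider API), where the watermark can live
> > only in memory - and is therefore lost whenever the triggerer restarts or
> > the trigger is recreated:
> >
> >     import asyncio
> >     from datetime import datetime, timezone
> >
> >     from airflow.triggers.base import BaseEventTrigger, TriggerEvent
> >
> >     class S3NewFileTrigger(BaseEventTrigger):
> >         """Emit a TriggerEvent for every S3 object newer than the watermark."""
> >
> >         def __init__(self, bucket: str, prefix: str = ""):
> >             super().__init__()
> >             self.bucket = bucket
> >             self.prefix = prefix
> >             # In-memory only: this is the state that currently cannot be
> >             # persisted between trigger runs.
> >             self.watermark = datetime.min.replace(tzinfo=timezone.utc)
> >
> >         def serialize(self):
> >             return (
> >                 "my_plugin.triggers.S3NewFileTrigger",  # hypothetical path
> >                 {"bucket": self.bucket, "prefix": self.prefix},
> >             )
> >
> >         async def run(self):
> >             while True:
> >                 new = await self._list_objects_newer_than(self.watermark)
> >                 for key, last_modified in new:
> >                     self.watermark = max(self.watermark, last_modified)
> >                     yield TriggerEvent({"key": key})
> >                 await asyncio.sleep(60)
> >
> >         async def _list_objects_newer_than(self, since):
> >             """Stub for an async S3 listing filtered on LastModified > since."""
> >             return []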
> >
> > Since there is no supported way to reliably store state in a Trigger,
> > this “watermark” cannot be persisted and retrieved by other Trigger runs.
> > *This makes creating a Trigger for incremental events quite difficult.*
> >
> >
> > *Previous Work*
> >
> >
> > Daniel Standish created AIP-30
> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
> > to address this issue. The use cases that he mentions in this AIP closely
> > align with the problems that Airflow users will try to solve with
> > event-driven scheduling. It was not implemented, but it remains relevant.
> >
> > Airflow users and contributors have also started to implement their own
> > “workarounds” to power event-driven scheduling. However, no robust
> > solution has been implemented yet.
> >
> >
> > *Proposed Solution*
> >
> >
> > Before arriving at the proposed solution below, I tested and observed
> > different approaches from the community. The following were considered
> > and rejected:
> >
> >    1. Using Airflow Variables to set and retrieve key-value pairs.
> >    Unfortunately, Variable.set(...) cannot be run by code on the
> >    Triggerer. This approach also had other downsides, such as the risk of
> >    unintentional modifications to a Variable.
> >
> >    2. Storing key-value pairs in some external store. This would require
> >    additional setup for Airflow users, and would make event-driven
> >    scheduling feel less “native”.
> >
> >    3. Using the /assets endpoint to find the last_asset_event.timestamp.
> >    Since a Trigger is defined independently of an Asset, there is no
> >    straightforward way to retrieve this timestamp. Plus, this was not a
> >    flexible approach for non-temporal watermarks.
> >
> >
> > Heavily borrowing from the ideas in AIP-30, I’m proposing that a new
> > model be added to Airflow. This model would be called process_state (or
> > something similar) and have the fields namespace, name, key, and value. A
> > process namespace and name would be provided when defining a Trigger,
> > allowing a key-value pair to be stored and retrieved within the Trigger
> > itself. The process_state model could also be used by Sensors
> > implementing Trigger-like logic.
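> >
> > As a purely hypothetical illustration of the shape of that model (none of
> > this exists today, and the exact column types and how Triggers would
> > reach it - presumably via the Task SDK / API server rather than the
> > metadata DB directly - are open questions):
> >
> >     from sqlalchemy import Column, String, Text
> >
> >     from airflow.models.base import Base
> >
> >     class ProcessState(Base):
> >         """Namespaced key-value state, e.g. a watermark owned by a Trigger."""
> >
> >         __tablename__ = "process_state"
> >
> >         namespace = Column(String(250), primary_key=True)
> >         name = Column(String(250), primary_key=True)
> >         key = Column(String(250), primary_key=True)
> >         value = Column(Text)
> >
> >     # Hypothetical usage from inside a Trigger defined with
> >     # namespace="s3_watcher", name="raw_bucket":
> >     #
> >     #   last = ProcessState.get(namespace="s3_watcher", name="raw_bucket",
> >     #                           key="last_modified")
> >     #   ...emit TriggerEvents only for objects newer than `last`...
> >     #   ProcessState.set(namespace="s3_watcher", name="raw_bucket",
> >     #                    key="last_modified", value=new_watermark)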
> >
> > For nearly all “incremental” processes, this provides a way to maintain
> > state and unblock the development of event Triggers.
> >
> > If the community decides to move forward with this approach, I’d like to
> > spearhead the effort w.r.t. the development, testing, and documentation
> of
> > this functionality.
> >
> > *Jake Roach*
> > Field Data Engineer
> > (he/him/his)
> >
> >
> > Email: jake.ro...@astronomer.io
> >
> > Mobile: (716) 364-4203
> >
>
