*Problem*
Currently, there is no way to persist state within a Trigger, or within any Airflow object, for that matter. This presents a challenge to Airflow users, especially those looking to “watch” Assets. AssetWatchers leverage Triggers (inheriting from BaseEventTrigger) to create Asset events and trigger DAG execution for event-driven DAGs. So far, the only implementations have been queue/stream-based. However, there are a number of other “incremental” events that users may want to invoke DAGs upon, such as:

- New files land or are updated in S3
- New records are added to a Postgres table

Implementing this type of logic in a Trigger requires a “watermark” of some kind, such as a “last file processed” or a “last modified time”. For example, a Trigger should only “look back” at files that have landed in S3 since the timestamp of the last file that resulted in a TriggerEvent being emitted. Otherwise, a sort of infinite loop may be created <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>.

Since there is no supported way to reliably store state in a Trigger, this “watermark” cannot be persisted and retrieved by other Trigger runs. *This makes creating a Trigger for incremental events quite difficult.*

*Previous Work*
Daniel Standish created AIP-30 <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence> to address this issue. The use cases he mentions in this AIP closely align with the problems that Airflow users will try to solve with event-driven scheduling. AIP-30 was not implemented, but it remains relevant.

Airflow users and contributors have also started to implement their own “workarounds” to power event-driven scheduling. However, no robust solution has been implemented yet.

*Proposed Solution*
Before arriving at the proposed solution below, I tested and observed different approaches from the community.
The following approaches were considered and rejected:

1. Using Airflow Variables to set and retrieve key-value pairs. Unfortunately, Variable.set(...) cannot be run by code on the Triggerer. This approach also had other downsides, such as the risk of unintentional modifications to a Variable.
2. Storing key-value pairs in some external store. This would require additional setup for Airflow users, and would make event-driven scheduling feel less “native”.
3. Using the /assets endpoint to find the last_asset_event.timestamp. Since a Trigger is defined independently of an Asset, there is no straightforward way to retrieve this timestamp. This approach is also not flexible enough for non-temporal watermarks.

Borrowing heavily from the ideas in AIP-30, I’m proposing that a new model be added to Airflow. This model would be called process_state (or something similar) and have the fields namespace, name, key, and value. A process namespace and name would be provided when defining a Trigger, allowing a key-value pair to be stored and retrieved within the Trigger itself. The process_state model could also be used by Sensors implementing Trigger-like logic.

For nearly all “incremental” processes, this provides a way to maintain state and unblocks the development of event Triggers. If the community decides to move forward with this approach, I’d like to spearhead the development, testing, and documentation of this functionality.

*Jake Roach*
Field Data Engineer (he/him/his)
Email: jake.ro...@astronomer.io
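
To make the process_state idea above more concrete, here is a rough, standalone sketch of how a watermark could be stored and advanced. It is only an illustration under several assumptions: the ProcessState class, its get/set methods, the new_files_since_watermark helper, and the "triggers" / "s3_new_files" / "last_modified" identifiers are all hypothetical, and SQLite stands in for the Airflow metadata database. None of this is an existing Airflow API.

```python
import sqlite3
from datetime import datetime, timezone


class ProcessState:
    """Hypothetical key-value store addressed by (namespace, name, key)."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn
        # Table layout mirrors the proposed fields: namespace, name, key, value.
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS process_state (
                namespace TEXT NOT NULL,
                name      TEXT NOT NULL,
                key       TEXT NOT NULL,
                value     TEXT,
                PRIMARY KEY (namespace, name, key)
            )
            """
        )

    def get(self, namespace, name, key, default=None):
        row = self.conn.execute(
            "SELECT value FROM process_state "
            "WHERE namespace = ? AND name = ? AND key = ?",
            (namespace, name, key),
        ).fetchone()
        return row[0] if row else default

    def set(self, namespace, name, key, value):
        # Insert the pair, or overwrite the existing value for the same key.
        self.conn.execute(
            "INSERT OR REPLACE INTO process_state "
            "(namespace, name, key, value) VALUES (?, ?, ?, ?)",
            (namespace, name, key, value),
        )
        self.conn.commit()


def new_files_since_watermark(state, listing):
    """Return paths modified after the stored watermark, then advance it.

    `listing` is a list of (path, last_modified) tuples with timezone-aware
    datetimes, standing in for an S3 listing.
    """
    raw = state.get("triggers", "s3_new_files", "last_modified")
    watermark = (
        datetime.fromisoformat(raw)
        if raw
        else datetime.min.replace(tzinfo=timezone.utc)
    )
    fresh = sorted((f for f in listing if f[1] > watermark), key=lambda f: f[1])
    if fresh:
        newest = fresh[-1][1].isoformat()
        state.set("triggers", "s3_new_files", "last_modified", newest)
    return [path for path, _ in fresh]


state = ProcessState(sqlite3.connect(":memory:"))
listing = [
    ("a.csv", datetime(2024, 1, 1, tzinfo=timezone.utc)),
    ("b.csv", datetime(2024, 1, 2, tzinfo=timezone.utc)),
]
first = new_files_since_watermark(state, listing)   # both files are new
second = new_files_since_watermark(state, listing)  # nothing new this time
```

The second call returns an empty list because the watermark was persisted by the first call. That is the behavior a Trigger would need in order to avoid re-emitting events for files it has already seen. The value is stored as text, so non-temporal watermarks (for example, a last processed ID) would fit the same shape.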