*Problem*

Currently, there is no way to persist state within a Trigger, or within any
Airflow object, for that matter.

This presents a challenge to Airflow users, especially those looking to
“watch” Assets. AssetWatchers leverage Triggers (inheriting from
BaseEventTrigger) to create Asset events and trigger DAG execution for
event-driven DAGs. So far, the only implementations have been
queue/stream-based. However, there are a number of other “incremental”
events that users may want to invoke DAGs upon, such as:


   - New files land in, or are updated in, S3.

   - New records are added to a Postgres table.


In order to implement this type of logic in a Trigger, a sort of
“watermark” is required. This could take the form of a “last file
processed” key or a “last modified time”. For example, a Trigger should
only “look back” at files that have landed in S3 since the timestamp of the
last file that resulted in a TriggerEvent being emitted. Otherwise, an
infinite scheduling loop may be created
<https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/event-scheduling.html#avoid-infinite-scheduling>

Since there is no supported way to reliably store state in a Trigger, this
“watermark” cannot be persisted and retrieved by later Trigger runs. *This
makes creating a Trigger for incremental events quite difficult.* The
sketch below illustrates the difficulty.
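
To make the problem concrete, here is a minimal sketch of what an S3
“new file” trigger might look like. This is not an existing implementation:
the class name, its list_new_objects() helper, and the bucket/interval
parameters are all illustrative assumptions. The point is that the
watermark only lives in memory and is lost whenever the Triggerer restarts
or the trigger is re-created from its serialized form.

    import asyncio
    from datetime import datetime, timezone

    from airflow.triggers.base import BaseEventTrigger, TriggerEvent


    class S3NewFileTrigger(BaseEventTrigger):
        """Illustrative trigger that fires once per new file in a bucket."""

        def __init__(self, bucket: str, poll_interval: float = 60.0):
            super().__init__()
            self.bucket = bucket
            self.poll_interval = poll_interval
            # The watermark lives only in memory. Because it is not part of
            # serialize(), it resets whenever the Triggerer restarts or the
            # trigger is re-created, so old files are re-emitted.
            self.watermark = datetime.min.replace(tzinfo=timezone.utc)

        def serialize(self):
            return (
                "example_triggers.S3NewFileTrigger",
                {"bucket": self.bucket, "poll_interval": self.poll_interval},
            )

        async def list_new_objects(self):
            # Placeholder for an async S3 listing (e.g. via aiobotocore)
            # returning dicts with "Key" and "LastModified" entries.
            raise NotImplementedError

        async def run(self):
            while True:
                # Only emit events for files newer than the watermark, then
                # advance the watermark so each file fires exactly once.
                for obj in await self.list_new_objects():
                    if obj["LastModified"] > self.watermark:
                        self.watermark = obj["LastModified"]
                        yield TriggerEvent({"key": obj["Key"]})
                await asyncio.sleep(self.poll_interval)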


Previous Work


Daniel Standish created AIP-30
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence>
to address this issue. The use-cases that he mentions in this AIP closely
align with the problems that Airflow users will try to solve with
event-driven scheduling. AIP-30 was never implemented, but it remains
relevant.

Airflow users and contributors have also started to implement their own
“workarounds” to power event-driven scheduling. However, no robust solution
has been implemented yet.


Proposed Solution


Prior to arriving at the proposed solution below, I tested and observed
different approaches from the community. The following approaches were
considered and rejected.

   1. Using Airflow Variables to set and retrieve key-value pairs.
   Unfortunately, Variable.set(...) cannot be run by code on the Triggerer.
   This approach also had other downsides, such as the risk of unintentional
   modifications to a Variable.

   2. Storing key-value pairs in some external store. This would require
   additional setup for Airflow users, and would make event-driven
   scheduling feel less “native”.

   3. Using the /assets endpoint to find the last_asset_event.timestamp.
   Since a Trigger is defined independently of an Asset, there is no
   straightforward way to retrieve this timestamp. Plus, this was not a
   flexible approach for non-temporal watermarks.


Heavily borrowing from the ideas in AIP-30, I’m proposing that a new model
be added to Airflow. This model would be called process_state (or something
similar) and have the fields namespace, name, key, and value. A process
namespace and name would be provided when defining a Trigger, allowing a
key-value pair to be stored and retrieved within the Trigger itself. The
process_state model could also be used by Sensors implementing Trigger-like
logic. A sketch of what this could look like is below.
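
The following is a minimal sketch of the shape of the proposal, assuming a
SQLAlchemy model alongside Airflow’s existing ones. Every name here
(ProcessState, the column sizes, the accessor functions further down) is a
placeholder for illustration; the actual model and API surface would be
settled during implementation.

    from sqlalchemy import Column, String, Text

    from airflow.models.base import Base


    class ProcessState(Base):
        """Hypothetical key-value state scoped to a (namespace, name) pair."""

        __tablename__ = "process_state"

        namespace = Column(String(250), primary_key=True)
        name = Column(String(250), primary_key=True)
        key = Column(String(250), primary_key=True)
        value = Column(Text)

A Trigger could then persist its watermark across runs with something like
the following (again, hypothetical accessors that would need to be safe to
call from the Triggerer):

    # Inside an event Trigger's run() loop:
    watermark = get_process_state(
        namespace="s3_watcher", name="raw_bucket", key="last_modified"
    )
    # ... emit TriggerEvents for anything newer than the watermark ...
    set_process_state(
        namespace="s3_watcher", name="raw_bucket", key="last_modified",
        value=new_watermark,
    )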

For nearly all “incremental” processes, this provides a way to maintain
state and unblock the development of event Triggers.

If the community decides to move forward with this approach, I’d like to
spearhead the effort w.r.t. the development, testing, and documentation of
this functionality.

*Jake Roach*
Field Data Engineer
(he/him/his)


Email: jake.ro...@astronomer.io

Mobile: (716) 364-4203
