+1 non-binding
On 2022/06/01 16:34:13 Ash Berlin-Taylor wrote:
> Hi All,
>
> Now that Summit is over (well done all the speakers! The talks I've
> caught so far have been great) I'm ready to push forward with Data
> Driven Scheduling, and I would like to call for a vote on
> <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
>
> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>
> (This is my +1 vote)
>
> I have just published updates to the AIP, hopefully making it
> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>
> - Add a concept of Dataset (which is a URI-parsable str. Airflow places
>   no meaning on what the URI contains/means/is - the "airflow:" scheme
>   is reserved)
> - A task "produces" a dataset by a) having it in its outlets attribute,
>   and b) finishing with SUCCESS. (That is, Airflow doesn't know/care
>   about data transfer/SQL tables etc. It is purely conceptual.)
> - A DAG says that it wants to be triggered when its dataset (or any of
>   its datasets) changes. When this happens the scheduler will create
>   the DAG run.
>
> This is just a high-level summary; please read the Confluence page for
> full details.
>
> We have already thought about lots of ways we can (and will) extend
> this over time, detailed in the "Future work" section. Our goal with
> this AIP is to build the kernel of data-aware scheduling that we can
> build on over time.
>
> A teaser/example DAG that hopefully gives a clue as to what we are
> talking about here:
>
> ```
> import pandas as pd
>
> from airflow import dag, Dataset
>
>
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag
> def my_dag():
>
>     @dag.task(outlets=[dataset])
>     def producer():
>         # What this task actually does doesn't matter to Airflow;
>         # the simple act of running to SUCCESS means the dataset is
>         # updated, and downstream dags will get triggered
>         ...
>
>
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag(schedule_on=dataset)
> def consuming_dag():
>
>     @dag.task
>     def consumer(uri):
>         df = pd.read_from_s3(uri)
>         print(f"Dataset had {df.count()} rows")
>
>     consumer(uri=dataset.uri)
> ```
>
> If anyone has any changes you think are fundamental/foundational to the
> core idea, you have 1 week to raise it :) (Names of parameters we can
> easily change as we implement this.) Our desire is to get this written
> and released in Airflow 2.4.
>
> Thanks,
> Ash
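
One small point I wanted to sanity-check against the "or any of its datasets" bullet: I am assuming schedule_on will also accept a list of Datasets, so a DAG run is created whenever any one of them is updated. A rough sketch of how I read that (parameter and decorator names are taken from the teaser above; the second URI is made up purely for illustration):

```
from airflow import dag, Dataset

# Two upstream datasets; the second URI is hypothetical, reusing the
# scheme from the teaser in the quoted email.
orders = Dataset("s3://s3_default@some_bucket/order_data")
customers = Dataset("s3://s3_default@some_bucket/customer_data")


# Assumption: schedule_on can take a list, meaning the scheduler creates
# a DAG run whenever any one of these datasets is updated by a task that
# lists it in its outlets and finishes with SUCCESS.
@dag(schedule_on=[orders, customers])
def reporting_dag():

    @dag.task
    def build_report():
        ...
```

If that is not the intended reading, happy to be corrected before the vote closes.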