+1 (binding)

On Thu, Jun 2, 2022 at 9:31 AM Hila Sofer Elyashiv <hi...@wix.com.invalid> wrote:
> +1 non-binding
>
> On 2022/06/01 16:34:13 Ash Berlin-Taylor wrote:
> > Hi All,
> >
> > Now that Summit is over (well done all the speakers! The talks I've
> > caught so far have been great) I'm ready to push forward with Data
> > Driven Scheduling, and I would like to call for a vote on
> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
> >
> > The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
> >
> > (This is my +1 vote)
> >
> > I have just published updates to the AIP, hopefully to make the AIP
> > tighter in scope (and easier to implement too). The tl;dr of this AIP:
> >
> > - Add a concept of Dataset (which is a URI-parsable str. Airflow places
> >   no meaning on what the URI contains/means/is - the "airflow:" scheme
> >   is reserved)
> > - A task "produces" a dataset by a) having it in its outlets
> >   attribute, and b) finishing with SUCCESS. (That is, Airflow doesn't
> >   know/care about data transfer/SQL tables etc. It is just conceptual.)
> > - A DAG says that it wants to be triggered when its dataset (or any of
> >   its datasets) changes. When this happens the scheduler will create
> >   the DAG run.
> >
> > This is just a high-level summary; please read the Confluence page for
> > full details.
> >
> > We have already thought about lots of ways we can (and will) extend
> > this over time, detailed in the "Future work" section. Our goal with
> > this AIP is to build the kernel of data-aware scheduling that we can
> > build on over time.
> >
> > A teaser/example DAG that hopefully gives a clue as to what we are
> > talking about here:
> >
> > ```
> > import pandas as pd
> >
> > from airflow import dag, Dataset
> >
> >
> > dataset = Dataset("s3://s3_default@some_bucket/order_data")
> >
> > @dag
> > def my_dag():
> >
> >     @dag.task(outlets=[dataset])
> >     def producer():
> >         # What this task actually does doesn't matter to Airflow; the
> >         # simple act of running to SUCCESS means the dataset is
> >         # updated, and downstream DAGs will get triggered.
> >         ...
> >
> >
> > dataset = Dataset("s3://s3_default@some_bucket/order_data")
> >
> > @dag(schedule_on=dataset)
> > def consuming_dag():
> >
> >     @dag.task
> >     def consumer(uri):
> >         df = pd.read_from_s3(uri)
> >         print(f"Dataset had {df.count()} rows")
> >
> >     consumer(uri=dataset.uri)
> > ```
> >
> > If anyone has any changes you think are fundamental/foundational to
> > the core idea you have 1 week to raise it :) (Names of parameters we
> > can easily change as we implement this.) Our desire is to get this
> > written and released in Airflow 2.4.
> >
> > Thanks,
> > Ash
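For anyone skimming the thread: below is a minimal sketch of the "or any of its datasets" case from the summary above. It assumes `schedule_on` also accepts a list of Datasets, which the AIP text implies but the teaser does not show, and the dataset URIs and DAG/task names here are made up for illustration; as Ash notes, parameter names may still change during implementation.

```python
from airflow import dag, Dataset

# Two upstream datasets. The URIs are placeholder strings; Airflow attaches
# no meaning to their contents (only the "airflow:" scheme is reserved).
orders = Dataset("s3://s3_default@some_bucket/order_data")
customers = Dataset("s3://s3_default@some_bucket/customer_data")


# Assumption: schedule_on can take a list, so the scheduler creates a new
# DAG run whenever any producing task that lists one of these datasets in
# its outlets finishes with SUCCESS.
@dag(schedule_on=[orders, customers])
def join_dag():

    @dag.task
    def join(order_uri, customer_uri):
        # The task only receives URIs; actually reading the data is
        # entirely up to the task code, as in the teaser above.
        print(f"Joining {order_uri} with {customer_uri}")

    join(order_uri=orders.uri, customer_uri=customers.uri)
```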