+1 (binding)

On Thu, 2 Jun 2022 at 15:49, Josh Fell <josh.d.f...@astronomer.io.invalid> wrote:
> +1 (binding)
>
> On Thu, Jun 2, 2022 at 9:31 AM Hila Sofer Elyashiv <hi...@wix.com.invalid>
> wrote:
>
>> +1 non-binding
>>
>> On 2022/06/01 16:34:13 Ash Berlin-Taylor wrote:
>> > Hi All,
>> >
>> > Now that Summit is over (well done all the speakers! The talks I've
>> > caught so far have been great), I'm ready to push forward with Data
>> > Driven Scheduling, and I would like to call for a vote on
>> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
>> >
>> > The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>> >
>> > (This is my +1 vote.)
>> >
>> > I have just published updates to the AIP, hopefully making the AIP
>> > tighter in scope (and easier to implement too). The tl;dr of this AIP:
>> >
>> > - Add a concept of Dataset (which is a URI-parsable str. Airflow places
>> >   no meaning on what the URI contains/means/is - the "airflow:" scheme
>> >   is reserved.)
>> > - A task "produces" a dataset by a) having it in its outlets attribute,
>> >   and b) finishing with SUCCESS. (That is, Airflow doesn't know/care
>> >   about data transfer/SQL tables etc. It is purely conceptual.)
>> > - A DAG says that it wants to be triggered when its dataset (or any of
>> >   its datasets) changes. When this happens the scheduler will create
>> >   the DAG run.
>> >
>> > This is just a high-level summary; please read the Confluence page for
>> > full details.
>> >
>> > We have already thought about lots of ways we can (and will) extend
>> > this over time, detailed in the "Future work" section. Our goal with
>> > this AIP is to build the kernel of data-aware scheduling that we can
>> > build on over time.
>> >
>> > A teaser/example DAG that hopefully gives a clue as to what we are
>> > talking about here:
>> >
>> > ```
>> > import pandas as pd
>> >
>> > from airflow import dag, Dataset
>> >
>> >
>> > dataset = Dataset("s3://s3_default@some_bucket/order_data")
>> >
>> > @dag
>> > def my_dag():
>> >
>> >     @dag.task(outlets=[dataset])
>> >     def producer():
>> >         # What this task actually does doesn't matter to Airflow; the
>> >         # simple act of running to SUCCESS means the dataset is
>> >         # updated, and downstream DAGs will get triggered
>> >         ...
>> >
>> >
>> > dataset = Dataset("s3://s3_default@some_bucket/order_data")
>> >
>> > @dag(schedule_on=dataset)
>> > def consuming_dag():
>> >
>> >     @dag.task
>> >     def consumer(uri):
>> >         df = pd.read_from_s3(uri)
>> >         print(f"Dataset had {df.count()} rows")
>> >
>> >     consumer(uri=dataset.uri)
>> > ```
>> >
>> > If anyone has any changes you think are fundamental/foundational to
>> > the core idea, you have 1 week to raise them :) (Names of parameters
>> > we can easily change as we implement this.) Our desire is to get this
>> > written and released in Airflow 2.4.
>> >
>> > Thanks,
>> > Ash
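To illustrate the "or any of its datasets" case from the summary above, here is a minimal sketch in the same draft syntax as the teaser DAG, assuming schedule_on can also be given a list of Dataset objects; as the original mail notes, parameter names (and the helper calls shown) are illustrative and may change during implementation:

```
import pandas as pd

from airflow import dag, Dataset

# Two upstream datasets, each presumably marked as updated when its own
# producing task (declared via outlets=[...]) finishes with SUCCESS
orders = Dataset("s3://s3_default@some_bucket/order_data")
customers = Dataset("s3://s3_default@some_bucket/customer_data")


# Assumption: schedule_on given a list means "create a DAG run whenever any
# one of these datasets is updated", per the AIP summary above
@dag(schedule_on=[orders, customers])
def joined_report():

    @dag.task
    def build_report(orders_uri, customers_uri):
        # Airflow does not interpret the URIs; reading the data is entirely
        # up to the task. read_from_s3 is the same illustrative helper used
        # in the teaser, not a real pandas function.
        orders_df = pd.read_from_s3(orders_uri)
        customers_df = pd.read_from_s3(customers_uri)
        print(f"{len(orders_df)} orders, {len(customers_df)} customers")

    build_report(orders_uri=orders.uri, customers_uri=customers.uri)
```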