+1 (non-binding)

*Ankit Chaurasia*
HomePage <https://ankitchaurasia.info/> | LinkedIn
<https://www.linkedin.com/in/sunank200/> | +91-9987351649
On Fri, Jun 3, 2022 at 1:04 PM Tomasz Urbaszek <turbas...@apache.org> wrote:

> +1 (binding)
>
> On Thu, 2 Jun 2022 at 15:49, Josh Fell <josh.d.f...@astronomer.io.invalid>
> wrote:
>
>> +1 (binding)
>>
>> On Thu, Jun 2, 2022 at 9:31 AM Hila Sofer Elyashiv <hi...@wix.com.invalid>
>> wrote:
>>
>>> +1 non-binding
>>>
>>> On 2022/06/01 16:34:13 Ash Berlin-Taylor wrote:
>>> > Hi All,
>>> >
>>> > Now that Summit is over (well done, all the speakers! The talks I've
>>> > caught so far have been great), I'm ready to push forward with Data
>>> > Driven Scheduling, and I would like to call for a vote on
>>> > <https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
>>> >
>>> > The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>>> >
>>> > (This is my +1 vote.)
>>> >
>>> > I have just published updates to the AIP, hopefully making it tighter
>>> > in scope (and easier to implement too). The tl;dr of this AIP:
>>> >
>>> > - Add a concept of Dataset, which is a URI-parsable str. Airflow
>>> >   places no meaning on what the URI contains/means/is; the "airflow:"
>>> >   scheme is reserved.
>>> > - A task "produces" a dataset by a) having it in its outlets
>>> >   attribute, and b) finishing with SUCCESS. (That is, Airflow doesn't
>>> >   know or care about data transfer, SQL tables, etc. It is purely
>>> >   conceptual.)
>>> > - A DAG says that it wants to be triggered when its dataset (or any
>>> >   of its datasets) changes. When this happens, the scheduler will
>>> >   create the DAG run.
>>> >
>>> > This is just a high-level summary; please read the Confluence page
>>> > for full details.
>>> >
>>> > We have already thought about lots of ways we can (and will) extend
>>> > this over time, detailed in the "Future work" section. Our goal with
>>> > this AIP is to build the kernel of data-aware scheduling that we can
>>> > build on over time.
>>> >
>>> > A teaser/example DAG that hopefully gives a clue as to what we are
>>> > talking about here:
>>> >
>>> > ```
>>> > import pandas as pd
>>> >
>>> > from airflow import dag, Dataset
>>> >
>>> >
>>> > dataset = Dataset("s3://s3_default@some_bucket/order_data")
>>> >
>>> >
>>> > @dag
>>> > def my_dag():
>>> >
>>> >     @dag.task(outlets=[dataset])
>>> >     def producer():
>>> >         # What this task actually does doesn't matter to Airflow;
>>> >         # the simple act of running to SUCCESS means the dataset is
>>> >         # updated, and downstream DAGs will get triggered.
>>> >         ...
>>> >
>>> >
>>> > @dag(schedule_on=dataset)
>>> > def consuming_dag():
>>> >
>>> >     @dag.task
>>> >     def consumer(uri):
>>> >         # read_from_s3 is illustrative only; any read will do
>>> >         df = pd.read_from_s3(uri)
>>> >         print(f"Dataset had {len(df)} rows")
>>> >
>>> >     consumer(uri=dataset.uri)
>>> > ```
>>> >
>>> > If anyone has any changes you think are fundamental/foundational to
>>> > the core idea, you have 1 week to raise it :) (Names of parameters we
>>> > can easily change as we implement this.) Our desire is to get this
>>> > written and released in Airflow 2.4.
>>> >
>>> > Thanks,
>>> > Ash
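
For comparison, here is a minimal, self-contained sketch of the same
producer/consumer pattern written against the API that eventually shipped in
Airflow 2.4, where the proposed `schedule_on` parameter became `schedule`
(accepting a list of Datasets) and `Dataset` is importable from `airflow`.
The bucket name, object key, and DAG/task names below are hypothetical,
chosen only for illustration:

```
import pendulum

from airflow import Dataset
from airflow.decorators import dag, task

# A Dataset is identified purely by its URI string; Airflow attaches no
# semantics to it beyond identity. (Hypothetical bucket and key.)
order_data = Dataset("s3://some_bucket/order_data.csv")


@dag(schedule="@daily", start_date=pendulum.datetime(2022, 6, 1), catchup=False)
def producer_dag():
    @task(outlets=[order_data])
    def produce():
        # Writing the actual file is this task's own business; simply
        # finishing with SUCCESS marks the dataset as updated.
        ...

    produce()


@dag(schedule=[order_data], start_date=pendulum.datetime(2022, 6, 1), catchup=False)
def consumer_dag():
    @task
    def consume():
        print(f"Triggered because {order_data.uri} was updated")

    consume()


producer_dag()
consumer_dag()
```

Note that the consumer DAG declares no time-based schedule at all: the
scheduler creates a run for it whenever any task with `order_data` in its
outlets finishes with SUCCESS, which is exactly the kernel of data-aware
scheduling the AIP describes.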