Hi everyone, [DISCLAIMER first: I work on the same team as Howie]
Many thanks for the discussion. Very insightful! I have two points to share, and I would like to hear what you think of them:

*# 1. Potential cons of the `pre_execute` method*

I do agree it's a nice idea and is almost perfect for most Airflow users (who author their workflows by writing DAGs in Python, which is very flexible). But I see increasing use cases where people create DAGs through templating (including our teams), which makes this approach a little less flexible. I get the point shared by Jarek: "*You can write a common function in the shared code that will be used across your teams*". Say we create a common function for this "skip" purpose: what if a user already has another *pre_execute* callable created for other purposes? Then she/he will need to modify that callable further. I cannot judge whether this is a big or small hassle for other Airflow users, but I think it would be a cool built-in feature if users could skip certain tasks for a DagRun without having to make any change to their DAGs.

*# 2. State and Trigger Rule (the more important point to consider)*

I'm really happy that folks are thinking about this from the *State* and *Trigger Rule* angle. That's actually what came to my mind first when we discussed this internally. As James asked, a "Skipped" state may cause unexpected side effects. IMHO, the solution Jarek shared (using a different trigger_rule, like "all_done") may not thoroughly resolve the concern, especially for a more complicated DAG, for example the one in the image below (if the image doesn't display, please check it at https://postimg.cc/cK837ydy).

[image: Untitled.drawio (2).png]

We only want to skip task B2 (in ad-hoc, manually-triggered runs). Our goal is to retain the dependencies, and the whole thing should work as if B2 succeeded. Whether we apply "*all_done*" or "*none_failed*" to Task C, it will always behave differently from what we really expect (we expect Task C to still behave as it would with the default trigger rule, "*all_success*").

So the skipped task needs to have a State identical to "Success" in everything but the name. Hence a new state may be needed for the feature Howie is proposing. This is also one of the reasons why the *pre_execute* method may not fully resolve the question: marking the skipped task as "*Skipped*" may lead to something we don't expect (marking it as "*Success*" is also bad practice, because it will be confusing when a user checks the history later).

Of course, I may have missed something, or I may be wrong in some of the analysis. Please help point that out; more discussion around this would be greatly appreciated ;-) Many thanks.

Regards,
XD
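To make the hassle in point #1 above concrete, here is a minimal sketch of the chaining a DAG author would need if they already had their own pre_execute hook. The module path and both hook names here are hypothetical, and skip_if_specified is the shared helper Daniel posts further down the thread:

    # Hypothetical shared module exposing the team-wide skip hook
    # (see Daniel's skip_if_specified at the bottom of this thread).
    from my_company.airflow_hooks import skip_if_specified

    def my_existing_pre_execute(context):
        # Whatever a team already had wired up, e.g. custom auditing.
        print(f"auditing {context['task_instance'].task_id}")

    def combined_pre_execute(context):
        # Every DAG author must remember to chain the shared skip hook
        # with their own hook on each task, e.g.:
        #   SomeOperator(..., pre_execute=combined_pre_execute)
        skip_if_specified(context)
        my_existing_pre_execute(context)

The chaining itself is trivial; XD's point is that it has to be repeated in every templated DAG, which is what a built-in, conf-driven skip would avoid.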
On Sat, Feb 5, 2022 at 11:03 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Airflow is really a "pythonic" framework. The basic premise of Airflow is
> that everything "DAG" is not declarative but imperative. DAG definitions,
> task code, decorated Python callables as a "new" way of writing tasks,
> heck, even triggers and, more recently, timetables are "Python imperative
> code". Similarly, Custom Secret and XCom Backends are very little
> "configurable": if you need to change their behaviour, you .... write code
> rather than (for example) provide a mapping of variables. And that's a
> deliberate choice, which is different from many other frameworks. If
> anything, we take away "declarative" ways rather than add them. This is
> the power of Airflow: rather than adding more "limited" options, we add
> ways to extend Airflow capabilities by writing small snippets of code
> rather than "declaring" how existing code should behave.
>
> In this context I think neither of your cons holds:
>
> 1) It is fully scalable. You can write a common function in the shared
> code that will be used across your teams.
> 2) This is how you usually extend Airflow; your admins are already
> familiar with extending Airflow by writing small pieces of code, so it
> does not cost too much extra.
> 3) No abuse. This is one of the million ways of using pre_execute, and a
> very cool one. We added "pre_execute" as a callable experimentally not
> that long ago, and the more I see cases like this, the more I think it
> was a good idea.
>
> So I do not see it as a workaround at all. I see it as a regular
> "feature" of Airflow.
>
> Just my thoughts.
>
> J.
>
> On Sat, Feb 5, 2022 at 8:35 AM Hongyi Wang <whyni...@gmail.com> wrote:
>
>> Hi Daniel,
>>
>> Thank you for sharing the `pre_execute` idea. It is a smart idea, and I
>> do believe it would work. But to me, it is more of a workaround than a
>> universal solution. To explain, I would love to discuss the tradeoffs.
>>
>> [Pros]
>> The solution I proposed requires a change to either the Airflow
>> Scheduler or the Executor. In comparison, the `pre_execute` solution
>> doesn't require any change, which avoids introducing additional
>> risk/complexity.
>>
>> [Cons]
>> 1) Not scalable / inconvenient
>> To make a task skippable, one needs to modify the existing DAG (to set
>> `pre_execute`). That seems easy, but when your Airflow hosts a thousand
>> DAGs owned by different teams/users, it can be challenging.
>>
>> 2) May cost additional time and resources
>> A task won't be skipped until it reaches `pre_execute()`. That is okay
>> when using LocalExecutor/SequentialExecutor; however, when using
>> KubernetesExecutor, a k8s worker pod will always be created for the
>> "task to skip". Ideally, tasks are skipped during "scheduling".
>>
>> 3) May abuse `pre_execute`
>> Instead of asking developers to set `pre_execute=skip_if_specified` so
>> that a task becomes skippable, wouldn't it be better if we made it (task
>> is skippable) an out-of-the-box feature, while `pre_execute` serves its
>> original purpose?
>>
>> Howie
>>
>> On Fri, Feb 4, 2022 at 7:41 AM James Coder <jcode...@gmail.com> wrote:
>>
>>> Thanks Jarek, yeah, I wasn't thinking about the trigger rules; hazards
>>> of emailing before coffee.
>>>
>>> On Fri, Feb 4, 2022 at 10:32 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>>> Thanks Daniel, this is something I have been thinking about for a
>>>>> while. One use case I have is for a dag that only gets run on an ad
>>>>> hoc basis, for any combination of a subset of tasks (I recognize this
>>>>> is a rather non-standard use of airflow). One question though: if you
>>>>> specified skip_list=["task b"], wouldn't the scheduler skip task c as
>>>>> well?
>>>>
>>>> Depending on the trigger rule you choose.
>>>>
>>>> One of the effects of the default "all_success" rule is that the
>>>> "skip" state propagates when that rule is used.
>>>> If you use "all_done", C won't be skipped (but a failure will also be
>>>> counted as success), and if you use "none_failed", the "failure" of B
>>>> will propagate to C as well (but "skip" will not be propagated).
>>>>
>>>> Generally speaking, when you expect some task to be skippable, you
>>>> should design your DAG to account for it.
>>>>
>>>> You can see more about the rules in this really informative post by
>>>> Marc Lamberti:
>>>> https://marclamberti.com/blog/airflow-trigger-rules-all-you-need-to-know/
>>>>
>>>> Whenever I can't remember how those rules work, I keep coming back to
>>>> that post.
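To ground the rules above, here is a minimal sketch of the A >> [B1, B2] >> C shape from XD's example; the DAG id is illustrative, and the comments restate the propagation behaviour Jarek describes (DummyOperator matches Airflow 2.2-era code):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG("trigger_rule_demo", start_date=datetime(2022, 1, 1),
             schedule_interval=None) as dag:
        a = DummyOperator(task_id="A")
        b1 = DummyOperator(task_id="B1")
        b2 = DummyOperator(task_id="B2")  # the task to skip ad hoc
        # Default TriggerRule.ALL_SUCCESS: a skipped B2 skips C as well.
        # ALL_DONE / NONE_FAILED let C run, with the side effects noted above.
        c = DummyOperator(task_id="C", trigger_rule=TriggerRule.NONE_FAILED)
        a >> [b1, b2] >> c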
>>>>
>>>>> James Coder
>>>>> ------------------------------
>>>>> *From:* Daniel Standish <daniel.stand...@astronomer.io.INVALID>
>>>>> *Sent:* Friday, February 4, 2022 1:41:12 AM
>>>>> *To:* dev@airflow.apache.org <dev@airflow.apache.org>
>>>>> *Subject:* Re: [DISCUSSION] Specify tasks to skip when triggering DAG
>>>>>
>>>>> That skip func had a typo (conf where it should have been context)...
>>>>>
>>>>> This is more likely to work:
>>>>>
>>>>> from airflow.exceptions import AirflowSkipException
>>>>>
>>>>> def skip_if_specified(context):
>>>>>     # Bail out quietly if there is no usable context.
>>>>>     if not context:
>>>>>         return
>>>>>     dr = context.get('dag_run')
>>>>>     ti = context.get('task_instance')
>>>>>     if not (dr and ti):
>>>>>         return
>>>>>     conf = dr.conf
>>>>>     if not conf:
>>>>>         return
>>>>>     # Skip this task if its id is listed in the DagRun conf.
>>>>>     skip_list = conf.get('skip_list', [])
>>>>>     if ti.task_id in skip_list:
>>>>>         raise AirflowSkipException()
>>>>>
>>>>> Apologies for the spam.
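For anyone who wants to try the snippet above, here is a rough sketch of how it might be wired up and exercised; the DAG id, task ids, and commands are illustrative, and it assumes the pre_execute hook parameter Jarek mentions earlier in the thread:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG("skip_demo", start_date=datetime(2022, 1, 1),
             schedule_interval=None) as dag:
        # Attach the shared hook to every task that should be skippable.
        task_a = BashOperator(task_id="task_a", bash_command="echo a",
                              pre_execute=skip_if_specified)
        task_b = BashOperator(task_id="task_b", bash_command="echo b",
                              pre_execute=skip_if_specified)
        task_a >> task_b

Triggering the run with `airflow dags trigger skip_demo --conf '{"skip_list": ["task_b"]}'` should then mark task_b as Skipped once it reaches pre_execute, which also illustrates Howie's cost concern: under KubernetesExecutor the worker pod is still created before the skip happens.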