Hi everyone,

[DISCLAIMER first: I'm working in the same team as Howie]

Many thanks for the discussion. Very insightful!

I have two points to share and would like to hear what you think of them:


*# 1. Potential cons of the `pre_execute` method*
I do agree it's a nice idea and works well for most Airflow users (who
author their workflows by writing DAGs directly in Python, which is very
flexible). But I see increasing use cases where people create DAGs
through templating (including in our teams), which makes it a little
less flexible.
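
To make "templating" concrete, here is a minimal, hypothetical sketch
(the config, dag_id and commands are all made up; real setups usually
load such config from YAML/JSON):

    import pendulum
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Hypothetical per-team config: users edit data, not per-task Python.
    TASKS = {
        "extract": {"bash_command": "echo extract", "upstream": []},
        "load": {"bash_command": "echo load", "upstream": ["extract"]},
    }

    with DAG(
        dag_id="templated_dag",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        schedule_interval=None,
    ) as dag:
        # Build one operator per config entry, then wire the dependencies.
        tasks = {
            tid: BashOperator(task_id=tid, bash_command=cfg["bash_command"])
            for tid, cfg in TASKS.items()
        }
        for tid, cfg in TASKS.items():
            for upstream_id in cfg["upstream"]:
                tasks[upstream_id] >> tasks[tid]

In a factory like this there is no natural per-task place for users to
attach a *pre_execute* callable.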

I get the point shared by Jarek: "*You can write common function in the
shared code that will be used across your teams*". But say we create a
common function for this "skip" purpose: what if a user already has
another *pre_execute* callable attached for other purposes? Then they
will need to modify that callable further, as sketched below. I cannot
judge whether this is a big or small hassle for other Airflow users, but
I think it would be a cool built-in feature if users could skip certain
tasks for a DagRun without having to make any change to their DAGs.
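
To illustrate the kind of change I mean, here is a minimal sketch
(*validate_inputs* and *combined_pre_execute* are hypothetical names;
the skip helper is essentially Daniel's snippet quoted at the bottom of
this thread):

    from airflow.exceptions import AirflowSkipException

    def skip_if_specified(context):
        # Shared "skip" helper, as suggested earlier in the thread.
        dr = context.get("dag_run")
        ti = context.get("task_instance")
        conf = (dr.conf or {}) if dr else {}
        if ti and ti.task_id in conf.get("skip_list", []):
            raise AirflowSkipException()

    def validate_inputs(context):
        # Hypothetical callable the user had already attached to the task
        # for other purposes.
        pass

    def combined_pre_execute(context):
        # An operator accepts only one pre_execute callable, so the user
        # must merge the two behaviours by hand in every affected DAG.
        validate_inputs(context)
        skip_if_specified(context)

    # some_task = PythonOperator(..., pre_execute=combined_pre_execute)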

*# 2. State and Trigger Rule (more important point to consider)*

I'm really happy that folks are thinking about the *State* and *Trigger
Rule* aspects. That's actually what came to my mind first when we
discussed this internally.

As James asked, the "Skipped" state may cause unexpected side effects.
IMHO the solution Jarek shared (using a different trigger_rule, like
"all_done") may not thoroughly resolve the concern, especially for a
more complicated DAG, for example the one in the image below (if the
image doesn't display, please check it at https://postimg.cc/cK837ydy).

[image: Untitled.drawio (2).png]
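
For readers of the plain-text archive, the DAG is roughly of this shape
(a minimal sketch; the dag_id and the task ids other than B2 and C are
my placeholders for the drawing):

    import pendulum
    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    with DAG(
        dag_id="skip_b2_example",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        schedule_interval=None,  # triggered ad hoc
    ) as dag:
        a = DummyOperator(task_id="A")
        b1 = DummyOperator(task_id="B1")
        b2 = DummyOperator(task_id="B2")  # the task we want to skip
        c = DummyOperator(task_id="C")    # default trigger_rule="all_success"

        a >> [b1, b2] >> c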

We only want to skip task B2 (in ad-hoc, manually-triggered runs). Our
goal is to retain the dependencies, and the whole thing should work as
if B2 succeeded.
Whether we apply "*all_done*" or "*none_failed*" to Task C, it will
always behave differently from what we really expect (we may expect Task
C to still behave as it does with the default trigger rule,
"*all_success*").

So the skipped task needs a State that behaves identically to "Success"
in everything but the name. Hence a new state may be needed for the
feature Howie is proposing.
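
To make this concrete, here is a simplified sketch of the dependency
check in question (this is NOT Airflow's actual implementation, and
"skipped_as_success" is a made-up name for the proposed state):

    def ready_all_success_today(upstream_states):
        # Today "skipped" != "success", so skipping B2 under the default
        # rule changes how C behaves.
        return all(s == "success" for s in upstream_states)

    def ready_all_success_proposed(upstream_states):
        # With the proposed state, C still runs after B2 is skipped,
        # while a real upstream failure still blocks it.
        return all(s in ("success", "skipped_as_success")
                   for s in upstream_states)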

This is also one of the reasons why the *pre_execute* method may not
fully resolve the question: marking the skipped task as "*Skipped*" may
lead to behaviour we don't expect, while marking it as "*Success*" is
also bad practice, because it will be confusing when users check the run
history later.


Of course I may have missed something, or I may be wrong in some of the
analysis. Please point it out; more discussion around this would be
greatly appreciated ;-) Many thanks.


Regards,
XD





On Sat, Feb 5, 2022 at 11:03 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Airflow is really a "pythonic" framework. The basic premise of Airflow is
> that everything "DAG" is not declarative but Imperative. DAG definition,
> Task code, decorated python callables as a "new" way of writing tasks, heck
> even triggers and more recently timetables are "python imperative code".
> Similarly - Custom Secret, XCom Backends are very little "configurable" -
> if you need to change their behaviour, you .... write code rather than (for
> example) provide mapping of variables. And that's a deliberate choice -
> which is different from many other frameworks. If anything - we take away
> "declarative" ways rather than add them. This is the power of Airflow,
> rather than adding more "limited" options, we add ways to extend
> Airflow's capabilities by writing small snippets of code rather than
> "declaring" how existing code should behave.
>
> In this context I think none of your cons hold:
>
> 1) it is fully scalable. You can write common function in the shared code
> that will be used across your teams
> 2) this is how you usually extend Airflow. Your admins are already
> familiar with extending Airflow by writing small pieces of code, so it
> does not cost too much extra
> 3) No abuse. This is one of the million ways of using pre_execute, and a
> very cool one. We added "pre_execute" as a callable experimentally not
> that long ago, and the more I see cases like this the more I think it was
> a good idea
>
> So I do not see it as a workaround at all. I see it as a regular "feature"
> of Airflow.
>
> Just my thoughts.
>
> J.
>
>
>
> On Sat, Feb 5, 2022 at 8:35 AM Hongyi Wang <whyni...@gmail.com> wrote:
>
>> Hi Daniel,
>>
>> Thank you for sharing the `pre_execute` idea. It is a smart idea, and I
>> do believe it would work. But to me, it is more of a workaround than a
>> universal solution. To explain, I would love to discuss the tradeoffs.
>>
>> [Pros]
>> The solution I proposed requires change in either Airflow Scheduler or
>> Executor. In comparison, the `pre_execute` solution doesn't require any
>> change, which avoids introducing additional risk/complexity.
>>
>> [Cons]
>> 1) Not scalable / Inconvenient
>> To make a task skippable, one needs to modify the existing DAG (to set
>> `pre_execute`). It seems not difficult, but when your Airflow hosts a
>> thousand DAGs owned by different teams/users, it can be challenging.
>>
>> 2) May cost additional time and resources
>> A task won’t be skipped until it reaches `pre_execute()`. It is okay when
>> using LocalExecutor/SequentialExecutor; however, when using
>> KubernetesExecutor, a k8s worker pod will always be created for “task to
>> skip”. Ideally, tasks are skipped during "scheduling".
>>
>> 3) May abuse `pre_execute`
>> Instead of asking developers to set `pre_execute=skip_if_specified` to
>> make a task skippable, wouldn't it be better to make it (task is
>> skippable) an out-of-the-box feature, so that `pre_execute` can serve its
>> original purpose?
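>>
>> For reference, triggering with the proposed option would look something
>> like this ("my_dag" and "task_b" are placeholders):
>>
>>     airflow dags trigger my_dag --conf '{"skip_list": ["task_b"]}'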
>>
>> Howie
>>
>> On Fri, Feb 4, 2022 at 7:41 AM James Coder <jcode...@gmail.com> wrote:
>>
>>> Thanks Jarek, yeah, I wasn't thinking about the trigger rules, hazards
>>> of emailing before coffee.
>>>
>>> On Fri, Feb 4, 2022 at 10:32 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>>>> Thanks Daniel, this is something I have been thinking about for a while.
>>>>> One use case I have is a dag that only gets run on an ad hoc basis, for
>>>>> any combination of a subset of tasks (I recognize this is a rather
>>>>> non-standard use of airflow). One question though: if you specified
>>>>> skip_list=["task b"], wouldn't the scheduler skip task c as well?
>>>>>
>>>>
>>>> Depending on the triggering rule you choose.
>>>>
>>>> One of the effects of the default "all_success" rule is that the "skip"
>>>> state propagates when the rule is used.
>>>> If you use "all_done", it won't be skipped (but a failure will also be
>>>> counted as success), and when you use "none_failed" it will propagate the
>>>> "failure" of B to C as well (but "skip" will not be propagated).
>>>>
>>>> Generally speaking, when you expect some task to be skippable, you should
>>>> design your DAG to account for it.
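>>>>
>>>> For example, a join task designed to tolerate skipped upstreams (the
>>>> task id is hypothetical, with DummyOperator imported as usual):
>>>>
>>>>     join = DummyOperator(task_id="join", trigger_rule="none_failed")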
>>>>
>>>> You can see more about the rules in this really informative post by
>>>> Marc Lamberti
>>>> https://marclamberti.com/blog/airflow-trigger-rules-all-you-need-to-know/
>>>>
>>>> Whenever I can't remember how those rules work, I keep on coming back
>>>> to the post.
>>>>
>>>>
>>>>
>>>>> James Coder
>>>>> ------------------------------
>>>>> *From:* Daniel Standish <daniel.stand...@astronomer.io.INVALID>
>>>>> *Sent:* Friday, February 4, 2022 1:41:12 AM
>>>>> *To:* dev@airflow.apache.org <dev@airflow.apache.org>
>>>>> *Subject:* Re: [DISCUSSION] Specify tasks to skip when triggering DAG
>>>>>
>>>>> That skip func had a typo (conf where it should have been context)...
>>>>>
>>>>> this is more likely to work:
>>>>>
>>>>> from airflow.exceptions import AirflowSkipException
>>>>>
>>>>> def skip_if_specified(context):
>>>>>     # pre_execute may be called without a full context; bail out early.
>>>>>     if not context:
>>>>>         return
>>>>>     dr = context.get('dag_run')
>>>>>     ti = context.get('task_instance')
>>>>>     if not (dr and ti):
>>>>>         return
>>>>>     conf = dr.conf
>>>>>     if not conf:
>>>>>         return
>>>>>     # Skip this task if its id was listed in the trigger conf.
>>>>>     skip_list = conf.get('skip_list', [])
>>>>>     if ti.task_id in skip_list:
>>>>>         raise AirflowSkipException()
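>>>>>
>>>>> and you would attach it to a task like this (task id and callable are
>>>>> placeholders):
>>>>>
>>>>>     from airflow.operators.python import PythonOperator
>>>>>
>>>>>     task = PythonOperator(
>>>>>         task_id="task_b",
>>>>>         python_callable=do_work,
>>>>>         pre_execute=skip_if_specified,
>>>>>     )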
>>>>>
>>>>> Apologies for the spam.
>>>>>
>>>>
