Hi Daniel,

Thank you for sharing the `pre_execute` idea. It is a smart idea, and I do
believe it would work. But to me, it is more of a workaround than a
universal solution. To explain why, I would like to discuss the tradeoffs.

[Pros]
The solution I proposed requires changes to either the Airflow Scheduler or
the Executor. In comparison, the `pre_execute` solution doesn't require any
change to Airflow itself, which avoids introducing additional risk/complexity.

[Cons]
1) Not scalable / Inconvenient
To make a task skippable, one needs to modify the existing DAG (to set
`pre_execute`). That does not seem difficult, but when your Airflow deployment
hosts a thousand DAGs owned by different teams/users, it can be challenging.

2) May cost additional time and resources
A task won't be skipped until it reaches `pre_execute()`, i.e. until it is
already running on a worker. That is acceptable with
LocalExecutor/SequentialExecutor; however, with KubernetesExecutor, a k8s
worker pod is still created for every task that is going to be skipped.
Ideally, tasks would be skipped during scheduling, before any worker starts.

3) May abuse `pre_execute`
Instead of asking developers to set `pre_execute=skip_if_specified` to make a
task skippable, wouldn't it be better to make skippability an out-of-the-box
feature, so that `pre_execute` can keep serving its original purpose?
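
To make the cost in point 2 concrete, here is a toy model. It is a sketch
under simplifying assumptions, not Airflow code: `ToyCluster`,
`skip_in_pre_execute` and `skip_at_scheduling` are hypothetical names, one
"worker launch" stands in for one KubernetesExecutor pod, and task
dependencies and trigger rules are ignored.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyCluster:
    """Records worker launches; each launch stands in for one k8s worker pod."""
    launched: list[str] = field(default_factory=list)

    def launch_worker(self, task_id: str) -> None:
        self.launched.append(task_id)


def skip_in_pre_execute(task_ids, skip_list, cluster: ToyCluster) -> dict[str, str]:
    """pre_execute-style skipping: every task is queued and gets a worker;
    the skip decision only happens once the task is running on it."""
    states = {}
    for task_id in task_ids:
        cluster.launch_worker(task_id)  # the worker exists before the hook runs
        states[task_id] = "skipped" if task_id in skip_list else "success"
    return states


def skip_at_scheduling(task_ids, skip_list, cluster: ToyCluster) -> dict[str, str]:
    """Scheduler-side skipping (hypothetical): listed tasks are marked skipped
    and never queued, so no worker is launched for them."""
    states = {}
    for task_id in task_ids:
        if task_id in skip_list:
            states[task_id] = "skipped"  # decided before any worker exists
            continue
        cluster.launch_worker(task_id)
        states[task_id] = "success"
    return states


tasks = ["task_a", "task_b", "task_c"]
for approach in (skip_in_pre_execute, skip_at_scheduling):
    cluster = ToyCluster()
    states = approach(tasks, {"task_b"}, cluster)
    print(approach.__name__, len(cluster.launched), states["task_b"])
# skip_in_pre_execute 3 skipped
# skip_at_scheduling 2 skipped
```

Both approaches end with the same task states; the only difference in this
model is how many workers get started, which is the overhead point 2 is about.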

Howie

On Fri, Feb 4, 2022 at 7:41 AM James Coder <jcode...@gmail.com> wrote:

> Thanks Jarek, yeah, I wasn't thinking about the trigger rules, hazards of
> emailing before coffee.
>
> On Fri, Feb 4, 2022 at 10:32 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>>> Thanks Daniel, this is something I have been thinking about for a while.
>>> One use case I have is for a DAG that only gets run on an ad hoc basis, for
>>> any combination of a subset of tasks (I recognize this is a rather
>>> non-standard use of Airflow). One question though: if you specified
>>> skip_list=["task b"], wouldn't the scheduler skip task c as well?
>>>
>>
>> It depends on the trigger rule you choose.
>>
>> One of the effects of the default "all_success" rule is that the "skipped"
>> state propagates downstream.
>> If you use "all_done", C won't be skipped (but a failure of B is also
>> treated like a success, i.e. C still runs), and if you use "none_failed",
>> a "failure" of B propagates to C (but a "skip" is not propagated).
>>
>> Generally speaking, when you expect some task to be skippable, you should
>> design your DAG to account for it.
>>
>> You can see more about the rules in this really informative post by Marc
>> Lamberti
>> https://marclamberti.com/blog/airflow-trigger-rules-all-you-need-to-know/
>>
>> Whenever I can't remember how those rules work, I keep on coming back to
>> the post.
>>
>>
>>
>>> James Coder
>>> ------------------------------
>>> *From:* Daniel Standish <daniel.stand...@astronomer.io.INVALID>
>>> *Sent:* Friday, February 4, 2022 1:41:12 AM
>>> *To:* dev@airflow.apache.org <dev@airflow.apache.org>
>>> *Subject:* Re: [DISCUSSION] Specify tasks to skip when triggering DAG
>>>
>>> That skip func had a typo (conf where it should have been context)...
>>>
>>> This is more likely to work:
>>>
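>>> from airflow.exceptions import AirflowSkipException
>>>
>>>
>>> def skip_if_specified(context):
>>>     """Skip this task if its task_id is in dag_run.conf["skip_list"]."""
>>>     if not context:
>>>         return
>>>     dr = context.get('dag_run')
>>>     ti = context.get('task_instance')
>>>     if not (dr and ti):
>>>         return
>>>     # conf is the JSON passed when the DAG run was triggered
>>>     conf = dr.conf
>>>     if not conf:
>>>         return
>>>     skip_list = conf.get('skip_list', [])
>>>     if ti.task_id in skip_list:
>>>         # Raising AirflowSkipException marks the task instance as skipped
>>>         raise AirflowSkipException()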
>>>
>>> Apologies for the spam.
>>>
>>
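
For James's question, a toy model of the trigger-rule behaviour Jarek
describes may help. It is a simplified sketch of the documented semantics,
not Airflow's actual dependency-checking code: `State` and
`downstream_outcome` are hypothetical names, only three rules are covered,
and task c is assumed to depend on task b alone.

```python
from __future__ import annotations

from enum import Enum


class State(str, Enum):
    """A subset of Airflow task-instance states, used only in this toy model."""
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


def downstream_outcome(trigger_rule: str, upstream_states: list[State]) -> str:
    """Return "run", "skipped" or "upstream_failed" for a task whose upstream
    tasks have all finished (simplified model of the trigger rules)."""
    failed = any(s in (State.FAILED, State.UPSTREAM_FAILED) for s in upstream_states)
    skipped = any(s is State.SKIPPED for s in upstream_states)

    if trigger_rule == "all_success":
        if failed:
            return "upstream_failed"
        if skipped:
            return "skipped"  # a skip propagates downstream
        return "run"
    if trigger_rule == "all_done":
        return "run"  # runs whatever the upstream outcome was
    if trigger_rule == "none_failed":
        if failed:
            return "upstream_failed"  # a failure propagates downstream
        return "run"  # skips are tolerated
    raise ValueError(f"rule not covered by this sketch: {trigger_rule}")


# Task c downstream of task b: what happens if b is skipped, or if b fails?
for rule in ("all_success", "all_done", "none_failed"):
    print(rule, downstream_outcome(rule, [State.SKIPPED]),
          downstream_outcome(rule, [State.FAILED]))
# all_success skipped upstream_failed
# all_done run run
# none_failed run upstream_failed
```

With the default `all_success`, skipping task b also skips task c, which is
the case James asked about; `none_failed` keeps c running when b is skipped
while still stopping it when b fails.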
