Airflow is really a "pythonic" framework. The basic premise of Airflow is
that everything around DAGs is imperative, not declarative. DAG
definitions, task code, decorated Python callables as a "new" way of
writing tasks, heck, even triggers and more recently timetables are all
imperative Python code. Similarly, custom Secrets and XCom backends are
hardly "configurable" at all - if you need to change their behaviour, you
write code rather than (for example) provide a mapping of variables. And
that's a deliberate choice - one that sets Airflow apart from many other
frameworks. If anything, we take away "declarative" ways rather than add
them. This is the power of Airflow: rather than adding more "limited"
options, we add ways to extend Airflow's capabilities by writing small
snippets of code instead of "declaring" how existing code should behave.

In this context, I think none of your cons hold:

1) It is fully scalable. You can write a common function in shared code
that is used across all your teams (see the sketch after this list).
2) This is how you usually extend Airflow. Your admins are already familiar
with extending Airflow by writing small pieces of code, so it does not
cost much extra.
3) No abuse. This is one of the million ways of using pre_execute, and a
very cool one. We added "pre_execute" as a callable experimentally not
that long ago, and the more I see cases like this one, the more I think it
was a good idea.
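
For 1), concretely - a minimal sketch, where "my_company.airflow_helpers"
is a hypothetical shared module holding Daniel's skip_if_specified
callable (quoted at the bottom of this thread). Setting pre_execute once
in default_args makes every task in the DAG skippable, with no per-task
edits:

    import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Hypothetical shared module maintained by your admins.
    from my_company.airflow_helpers import skip_if_specified

    with DAG(
        dag_id="example_skippable",
        start_date=datetime.datetime(2022, 1, 1),
        schedule_interval=None,
        # default_args are passed to every operator in this DAG, so each
        # task picks up the pre_execute hook automatically.
        default_args={"pre_execute": skip_if_specified},
    ) as dag:
        task_a = BashOperator(task_id="task_a", bash_command="echo a")
        task_b = BashOperator(task_id="task_b", bash_command="echo b")
        task_a >> task_b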

So I do not see it as a workaround at all. I see it as a regular "feature"
of Airflow.

Just my thoughts.

J.



On Sat, Feb 5, 2022 at 8:35 AM Hongyi Wang <whyni...@gmail.com> wrote:

> Hi Daniel,
>
> Thank you for sharing the `pre_execute` idea. It is a smart idea, and I do
> believe it would work. But to me, it is more of a workaround than a
> universal solution. To explain, I would love to discuss the tradeoffs.
>
> [Pros]
> The solution I proposed requires changes to either the Airflow Scheduler
> or the Executor. In comparison, the `pre_execute` solution doesn't require
> any core change, which avoids introducing additional risk/complexity.
>
> [Cons]
> 1) Not scalable / Inconvenient
> To make a task skippable, one needs to modify the existing DAG (to set
> `pre_execute`). It seems simple enough, but when your Airflow hosts a
> thousand DAGs owned by different teams/users, it can be challenging.
>
> 2) May cost additional time and resources
> A task won’t be skipped until it reaches `pre_execute()`. That is okay when
> using LocalExecutor/SequentialExecutor; however, when using
> KubernetesExecutor, a k8s worker pod will still be created for every “task
> to skip”. Ideally, tasks would be skipped during "scheduling".
>
> 3) May abuse `pre_execute`
> Instead of asking developers to set `pre_execute=skip_if_specified` to make
> a task skippable, wouldn’t it be better to offer task skipping as an
> out-of-the-box feature, so `pre_execute` can keep serving its original
> purpose?
>
> Howie
>
> On Fri, Feb 4, 2022 at 7:41 AM James Coder <jcode...@gmail.com> wrote:
>
>> Thanks Jarek, yeah, I wasn't thinking about the trigger rules, hazards of
>> emailing before coffee.
>>
>> On Fri, Feb 4, 2022 at 10:32 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>>>> Thanks Daniel, this is something I have been thinking about for a while.
>>>> One use case I have is a dag that only gets run on an ad hoc basis, for any
>>>> combination of a subset of tasks (I recognize this is a rather non-standard
>>>> use of airflow). One question though: if you specified skip_list=[“task b”],
>>>> wouldn’t the scheduler skip task c as well?
>>>>
>>>
>>> It depends on the trigger rule you choose.
>>>
>>> One of the effects of the default "all_success" rule is that the "skip"
>>> state propagates downstream when that rule is used.
>>> If you use "all_done", C won't be skipped (but a failure of B counts as
>>> "done" too, so C runs even after B fails), and when you use "none_failed",
>>> the "failure" of B will propagate to C as well (but "skip" will not be
>>> propagated).
>>>
>>> Generally speaking, when you expect some task to be skippable, you should
>>> design your DAG to account for it.
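>>>
>>> As a tiny illustration (a sketch only, task names made up): if B may be
>>> skipped and you still want C to run, declare C with the "none_failed"
>>> rule:
>>>
>>>     from airflow.operators.bash import BashOperator
>>>     from airflow.utils.trigger_rule import TriggerRule
>>>
>>>     # C runs when B succeeds or is skipped, but not when B fails.
>>>     task_c = BashOperator(
>>>         task_id="task_c",
>>>         bash_command="echo c",
>>>         trigger_rule=TriggerRule.NONE_FAILED,
>>>     )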
>>>
>>> You can see more about the rules in this really informative post by Marc
>>> Lamberti:
>>> https://marclamberti.com/blog/airflow-trigger-rules-all-you-need-to-know/
>>>
>>> Whenever I can't remember how those rules work, I keep coming back to
>>> that post.
>>>
>>>
>>>
>>>> James Coder
>>>> ------------------------------
>>>> *From:* Daniel Standish <daniel.stand...@astronomer.io.INVALID>
>>>> *Sent:* Friday, February 4, 2022 1:41:12 AM
>>>> *To:* dev@airflow.apache.org <dev@airflow.apache.org>
>>>> *Subject:* Re: [DISCUSSION] Specify tasks to skip when triggering DAG
>>>>
>>>> That skip func had a typo (conf where it should have been context)...
>>>>
>>>> this is more likely to work:
>>>>
>>>> from airflow.exceptions import AirflowSkipException
>>>>
>>>> def skip_if_specified(context):
>>>>     # Bail out quietly when there is not enough context to decide.
>>>>     if not context:
>>>>         return
>>>>     dr = context.get('dag_run')
>>>>     ti = context.get('task_instance')
>>>>     if not (dr and ti):
>>>>         return
>>>>     conf = dr.conf
>>>>     if not conf:
>>>>         return
>>>>     # Skip this task when its task_id is listed in the trigger conf.
>>>>     skip_list = conf.get('skip_list', [])
>>>>     if ti.task_id in skip_list:
>>>>         raise AirflowSkipException()
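>>>>
>>>> A hypothetical way to wire it up (task id and dag id are illustrative):
>>>>
>>>>     from airflow.operators.bash import BashOperator
>>>>
>>>>     task_b = BashOperator(
>>>>         task_id="task_b",
>>>>         bash_command="echo b",
>>>>         pre_execute=skip_if_specified,
>>>>     )
>>>>
>>>> then trigger the DAG with:
>>>>
>>>>     airflow dags trigger example_dag --conf '{"skip_list": ["task_b"]}'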
>>>>
>>>> Apologies for the spam.
>>>>
>>>
