And if not with cluster policy, then you could pass such a "conditionally
skip" callable to all tasks in a DAG via default_args.

On Thu, Feb 3, 2022, 10:16 PM Daniel Standish <daniel.stand...@astronomer.io>
wrote:

> It wouldn't necessarily need to involve the scheduler or the executor.
> You could add logic to pre_execute to read the dag run conf and skip the
> task itself if it is listed in the conf.  In fact, you could probably
> implement this today with that approach using a cluster policy, since we
> can supply pre_execute callables through a param on the base operator.
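>
> A minimal sketch of such a pre_execute hook (the "skip_tasks" conf key and
> the function name are illustrative, not an existing Airflow API):
>
>     from airflow.exceptions import AirflowSkipException
>
>     def skip_if_requested(context):
>         # Skip this task if its task_id is listed in
>         # dag_run.conf["skip_tasks"]; conf is empty unless the user
>         # supplied one when triggering the run.
>         conf = context["dag_run"].conf or {}
>         if context["task"].task_id in conf.get("skip_tasks", []):
>             raise AirflowSkipException("skipped via dag_run.conf")
>
> A cluster policy (task_policy in airflow_local_settings.py) could then
> attach it to every operator, e.g. task.pre_execute = skip_if_requested,
> though whether that assignment plays well with DAG serialization is worth
> verifying.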
>
> On Thu, Feb 3, 2022, 10:07 PM Hongyi Wang <whyni...@gmail.com> wrote:
>
>> Hello everyone,
>>
>> Just want to clarify a few points regarding "specify tasks to skip when
>> triggering DAG".
>>
>> First of all, the use case is not necessarily only backfill. We may want
>> to have this for manual triggers so users can do it easily in the UI,
>> rather than using the backfill CLI. This is similar to how Azkaban users
>> skip a job during execution (
>> https://azkaban.github.io/azkaban/docs/latest/#executing-flows).
>>
>> Second, we DO want to retain the dependencies and that is a “hard”
>> requirement (as is shown in the task-a ☐ -> task-b ☑ -> task-c ☐ example).
>>
>> Last but not least, the change does NOT necessarily have to be in the
>> Scheduler; there is a chance it can be done in the Executor instead (if
>> that's preferred).
>>
>> @Ash any objection to this proposal? If so, could you please share your
>> concerns?
>>
>> Look forward to reaching a consensus soon.
>>
>> Howie
>>
>> On Mon, Jan 31, 2022 at 5:44 PM Hongyi Wang <whyni...@gmail.com> wrote:
>>
>>> Hi Ash, thanks for the feedback.
>>>
>>> > You want it to behave as if the dag was specified as task-a -> task-c,
>>> right?
>>>
>>> Yes. As you mentioned, when we mark task-b as skipped, task-c's
>>> dependencies would be resolved and it would be eligible to run. BUT that
>>> does not mean task-a and task-c have to run in parallel. The key is: if
>>> we only attempt to skip a task instance once it becomes eligible to run
>>> (e.g. task-b becomes eligible after task-a completes), we can still
>>> retain the dependencies between the rest of the tasks.
>>>
>>> > I'm not sold that this should belong in Airflow scheduler --
>>> re-running DagRuns on an ad-hoc basis is more aligned with `airflow
>>> backfill`.
>>>
>>> Are you suggesting "skipping tasks when triggering DAG" should be an
>>> option specific to `airflow backfill`? (please correct me if I
>>> misunderstand) To me, the proposal is more aligned with `airflow trigger`
>>> than `airflow backfill`, but I hear you. The reason I believe
>>> `scheduler_job.py` is the best place to skip tasks is that it allows
>>> Airflow to retain the dependencies between non-skipped tasks (as I
>>> explained above). I understand changes to the Airflow scheduler may
>>> impact all users, and I won't deny there is a risk. But considering the
>>> code change is comparatively small and straightforward, I am not too
>>> worried, provided it is properly reviewed and tested.
>>>
>>> Hope I answered your question. I am happy to discuss more.
>>>
>>> Howie
>>>
>>> On Mon, Jan 31, 2022 at 4:03 AM Ash Berlin-Taylor <a...@apache.org>
>>> wrote:
>>>
>>>> > To illustrate the use case, I am going to use this example below.
>>>> > task-a ☐ -> task-b ☑ -> task-c ☐
>>>>
>>>> So in this example, are you aware and intending that both task-a and
>>>> task-c run straight away? Because by skipping task-b, task-c's dependencies
>>>> would be resolved and it would be eligible to run.
>>>>
>>>> From what you've described, I don't think that is actually what you
>>>> want; rather, you want it to behave as if the dag were specified as
>>>> task-a -> task-c, right?
>>>>
>>>> Honestly though: I'm not sold that this should belong in Airflow
>>>> scheduler -- re-running DagRuns on an ad-hoc basis is more aligned with
>>>> `airflow backfill`.
>>>>
>>>> -ash
>>>>
>>>> On Sun, Jan 30 2022 at 11:21:26 -0800, Hongyi Wang <whyni...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Elad, thank you for your feedback. To answer your question, besides
>>>> debugging, another common use case is re-running existing DAGs in an ad-hoc
>>>> manner.
>>>>
>>>> For example, as an Airflow user, I sometimes want to trigger an ad-hoc
>>>> DAG run. In this run, I want to skip one or more tasks, so the DAG run
>>>> can yield a different result, or simply complete sooner. As I mentioned
>>>> in my previous email, there are other ways to achieve the same goal.
>>>> But IMHO, neither of them is easy and flexible enough for an ad-hoc use
>>>> case.
>>>>
>>>> Does that sound like a reasonable use case? What do you think is the
>>>> best approach to solve it? I am happy to discuss more with you.
>>>>
>>>> On Sun, Jan 30, 2022 at 4:45 AM Elad Kalif <elad...@apache.org> wrote:
>>>>
>>>>> Can you describe a use case for the requested feature other than
>>>>> debugging? This doesn't feel like the right approach to test a specific
>>>>> task in a pipeline.
>>>>>
>>>>> On Fri, Jan 28, 2022 at 11:44 PM Alex Begg <alex.b...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Actually, sorry, you can scratch out some of what I just said. I
>>>>>> thought you were talking about clearing states, but you are instead
>>>>>> referring to triggering a DAG run. It does kind of make sense to have
>>>>>> a way to trigger a DAG run but only run specific tasks.
>>>>>>
>>>>>> On Fri, Jan 28, 2022 at 1:41 PM Alex Begg <alex.b...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I believe this is currently possible by just unselecting
>>>>>>> “downstream” before you click “Clear” in the UI. It should only clear 
>>>>>>> the
>>>>>>> one middle task and not the downstream task(s).
>>>>>>>
>>>>>>> I would prefer not to have a more detailed UI for skipping (or I
>>>>>>> want to say “bypassing”, as “skip” is itself a task state) specific
>>>>>>> downstream tasks, as it might signal to users that it is ideal to
>>>>>>> specify tasks to bypass, when in reality it is only something that
>>>>>>> should be done on occasion for experimentation or troubleshooting,
>>>>>>> as you mention, not a common occurrence.
>>>>>>>
>>>>>>> What I can agree with, though, is that the list of buttons on the
>>>>>>> dialog window for changing a task's state looks a bit cluttered.
>>>>>>> There can probably be a better UI/UX for that, but I don't think
>>>>>>> adding the ability to check/uncheck individual downstream tasks is
>>>>>>> the way to go; that seems like it would be just as cluttered.
>>>>>>>
>>>>>>> Alex Begg
>>>>>>>
>>>>>>> On Fri, Jan 28, 2022 at 11:46 AM Hongyi Wang <whyni...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello everyone,
>>>>>>>>
>>>>>>>> I'd like to propose a new feature in Airflow -- allow users to
>>>>>>>> specify tasks to skip when triggering a DAG run.
>>>>>>>>
>>>>>>>> From our own experience, this feature can be very useful when doing
>>>>>>>> experiments, troubleshooting or re-running existing DAGs. And I
>>>>>>>> believe it can benefit many Airflow users.
>>>>>>>>
>>>>>>>> To illustrate the use case, I am going to use this example below.
>>>>>>>> task-a ☐ -> task-b ☑ -> task-c ☐
>>>>>>>>
>>>>>>>> Suppose we have a DAG containing 3 tasks. To troubleshoot "task-a"
>>>>>>>> and "task-c", I want to trigger a manual DAG run and skip "task-b"
>>>>>>>> (so I can save time and resources, and focus on the other two
>>>>>>>> tasks). To do so, today I have two options:
>>>>>>>>
>>>>>>>> Option 1: Trigger DAG, then manually mark "task-b" as `SUCCESS`
>>>>>>>> Option 2: Remove "task-b" from my DAG, then trigger DAG
>>>>>>>>
>>>>>>>> Neither of the options is great. Option 1 can be troublesome when
>>>>>>>> the DAG is large and there are multiple tasks I want to skip.
>>>>>>>> Option 2 requires a change to the DAG file, which is not convenient
>>>>>>>> for just troubleshooting.
>>>>>>>>
>>>>>>>> Therefore, I would love to discuss how we can provide an easy way
>>>>>>>> for users to skip tasks when triggering a DAG.
>>>>>>>>
>>>>>>>> Things to consider are:
>>>>>>>> 1) We should allow users to specify all tasks to skip at once when
>>>>>>>> triggering the DAG
>>>>>>>> 2) We should retain the dependencies between non-skipped tasks (in
>>>>>>>> the above example, "task-c" won't start until "task-a" completes,
>>>>>>>> even if we skip "task-b")
>>>>>>>> 3) We should mark skipped tasks as `SKIPPED` instead of `SUCCESS`
>>>>>>>> to make it more intuitive
>>>>>>>> 4) The implementation should be easy, clean and low risk
>>>>>>>>
>>>>>>>> Here is my proposed solution (tested locally):
>>>>>>>> Today, Airflow allows users to pass a JSON object to the DagRun as
>>>>>>>> {{dag_run.conf}} when triggering a DAG. The idea is: before queuing
>>>>>>>> task instances whose dependencies are satisfied, `scheduler_job.py`
>>>>>>>> (after we make some changes) will filter out the task instances to
>>>>>>>> skip based on the `dag_run.conf` the user passes in (e.g.
>>>>>>>> {"skip_tasks": ["task-b"]}), then mark them as SKIPPED.
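>>>>>>>>
>>>>>>>> For illustration, triggering such a run from the CLI could then
>>>>>>>> look like this (the dag id is hypothetical; the "skip_tasks" key is
>>>>>>>> the one proposed above):
>>>>>>>>
>>>>>>>>     airflow dags trigger example_dag --conf '{"skip_tasks": ["task-b"]}'
>>>>>>>>
>>>>>>>> The same JSON could also be supplied through the trigger-with-config
>>>>>>>> form in the UI.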
>>>>>>>>
>>>>>>>> Things I would love to discuss:
>>>>>>>> - What do you think about this feature?
>>>>>>>> - What do you think about the proposed solution?
>>>>>>>> - Did I miss anything that you want to discuss?
>>>>>>>> - Is it necessary to introduce a new state (e.g. MANUAL_SKIPPED) to
>>>>>>>> differentiate it from SKIPPED?
>>>>>>>>
>>>>>>>> Howie
>>>>>>>>
>>>>>>>>
