It wouldn't necessarily need to involve the scheduler or the executor. You could add logic to `pre_execute` that reads the DAG run conf and skips the task itself if it is listed in the conf. In fact, you could probably implement this today with this approach using a cluster policy, since we can supply `pre_execute` callables through a parameter on the base operator.
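
A minimal sketch of that idea, not a tested implementation. It assumes the `skip_tasks` conf key from the example in the original proposal; the names `skip_if_requested` and the way the hook is attached inside `task_policy` are illustrative assumptions. The `ImportError` fallback exists only so the sketch can be run without Airflow installed.

```python
# Sketch: a pre_execute hook that skips a task when its task_id is listed
# in dag_run.conf, e.g. {"skip_tasks": ["task-b"]}.
try:
    from airflow.exceptions import AirflowSkipException
except ImportError:
    # Stand-in so the sketch can be exercised without Airflow installed.
    class AirflowSkipException(Exception):
        pass


def skip_if_requested(context):
    """Raise AirflowSkipException if this task is named in dag_run.conf."""
    # conf may be None when the run was triggered without any conf.
    conf = getattr(context["dag_run"], "conf", None) or {}
    task_id = context["ti"].task_id
    if task_id in conf.get("skip_tasks", []):
        raise AirflowSkipException(
            f"{task_id} is listed in dag_run.conf['skip_tasks']"
        )


def task_policy(task):
    """Cluster policy hook, intended to live in airflow_local_settings.py.

    Assumption: overriding the attribute on the task instance is enough
    for the hook to be called before execute(). Passing
    pre_execute=skip_if_requested to an individual operator is the
    per-task alternative.
    """
    task.pre_execute = skip_if_requested
```

The run could then be triggered with something like `airflow dags trigger --conf '{"skip_tasks": ["task-b"]}' <dag_id>`. One caveat: with the default trigger rule, a skipped upstream task normally causes its downstream tasks to be skipped as well, so task-c would probably need a rule such as `none_failed` to still run after task-a.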

On Thu, Feb 3, 2022, 10:07 PM Hongyi Wang <whyni...@gmail.com> wrote:

> Hello everyone,
>
> Just want to clarify a few points regarding "specify tasks to skip when triggering DAG".
>
> First of all, the use case is not necessarily only backfill. We may want to have this for manual triggers so users can do it easily in UI, rather than using backfill CLI. This is similar to how Azkaban users skip a job during execution (https://azkaban.github.io/azkaban/docs/latest/#executing-flows).
>
> Second, we DO want to retain the dependencies and that is a “hard” requirement (as is shown in the task-a ☐ -> task-b ☑ -> task-c ☐ example).
>
> Last but not least, the change does NOT necessarily have to be in Scheduler, there is a chance it can be done in Executor instead (if that's preferred).
>
> @Ash any objection to this proposal? If so, may you please share your concern?
>
> Look forward to reaching a consensus soon.
>
> Howie
>
> On Mon, Jan 31, 2022 at 5:44 PM Hongyi Wang <whyni...@gmail.com> wrote:
>
>> Hi Ash, thanks for the feedback.
>>
>> > You want it to behave as if the dag was specified as task-a -> task-c, right?
>>
>> Yes. As you mentioned, when we mark task-b as skipped, task-c's dependencies would be resolved and it would be eligible to run. BUT it does not necessarily mean task-a and task-c may run in parallel. The key is, if we only attempt to skip a task instance when it is eligible to run (e.g. task-b is eligible to run after task-a completes), we can still retain dependencies between the rest of tasks.
>>
>> > I'm not sold that this should belong in Airflow scheduler -- re-running DagRuns on an ad-hoc basis is more aligned with `airflow backfill`.
>>
>> Are you suggesting "skipping tasks when triggering DAG" should be an option specific to `airflow backfill`? (please correct me if I misunderstand) To me, the proposal is more aligned with `airflow trigger` than `airflow backfill`, but I hear you. The reason why I believe `scheduler_job.py` is the best place to skip tasks is because it allows Airflow to retain dependencies between non-skipped tasks (as I explained above). I understand changes to the Airflow scheduler may impact all users. I won't deny there is a risk. But considering the code change is comparatively small and straightforward, I am not too worried once it has been reviewed and tested.
>>
>> Hope I answered your question. I am happy to discuss more.
>>
>> Howie
>>
>> On Mon, Jan 31, 2022 at 4:03 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>
>>> > To illustrate the use case, I am going to use this example below.
>>> > task-a ☐ -> task-b ☑ -> task-c ☐
>>>
>>> So in this example, are you aware and intending that both task-a and task-c run straight away? Because by skipping task-b, task-c's dependencies would be resolved and it would be eligible to run.
>>>
>>> From what you've describe I don't think that is actually what you want, but that you want it to behave as if the dag was specified as task-a -> task-c, right?
>>>
>>> Honestly though: I'm not sold that this should belong in Airflow scheduler -- re-running DagRuns on an ad-hoc basis is more aligned with `airflow backfill`.
>>>
>>> -ash
>>>
>>> On Sun, Jan 30 2022 at 11:21:26 -0800, Hongyi Wang <whyni...@gmail.com> wrote:
>>>
>>> Hi Elad, thank you for your feedback. To answer your question, besides debugging, another common use case is re-running existing DAGs in an ad-hoc manner.
>>>
>>> For example, as an Airflow user, I sometimes want to trigger an ad-hoc DAG run. In this run, I want to skip one/more tasks, so the dag run can yield a different result, or simply complete sooner. As I mentioned in my previous email, there are other ways to achieve the same goal. But IMHO, neither of them are easy & flexible enough for an ad-hoc use case.
>>>
>>> Does that sound like a reasonable use case? What do you think is the best approach to solve it? I am happy to discuss more with you.
>>>
>>> On Sun, Jan 30, 2022 at 4:45 AM Elad Kalif <elad...@apache.org> wrote:
>>>
>>>> Can you describe a use case for the requested feature other than debugging? This doesn't feel like the right approach to test a specific task in a pipeline.
>>>>
>>>> On Fri, Jan 28, 2022 at 11:44 PM Alex Begg <alex.b...@gmail.com> wrote:
>>>>
>>>>> Actually, sorry, you can scratch out some of what I just said, I thought you were talking about clearing states, you are instead referring to triggering a DAG run. That does kind of make sense to have a way to trigger a DAG run but only run specific tasks.
>>>>>
>>>>> On Fri, Jan 28, 2022 at 1:41 PM Alex Begg <alex.b...@gmail.com> wrote:
>>>>>
>>>>>> I believe this is currently possible by just unselecting “downstream” before you click “Clear” in the UI. It should only clear the one middle task and not the downstream task(s).
>>>>>>
>>>>>> I would prefer to not have a more detailed UI to allow to skip (or i want to say “bypass” as “skip” is itself a task state) specific downstream tasks as it might signal to users that it is ideal to specify tasks to bypass when in reality it is only something that should be done on occasion for experiment or troubleshooting as you mention, not a common occurrence.
>>>>>>
>>>>>> What I can agree to though is the list of buttons on the dialog window to change state of a task is a bit cluttered looking. There probably can be a better UI/UX for that, but I do think being able to check/uncheck downstream task is a way to go, that seems like it will be just as cluttered.
>>>>>>
>>>>>> Alex Begg
>>>>>>
>>>>>> On Fri, Jan 28, 2022 at 11:46 AM Hongyi Wang <whyni...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> I'd like to propose a new feature in Airflow -- allow users to specify tasks to skip when trigger DAG run.
>>>>>>>
>>>>>>> From our own experience, this feature can be very useful when doing experiments, troubleshooting or re-running existing DAGs. And I believe it can benefit many Airflow users.
>>>>>>>
>>>>>>> To illustrate the use case, I am going to use this example below.
>>>>>>> task-a ☐ -> task-b ☑ -> task-c ☐
>>>>>>>
>>>>>>> Suppose we have a DAG containing 3 tasks. To troubleshoot "task-a" and "task-c", I want to trigger a manual DAG run and skip "task-b" (so I can save time & resource & focus on other two tasks). To do so, today I have two options:
>>>>>>>
>>>>>>> Option 1: Trigger DAG, then manually mark "task-b" as `SUCCESS`
>>>>>>> Option 2: Remove "task-b" from my DAG, then trigger DAG
>>>>>>>
>>>>>>> Neither of the options are great. Option 1 can be troublesome when DAG is large, and there are multiple tasks I want to skip. Option 2 requires change in the DAG file, which is not convenient for just troubleshooting.
>>>>>>>
>>>>>>> Therefore, I would love to discuss how we can provide an easy way for users to skip tasks when triggering DAG.
>>>>>>>
>>>>>>> Things to consider are:
>>>>>>> 1) We should allow user to specify all tasks to skip at once when trigger DAG
>>>>>>> 2) We should retain the dependencies between non-skip tasks (in above example, "task-c" won't start until "task-a" completes even if we skipped "task-b")
>>>>>>> 3) We should mark skipped task as `SKIPPED` instead of `SUCCESS` to make it more intuitive
>>>>>>> 4) The implementation should be easy, clean and low risk
>>>>>>>
>>>>>>> Here is my proposed solution (tested locally):
>>>>>>> Today, Airflow allow user to pass a JSON to the Dagrun as {{dag_run.conf}} when triggering DAG. The idea is, before queuing task instances that satisfies dependences, `scheduler_job.py` (after we make some change) will filter task instances to skip based on `dag_run.conf` user passes in (e.g. {"skip_tasks": ["task-b"]}), then mark them as SKIPPED.
>>>>>>>
>>>>>>> Things I would love to discuss:
>>>>>>> - What do you think about this feature?
>>>>>>> - What do you think about the proposed solution?
>>>>>>> - Did I miss anything that you want to discuss?
>>>>>>> - Is it necessary to introduce a new state (e.g. MANUAL_SKIPPED) to differentiate SKIPPED?
>>>>>>>
>>>>>>> Howie