And if not with cluster policy, then you could pass such a "conditionally skip" callable to all tasks in a dag with default_args
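For concreteness, a minimal sketch of that approach could look like the following, assuming a `skip_tasks` key in `dag_run.conf` (the key name, DAG id and task ids are illustrative, and the exact behaviour of the `pre_execute` hook may vary between Airflow versions):

    from datetime import datetime

    from airflow import DAG
    from airflow.exceptions import AirflowSkipException
    from airflow.operators.bash import BashOperator


    def skip_if_requested(context):
        """Skip this task if its task_id is listed in dag_run.conf["skip_tasks"]."""
        dag_run = context.get("dag_run")
        conf = (dag_run.conf or {}) if dag_run else {}
        if context["task"].task_id in conf.get("skip_tasks", []):
            raise AirflowSkipException("skipped via dag_run.conf['skip_tasks']")


    with DAG(
        dag_id="example_skip_via_conf",
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
        # Attach the "conditionally skip" callable to every task via default_args.
        default_args={"pre_execute": skip_if_requested},
    ) as dag:
        task_a = BashOperator(task_id="task-a", bash_command="echo a")
        task_b = BashOperator(task_id="task-b", bash_command="echo b")
        # With the default "all_success" trigger rule, a skipped task-b would also
        # skip task-c; "none_failed" lets task-c run once its upstreams finish.
        task_c = BashOperator(
            task_id="task-c", bash_command="echo c", trigger_rule="none_failed"
        )
        task_a >> task_b >> task_c

The cluster-policy variant mentioned below would attach the same callable from `task_policy` in `airflow_local_settings.py` instead of per-DAG `default_args`. Either way, the hook only runs once the task instance is otherwise ready to execute, so upstream ordering is preserved.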
On Thu, Feb 3, 2022, 10:16 PM Daniel Standish <daniel.stand...@astronomer.io> wrote:

> It wouldn't necessarily need to involve the scheduler or the executor. You
> could add logic to pre_execute to read the dag run conf and skip itself if it
> is specified in the conf. In fact you could probably implement this currently
> with this approach using cluster policy, since we can supply pre_execute
> callables through a param on BaseOperator.
>
> On Thu, Feb 3, 2022, 10:07 PM Hongyi Wang <whyni...@gmail.com> wrote:
>
>> Hello everyone,
>>
>> Just want to clarify a few points regarding "specify tasks to skip when
>> triggering DAG".
>>
>> First of all, the use case is not necessarily only backfill. We may want
>> to have this for manual triggers so users can do it easily in the UI, rather
>> than using the backfill CLI. This is similar to how Azkaban users skip a job
>> during execution (https://azkaban.github.io/azkaban/docs/latest/#executing-flows).
>>
>> Second, we DO want to retain the dependencies, and that is a "hard"
>> requirement (as is shown in the task-a ☐ -> task-b ☑ -> task-c ☐ example).
>>
>> Last but not least, the change does NOT necessarily have to be in the
>> Scheduler; there is a chance it can be done in the Executor instead (if
>> that's preferred).
>>
>> @Ash any objection to this proposal? If so, may you please share your
>> concern?
>>
>> Look forward to reaching a consensus soon.
>>
>> Howie
>>
>> On Mon, Jan 31, 2022 at 5:44 PM Hongyi Wang <whyni...@gmail.com> wrote:
>>
>>> Hi Ash, thanks for the feedback.
>>>
>>> > You want it to behave as if the dag was specified as task-a -> task-c,
>>> right?
>>>
>>> Yes. As you mentioned, when we mark task-b as skipped, task-c's
>>> dependencies would be resolved and it would be eligible to run. BUT it does
>>> not necessarily mean task-a and task-c may run in parallel. The key is, if
>>> we only attempt to skip a task instance when it is eligible to run (e.g.
>>> task-b is eligible to run after task-a completes), we can still retain
>>> dependencies between the rest of the tasks.
>>>
>>> > I'm not sold that this should belong in Airflow scheduler -- re-running
>>> DagRuns on an ad-hoc basis is more aligned with `airflow backfill`.
>>>
>>> Are you suggesting "skipping tasks when triggering DAG" should be an
>>> option specific to `airflow backfill`? (Please correct me if I
>>> misunderstand.) To me, the proposal is more aligned with `airflow trigger`
>>> than `airflow backfill`, but I hear you. The reason I believe
>>> `scheduler_job.py` is the best place to skip tasks is that it allows
>>> Airflow to retain dependencies between non-skipped tasks (as I explained
>>> above). I understand changes to the Airflow scheduler may impact all users,
>>> and I won't deny there is a risk. But considering the code change is
>>> comparatively small and straightforward, I am not too worried as long as it
>>> is reviewed and tested.
>>>
>>> Hope I answered your question. I am happy to discuss more.
>>>
>>> Howie
>>>
>>> On Mon, Jan 31, 2022 at 4:03 AM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>
>>>> > To illustrate the use case, I am going to use this example below.
>>>> > task-a ☐ -> task-b ☑ -> task-c ☐
>>>>
>>>> So in this example, are you aware and intending that both task-a and
>>>> task-c run straight away? Because by skipping task-b, task-c's dependencies
>>>> would be resolved and it would be eligible to run.
>>>>
>>>> From what you've described, I don't think that is actually what you want,
>>>> but that you want it to behave as if the dag was specified as task-a ->
>>>> task-c, right?
>>>>
>>>> Honestly though: I'm not sold that this should belong in the Airflow
>>>> scheduler -- re-running DagRuns on an ad-hoc basis is more aligned with
>>>> `airflow backfill`.
>>>>
>>>> -ash
>>>>
>>>> On Sun, Jan 30 2022 at 11:21:26 -0800, Hongyi Wang <whyni...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Elad, thank you for your feedback. To answer your question, besides
>>>> debugging, another common use case is re-running existing DAGs in an
>>>> ad-hoc manner.
>>>>
>>>> For example, as an Airflow user, I sometimes want to trigger an ad-hoc
>>>> DAG run. In this run, I want to skip one or more tasks, so the dag run can
>>>> yield a different result, or simply complete sooner. As I mentioned in my
>>>> previous email, there are other ways to achieve the same goal, but IMHO
>>>> neither of them is easy & flexible enough for an ad-hoc use case.
>>>>
>>>> Does that sound like a reasonable use case? What do you think is the
>>>> best approach to solve it? I am happy to discuss more with you.
>>>>
>>>> On Sun, Jan 30, 2022 at 4:45 AM Elad Kalif <elad...@apache.org> wrote:
>>>>
>>>>> Can you describe a use case for the requested feature other than
>>>>> debugging? This doesn't feel like the right approach to test a specific
>>>>> task in a pipeline.
>>>>>
>>>>> On Fri, Jan 28, 2022 at 11:44 PM Alex Begg <alex.b...@gmail.com> wrote:
>>>>>
>>>>>> Actually, sorry, you can scratch out some of what I just said; I
>>>>>> thought you were talking about clearing states, but you are instead
>>>>>> referring to triggering a DAG run. It does kind of make sense to have a
>>>>>> way to trigger a DAG run but only run specific tasks.
>>>>>>
>>>>>> On Fri, Jan 28, 2022 at 1:41 PM Alex Begg <alex.b...@gmail.com> wrote:
>>>>>>
>>>>>>> I believe this is currently possible by just unselecting "downstream"
>>>>>>> before you click "Clear" in the UI. It should only clear the one middle
>>>>>>> task and not the downstream task(s).
>>>>>>>
>>>>>>> I would prefer not to have a more detailed UI for skipping (or I want
>>>>>>> to say "bypassing", as "skip" is itself a task state) specific downstream
>>>>>>> tasks, as it might signal to users that it is ideal to specify tasks to
>>>>>>> bypass, when in reality it is only something that should be done on
>>>>>>> occasion for experiments or troubleshooting as you mention, not a common
>>>>>>> occurrence.
>>>>>>>
>>>>>>> What I can agree to, though, is that the list of buttons on the dialog
>>>>>>> window to change the state of a task is a bit cluttered looking. There
>>>>>>> probably can be a better UI/UX for that, but I don't think being able to
>>>>>>> check/uncheck downstream tasks is the way to go, as that seems like it
>>>>>>> will be just as cluttered.
>>>>>>>
>>>>>>> Alex Begg
>>>>>>>
>>>>>>> On Fri, Jan 28, 2022 at 11:46 AM Hongyi Wang <whyni...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello everyone,
>>>>>>>>
>>>>>>>> I'd like to propose a new feature in Airflow -- allow users to specify
>>>>>>>> tasks to skip when triggering a DAG run.
>>>>>>>>
>>>>>>>> From our own experience, this feature can be very useful when doing
>>>>>>>> experiments, troubleshooting or re-running existing DAGs. And I believe
>>>>>>>> it can benefit many Airflow users.
>>>>>>>>
>>>>>>>> To illustrate the use case, I am going to use this example below.
>>>>>>>> task-a ☐ -> task-b ☑ -> task-c ☐
>>>>>>>>
>>>>>>>> Suppose we have a DAG containing 3 tasks. To troubleshoot "task-a" and
>>>>>>>> "task-c", I want to trigger a manual DAG run and skip "task-b" (so I can
>>>>>>>> save time & resources and focus on the other two tasks). To do so, today
>>>>>>>> I have two options:
>>>>>>>>
>>>>>>>> Option 1: Trigger the DAG, then manually mark "task-b" as `SUCCESS`
>>>>>>>> Option 2: Remove "task-b" from my DAG, then trigger the DAG
>>>>>>>>
>>>>>>>> Neither of the options is great. Option 1 can be troublesome when the
>>>>>>>> DAG is large and there are multiple tasks I want to skip. Option 2
>>>>>>>> requires a change to the DAG file, which is not convenient for just
>>>>>>>> troubleshooting.
>>>>>>>>
>>>>>>>> Therefore, I would love to discuss how we can provide an easy way for
>>>>>>>> users to skip tasks when triggering a DAG.
>>>>>>>>
>>>>>>>> Things to consider are:
>>>>>>>> 1) We should allow users to specify all tasks to skip at once when
>>>>>>>> triggering the DAG
>>>>>>>> 2) We should retain the dependencies between non-skipped tasks (in the
>>>>>>>> above example, "task-c" won't start until "task-a" completes even if we
>>>>>>>> skipped "task-b")
>>>>>>>> 3) We should mark skipped tasks as `SKIPPED` instead of `SUCCESS` to
>>>>>>>> make it more intuitive
>>>>>>>> 4) The implementation should be easy, clean and low risk
>>>>>>>>
>>>>>>>> Here is my proposed solution (tested locally): Today, Airflow allows
>>>>>>>> users to pass a JSON to the DagRun as {{dag_run.conf}} when triggering a
>>>>>>>> DAG. The idea is, before queuing task instances that satisfy their
>>>>>>>> dependencies, `scheduler_job.py` (after we make some change) will filter
>>>>>>>> out the task instances to skip based on the `dag_run.conf` the user
>>>>>>>> passes in (e.g. {"skip_tasks": ["task-b"]}), then mark them as SKIPPED.
>>>>>>>>
>>>>>>>> Things I would love to discuss:
>>>>>>>> - What do you think about this feature?
>>>>>>>> - What do you think about the proposed solution?
>>>>>>>> - Did I miss anything that you want to discuss?
>>>>>>>> - Is it necessary to introduce a new state (e.g. MANUAL_SKIPPED) to
>>>>>>>> differentiate it from SKIPPED?
>>>>>>>>
>>>>>>>> Howie
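For illustration of the proposal only, the filtering step described above could be sketched roughly as follows. This is not existing `scheduler_job.py` code; the helper name and call site are hypothetical, and only the `{"skip_tasks": [...]}` conf shape comes from the proposal itself:

    from airflow.utils.state import State


    def skip_tis_requested_in_conf(dag_run, schedulable_tis, session):
        """Mark task instances named in dag_run.conf["skip_tasks"] as SKIPPED and
        return the remaining task instances to queue."""
        requested = set((dag_run.conf or {}).get("skip_tasks", []))
        if not requested:
            return schedulable_tis

        to_queue = []
        for ti in schedulable_tis:
            if ti.task_id in requested:
                # Only task instances whose dependencies are already met reach
                # this point, so skipping here keeps the ordering between the
                # remaining tasks intact (task-c still waits for task-a).
                ti.state = State.SKIPPED
                session.merge(ti)
            else:
                to_queue.append(ti)
        return to_queue

Under this proposal a user would trigger such a run with the existing conf mechanism, e.g. `airflow dags trigger -c '{"skip_tasks": ["task-b"]}' <dag_id>`.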