Hi Daniel and Jarek,

After discussing with XD and the rest of our team members, we decided to move forward with the `pre_execute` solution. Thank you @Daniel for raising the idea. Thank you @Jarek for explaining the tradeoffs in detail. A rough sketch of what we have in mind is below.
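For concreteness, here is an untested sketch of the kind of `pre_execute` hook we mean (the `tasks_to_skip` conf key and the operator subclass are made up for illustration, not an agreed interface):

    from airflow.exceptions import AirflowSkipException
    from airflow.operators.bash import BashOperator

    class SkippableBashOperator(BashOperator):
        """Skips itself when its task_id is listed in dag_run.conf["tasks_to_skip"]."""

        def pre_execute(self, context):
            # dag_run.conf may be None when the run was not triggered with conf
            to_skip = (context["dag_run"].conf or {}).get("tasks_to_skip", [])
            if self.task_id in to_skip:
                raise AirflowSkipException(f"{self.task_id} skipped via dag_run conf")
            super().pre_execute(context)

The DAG can then be triggered with e.g. `airflow dags trigger my_dag --conf '{"tasks_to_skip": ["b2"]}'` (dag and task ids illustrative), and b2 ends up SKIPPED while the rest of the DAG runs normally.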
Besides "how to skip tasks", another thing I'd love to discuss is the "skip state to set". | So the skipped task needs to have a State identical to "Success", other than the name. Hence a new state may be needed When skipping a task, (in `pre_execute`) we can either choose to mark it as "SUCCESS" or "SKIPPED". To differentiate skipped tasks from successful ones, "SKIPPED" is preferred, however, it may not work well with some trigger_rules like `all_success`. Therefore, I am thinking to introduce a new state e.g. "SKIPPED_AS_SUCCESS", which is different from "SUCCESS", but will be treated the same as "SUCCESS" when evaluating dependency rules. WYT? Howie On Sat, Feb 5, 2022 at 8:10 AM Jarek Potiuk <ja...@potiuk.com> wrote: > I thought a bit about it and there is one more thing that I do not really > like about the approach when you specify the task to skip in the dag run > conf. > > Currently the "logic" of the dag run conf is exclusively "dag specific" > and only used at "task execution" time (mind - it is not used in "dag > scheduling time" at all. Currently "Airflow Core" does not have to know > anything about it and can transparently pass it to the task without parsing > it. And it's the logic inside the DAG that should "understand" it. There is > not a single other use case where dag_run conf is used and parsed by > scheduler. > Trying to make "dag_run.conf" to be used either in "Airflow Core" or in > "DAG logic" depending on the parameter, is IMHO very wrong because it is > mixing independent concerns. > > Currently things are simple: > > * dag structure is defined by DAG parsing (this is the only thing > Scheduler cares) > * task execution uses the "static" structure defined by DAG parsing and > uses dag_run conf to implement logic to react to different parameters > > When you look at this from that perspective - the proposal is a weird > mixture of both. And if we do it, in the future it **might** prevent us > from doing a number of optimizations. For example dag_run_conf does not > have to be parsed at all when the scheduler does its job. The only place > dag_run conf is parsed is only at the moment the task is executed. > > For me this is a huge '-1' of the solution. > > Coming back XD to your example - I think what you are describing there is > a particular DAG requirement, not something that Airflow core should take > care about. Also - no matter how the task will be skipped (whether in > pre-execute or by scheduler, if you want to "Skip" B2 but behave as if it > "succeeded" you have exactly the same problems with triggering rules. It > completely does not matter if you "skip" by throwing an exception or "Skip" > by airflow. What you are really talking about it is not to "skip" certain > tasks but rally about "pretending they succeeded". This is an entirely > different thing than that we started the discussion with :). We were > talking about "skipping" the task - which in Airflow is a different state > than "Succeeded". > > And if you really want to "pretend success" you don't even have to (or > even should) use 'pre_execute' for it. There are better ways already. 
WYT?

Howie

On Sat, Feb 5, 2022 at 8:10 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> I thought a bit about it and there is one more thing that I do not really
> like about the approach where you specify the task to skip in the dag run
> conf.
>
> Currently the "logic" of the dag run conf is exclusively "dag specific"
> and only used at "task execution" time (mind: it is not used at "dag
> scheduling" time at all). Currently "Airflow Core" does not have to know
> anything about it and can transparently pass it to the task without
> parsing it, and it's the logic inside the DAG that should "understand" it.
> There is not a single other use case where dag_run conf is used and parsed
> by the scheduler. Trying to make "dag_run.conf" be used either in "Airflow
> Core" or in "DAG logic" depending on the parameter is IMHO very wrong,
> because it mixes independent concerns.
>
> Currently things are simple:
>
> * dag structure is defined by DAG parsing (this is the only thing the
> scheduler cares about)
> * task execution uses the "static" structure defined by DAG parsing and
> uses dag_run conf to implement logic that reacts to different parameters
>
> When you look at this from that perspective, the proposal is a weird
> mixture of both. And if we do it, it **might** prevent us from doing a
> number of optimizations in the future. For example, dag_run conf does not
> have to be parsed at all while the scheduler does its job; the only place
> dag_run conf is parsed is at the moment the task is executed.
>
> For me this is a huge '-1' for the solution.
>
> Coming back, XD, to your example - I think what you are describing there
> is a particular DAG requirement, not something that Airflow core should
> take care of. Also, no matter how the task is skipped (whether in
> pre_execute or by the scheduler), if you want to "skip" B2 but behave as
> if it "succeeded", you have exactly the same problems with triggering
> rules. It does not matter at all whether you "skip" by throwing an
> exception or are "skipped" by Airflow. What you are really talking about
> is not "skipping" certain tasks but rather "pretending they succeeded".
> This is an entirely different thing than what we started the discussion
> with :). We were talking about "skipping" the task - which in Airflow is a
> different state than "Succeeded".
>
> And if you really want to "pretend success" you don't even have to (nor
> should you) use 'pre_execute' for it. There are better ways already.
>
> If you are using the "modern" way of writing tasks (which I assume is the
> case for anyone who does custom work like that), I imagine your B2 task
> could be written this way (writing from memory so it might not compile):
>
> from airflow.decorators import task
> from airflow.operators.python import get_current_context
>
> @task
> def task_b2():
>     context = get_current_context()
>     if (context["dag_run"].conf or {}).get("should_task2_pretend_it_succeeded"):
>         return  # exit early - the task still ends up in the "success" state
>     # do stuff (remember to use hooks instead of operators inside @task
>     # decorated tasks when you need functionality from Airflow providers)
>
> IMHO this is sooooooo much more the "Modern Airflow way" of approaching
> the problem:
>
> * it's very simple and follows the "modern" way of writing Airflow tasks
> (see also my talk about it from this week's NYC Airflow Meetup
> https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA)
> * the logic of "pretending" is right next to the code that would otherwise
> run, and it's very obvious what happens (otherwise you have to make a
> mental jump from DAG parameters, through the scheduler, to task execution)
> * it's extremely explicit (https://www.python.org/dev/peps/pep-0020/) -
> "explicit is better than implicit"
> * the dag_run configuration does not have to be understood by the scheduler
> * your DAG structure does not have to be "complicated", as the task will
> end up in the "success" state
> * it is imperative, not declarative, and the DAG writer decides how
> "limited" the conditions are - Airflow does not limit you in any way, and
> the condition can be arbitrarily complex or simple (and take into account
> other parameters, dag_run, logical dates, actual dates of execution and
> basically anything else)
>
> From all the comments I saw, I am pretty convinced this is a much better
> way to approach it.
>
> BTW. There is indeed one "potential" pro of the "scheduler-based" skip
> calculation. It can provide a little optimization: the task does not need
> to be run at all for the decision to be made (similarly to what we do with
> DummyTasks now). But IMHO this is totally offset by the fact that the
> scheduler would have to parse and analyse the dag_run conf to make the
> decisions - overall, it could even be slower in a number of cases. The
> scheduler loop is pretty critical, and any extra logic and parsing there
> might have a huge, unforeseen impact; we should only add logic there if we
> are absolutely sure all the potential performance impact is well analyzed
> and understood.
>
> Giorgio Zoppi,
>
> XCom being "small" is also a thing of the past (if you use custom XCom
> backends) - see the same talk of mine
> https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA, but also
> https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html?highlight=custom%20xcom#custom-backends
> and this nice article from Astronomer
> https://www.astronomer.io/guides/custom-xcom-backends
>
> J.
>
> On Sat, Feb 5, 2022 at 4:22 PM Giorgio Zoppi <giorgio.zo...@gmail.com>
> wrote:
>
>> Hey Jarek,
>> Just a question about future development: is the XCom backend replaceable
>> now? The real power of Airflow is that it is the 'de facto' glue between
>> different ways of mangling data, just as Python is the glue when you need
>> to implement things at a lower level, i.e. Altair simulation software has
>> its critical parts written in C++ and bound to Python at the upper level;
>> the same can be said of TensorFlow and numpy. About my question: it would
>> be nice to have a queue like Redis or a non-structured database as the
>> XCom backend to allow scaling, but I am not sure it makes sense, since
>> XCom is just for short messages.
>> Best Regards,
>> Giorgio
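To make the custom XCom backend pointer above concrete, here is a minimal, untested sketch in the spirit of the linked Astronomer guide. The S3 bucket name and key scheme are made up, and serialize_value gained extra arguments in later Airflow versions (hence the **kwargs):

    import json
    import uuid

    from airflow.models.xcom import BaseXCom
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    class S3XComBackend(BaseXCom):
        # Values are stored as JSON objects in S3; only a short reference
        # string goes into the metadata database, so XCom values are no
        # longer limited to "short messages".
        PREFIX = "xcom-s3://"
        BUCKET = "my-xcom-bucket"  # made-up bucket name

        @staticmethod
        def serialize_value(value, **kwargs):
            key = f"xcom/{uuid.uuid4()}.json"
            S3Hook().load_string(json.dumps(value), key=key,
                                 bucket_name=S3XComBackend.BUCKET)
            return BaseXCom.serialize_value(S3XComBackend.PREFIX + key)

        @staticmethod
        def deserialize_value(result):
            value = BaseXCom.deserialize_value(result)
            if isinstance(value, str) and value.startswith(S3XComBackend.PREFIX):
                key = value[len(S3XComBackend.PREFIX):]
                return json.loads(S3Hook().read_key(key=key,
                                                    bucket_name=S3XComBackend.BUCKET))
            return value

Such a backend is enabled by pointing xcom_backend in the [core] section of airflow.cfg (or the AIRFLOW__CORE__XCOM_BACKEND environment variable) at the class.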