Hi Daniel and Jarek,

After discussing with XD and the rest of our team members, we decided to
move forward with the `pre_execute` solution. Thank you @Daniel for raising
the idea, and thank you @Jarek for explaining the tradeoffs in detail.
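
For concreteness, here is a minimal sketch of the `pre_execute` approach
(the `tasks_to_skip` conf key and the operator below are illustrative,
not a settled design):

from airflow.exceptions import AirflowSkipException
from airflow.operators.bash import BashOperator

def skip_if_requested(context):
    # Hypothetical conf key -- the exact shape is still to be decided.
    tasks_to_skip = context["dag_run"].conf.get("tasks_to_skip", [])
    if context["task"].task_id in tasks_to_skip:
        raise AirflowSkipException("Skipped via dag_run.conf")

b2 = BashOperator(
    task_id="b2",
    bash_command="echo do stuff",
    pre_execute=skip_if_requested,
)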

Besides "how to skip tasks", another thing I'd love to discuss is the
"skip state to set".

| So the skipped task needs to have a State identical to "Success", other
than the name. Hence a new state may be needed

When skipping a task (in `pre_execute`), we can choose to mark it as either
"SUCCESS" or "SKIPPED". To differentiate skipped tasks from successful
ones, "SKIPPED" is preferable; however, it may not work well with some
trigger_rules like `all_success`. Therefore, I am thinking of introducing a
new state, e.g. "SKIPPED_AS_SUCCESS", which is distinct from "SUCCESS" but
is treated the same as "SUCCESS" when evaluating dependency rules.
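
A rough sketch of the intended semantics (the enum and helpers below are
hypothetical illustrations, not existing Airflow code):

from enum import Enum

class TaskState(str, Enum):
    SUCCESS = "success"
    SKIPPED = "skipped"
    SKIPPED_AS_SUCCESS = "skipped_as_success"  # proposed new state
    FAILED = "failed"

def counts_as_success(state: TaskState) -> bool:
    # SKIPPED_AS_SUCCESS satisfies success-based trigger rules,
    # while staying distinguishable from SUCCESS in the UI and DB.
    return state in (TaskState.SUCCESS, TaskState.SKIPPED_AS_SUCCESS)

def all_success(upstream_states: list[TaskState]) -> bool:
    # How `all_success` would evaluate under this proposal.
    return all(counts_as_success(s) for s in upstream_states)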

WYT?

Howie

On Sat, Feb 5, 2022 at 8:10 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> I thought a bit about it and there is one more thing that I do not really
> like about the approach of specifying the task to skip in the dag run
> conf.
>
> Currently the "logic" of the dag run conf is exclusively "dag specific"
> and only used at "task execution" time (mind - it is not used at "dag
> scheduling" time at all). Currently "Airflow Core" does not have to know
> anything about it and can transparently pass it to the task without
> parsing it; it's the logic inside the DAG that should "understand" it.
> There is not a single other use case where dag_run conf is used and
> parsed by the scheduler.
> Trying to make "dag_run.conf" be used either in "Airflow Core" or in "DAG
> logic" depending on a parameter is, IMHO, very wrong because it mixes
> independent concerns.
>
> Currently things are simple:
>
> * the dag structure is defined by DAG parsing (this is the only thing the
> scheduler cares about)
> * task execution uses the "static" structure defined by DAG parsing, and
> relies on dag_run conf to implement logic that reacts to different
> parameters
>
> When you look at this from that perspective - the proposal is a weird
> mixture of both. And if we do it, in the future it **might** prevent us
> from doing a number of optimizations. For example, dag_run conf does not
> have to be parsed at all when the scheduler does its job; the only place
> it is parsed is at the moment the task is executed.
>
> For me this is a huge '-1' for the solution.
>
> Coming back, XD, to your example - I think what you are describing there
> is a particular DAG requirement, not something that Airflow core should
> take care of. Also - no matter how the task gets skipped (whether in
> pre_execute or by the scheduler), if you want to "skip" B2 but behave as
> if it "succeeded", you have exactly the same problems with trigger rules.
> It makes no difference whether you "skip" by throwing an exception or are
> "skipped" by Airflow. What you are really talking about is not "skipping"
> certain tasks but "pretending they succeeded". This is an entirely
> different thing than what we started the discussion with :). We were
> talking about "skipping" the task - which in Airflow is a different state
> than "Succeeded".
>
> And if you really want to "pretend success" you don't even have to (and
> arguably shouldn't) use 'pre_execute' for it. There are better ways
> already.
>
> If you are using the "modern" way of writing tasks (which I assume is the
> case for anyone who does custom work like that) I imagine your B2 task
> should be written this way (writing from memory, so the exact API may
> differ slightly):
>
> from airflow.decorators import task
> from airflow.operators.python import get_current_context
>
> @task
> def task_b2():
>     context = get_current_context()
>     # Return early to end up in "success" without doing any work.
>     if context["dag_run"].conf.get("should_task2_pretend_it_succeeded"):
>         return
>     # do stuff (remember to use hooks instead of operators inside @task
>     # decorated tasks when you need multiple Airflow providers)
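>
> Such a run could then be triggered with the conf key set, e.g.:
>
> airflow dags trigger -c '{"should_task2_pretend_it_succeeded": true}' <dag_id>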
>
> IMHO - this is sooooooo much more of a "Modern Airflow" way of
> approaching the problem:
>
> * it's very simple and follows the "modern" way of writing Airflow tasks
> (see also my talk about it from this week's NYC Airflow Meetup:
> https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA)
> * the logic of "pretending" sits right next to the code that would
> otherwise run, and it's very obvious what happens (otherwise you have to
> make a mental jump from DAG parameters passed through the scheduler to
> task execution)
>  * it's extremely explicit (https://www.python.org/dev/peps/pep-0020/) -
> "explicit is better than implicit"
> * dag_run configuration does not have to be understood by the scheduler
> * your DAG structure does not have to be "complicated", as the task will
> end up in the "success" state
> * this is imperative, not declarative, and the DAG writer decides how
> "limited" the conditions are - Airflow does not limit you in any way and
> the condition can be arbitrarily complex or simple (and take into account
> other parameters, dag_run, logical dates, actual dates of execution and
> basically anything else)
>
> From all the comments I saw - I am pretty convinced this is a much better
> way to approach it.
>
> BTW. There is indeed one "potential" pro of the "scheduler-based" skip
> calculation. It can provide a little optimization - the task does not need
> to be run at all to make the decision in this case (similarly to what we
> do with DummyTasks now). But IMHO, this is totally offset by the fact that
> the scheduler would have to parse and analyse the dag_run conf to make the
> decisions - overall, it could be even slower in a number of cases. The
> scheduler loop is pretty critical, and any extra logic and parsing there
> might have huge, unforeseen impact; we should only add logic there if we
> are absolutely sure all the potential performance impact is well analyzed
> and understood.
>
>
> Giorgio Zoppi,
>
> XCom being "small" is also a thing of the past (if you use custom XCom
> backends) - see the same talk of mine
> https://photos.app.goo.gl/ycCLpCmQ9fiTDNNDA but also
> https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html?highlight=custom%20xcom#custom-backends
> and this nice article from Astronomer
> https://www.astronomer.io/guides/custom-xcom-backends
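>
> A custom backend boils down to subclassing BaseXCom and overriding the
> (de)serialization hooks - roughly like this (a sketch along the lines of
> the docs above; the actual object-storage upload is left out):
>
> from airflow.models.xcom import BaseXCom
>
> class S3XComBackend(BaseXCom):
>     @staticmethod
>     def serialize_value(value):
>         # e.g. upload large values to object storage here and store
>         # only a small reference in the Airflow metadata database
>         return BaseXCom.serialize_value(value)
>
>     @staticmethod
>     def deserialize_value(result):
>         # resolve the stored reference back into the real value
>         return BaseXCom.deserialize_value(result)
>
> and it is enabled by pointing "xcom_backend" in the [core] section of
> airflow.cfg at that class.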
>
> J.
>
>
>
> On Sat, Feb 5, 2022 at 4:22 PM Giorgio Zoppi <giorgio.zo...@gmail.com>
> wrote:
>
>> Hey Jarek,
>> Just a question about future development: is the XCom backend
>> replaceable now? The real power of Airflow is that it is the 'de facto'
>> glue between different ways of wrangling data, just as Python is the
>> glue when you need to implement things at a lower level, e.g. Altair
>> simulation software has its critical parts written in C++ and bound to
>> Python at the upper level. The same can be said for TensorFlow and
>> NumPy. About my question: it would be nice to have a queue like Redis or
>> an unstructured database as an XCom backend to allow scaling, but I am
>> not sure it makes sense since XCom is just for short messages.
>> Best Regards,
>> Giorgio
>>
>>
>>
>>
