We cleary want to position ourselves as the second group. Not that we don't 
write python code, as a fact we do, but once we see something could be solved 
in a generic way and be contributed back to Airflow, we will also propose that. 
 I understand the use of hooks principle, we also do it, when the operator just 
don't fit in the usage because of too technical difficulties, but as I already 
told, we want to keep custom code as minimal as possible.

The reasoning behind it is readability and maintenance, and not all the people 
who will write a DAG are very technical people.  The usage of operators is well 
documented in Airflow/Astronomer, so for those people the step to write a DAG 
isn't that hard.

My first example in the PR was using the "stream" functionality (maybe the 
method should be called "iterate" instead) on a PythonOperator, which was 
actually a bad example, as the looping could be easily done within the task 
decorated method itself.  Then I changed the "simplified" example with the 
MSGraphAsyncOperator.  And there, as also mentioned by Jarek, some operators do 
validation and post-processing outside of the hook which is also the case with 
the MSGraphAsyncOperator as many other operators do.  This is not because the 
operator is badly written, it's due to the nature of the DeferredTask and thus 
a combination between an operator and a triggerer which serves as one async 
operator, as this is how the deferred task mechanism is designed in Airflow.  
If you would have to do the same logic in a PythonOperator or a task decorated 
method, it would become very messy code (as the code is async), you would have 
to re-invent the whole post-processing e.g. the next_method part done by the 
operator as a result of the triggerer.  That's why I think this functionality 
could be handy, the complexity of handling async code and managing the 
parallelism and deferred code mechanism is handled for you, just like the 
dynamic task mapping does it, but then executed within the same worker 
concurrently (if wanted, could be sequentially as you can parametrize it).  You 
could also apply the same functionality on a task decorated method, and focus 
on the business code instead of solving technical issues.

We now at our company have lots of DAG's with custom code to solve those 
issues, and as I told in the beginning, we want to make our DAG's as clean and 
simple as possible, avoid code duplication and re-use the Airflow capabilities 
to the max without re-inventing the wheel.  That's why I thought of this 
solution, and I thought it could be a nice feature in the future for Airflow 
3.x.  We are already using this in our Airflow instances, the gain in 
performance is huge and the DAG's are much easier to understand with less 
technical code, as the problem has been solved in a generic way.  That's also 
why I've implemented it with the functionality available in Airflow 2.x as a 
POC, but as already mentioned, it would indeed be better to redo it the clean 
way in Airflow 3.x. That's why now that I know it's possible in Airflow 2.x, I 
wanted to come back to you guys with this proposition. I also think it's not a 
good idea to still implement it in Airflow 2.x as Airflow 3.0 is on it's way. I 
also see this as a new possible feature for Airflow 3.x, which would be an 
additional argument to do the migration to 3.x.  Anyway, as an Airflow user and 
contributor, I'm willing to help here.  For us it doesn't matter, as we can 
already use it, but it could nice that other Aiflow users could also benefit 
from this functionality, as I know this topic has been discussed many times.

-----Original Message-----
From: Jarek Potiuk <ja...@potiuk.com> 
Sent: Friday, October 4, 2024 5:52 AM
To: dev@airflow.apache.org
Subject: Re: [PROPOSAL] Add streaming support to PartialOperator

EXTERNAL MAIL: Indien je de afzender van deze e-mail niet kent en deze niet 
vertrouwt, klik niet op een link of open geen bijlages. Bij twijfel, stuur deze 
e-mail als bijlage naar ab...@infrabel.be<mailto:ab...@infrabel.be>.

> why not just do things sequentially in a loop inside of a task?

Yes I think you nailed it - and I think it's just the abstraction you use in 
this case.

When you loop in the task to do a small thing many times with one of the 
integrations of Airflow - you could use Hook for that. But - apparently - this 
abstraction is difficult to discover and possibly sometimes difficult to 
discover for the user because all they know is "operators" and they do not know 
what Hooks do.

So .. the natural way to interact with external integration for many of our 
users is via Operators - so to allow such looping using operators sounds like 
"follow what is natural for your users".

Basically.- we are not telling the users "Use hooks", but we are following what 
our users want to do -  "use operators" in this case as it feels more natural 
for them. I think - now when i think of that - it simply shows that we have two 
kinds of users in this case:

1) those who know and are happy to write custom operators (they will use
hooks)
2) those who are more comfortable in just putting together existing building 
blocks - i.e. operators (all they know are dag, operators, dependencies - and 
when they come to composing things they think of task flow as the way to 
compose the things they know

Clearly -> allowing to use operators in task flow in this mode would respond to 
the 2nd group of the users.

For me this is kinda model leadership we should do - when you as a leader in a 
space try to convince others to do things in one way, but pretty much everyone 
is not following and stubbornly attempt to use the thing you think is wrong, 
maybe it's a good time to think "well maybe they are right".

J.


On Thu, Oct 3, 2024 at 8:10 PM Daniel Standish 
<daniel.stand...@astronomer.io.invalid> wrote:

> The thing i'm having trouble with is that the problem the user, David, 
> is trying to solve is basically, that airflow doesn't like super fine-grained
> tasks.   Like let's push this to the limit.  I run an ecommerce company
> that has 10M visitors per day and each time they visit we update the 
> visitor table.  I want to run a daily job to process updates.  Should 
> I model my pipeline as 1 task per customer?  Probably not a good idea.
>
> There's a reason e.g. that databases exist and you can do things in a 
> set-based way.  There seems to be an analogy here with David's example.
> That's why I asked why model it so fine grained.  He does not seem to 
> want to write a custom operator, but it would seem it's probably a 
> good idea here.  One way of thinking about the use case is, I want to 
> do things sequentially in a loop -- why not just do things 
> sequentially in a loop inside of a task?
>

Reply via email to