This is why I asked, "how is this different from just putting
asyncio.run()" in the operator / task.

Just wondering if it's really worth supporting async func when that's all
it does.

On Mon, Dec 15, 2025 at 3:55 PM Jarek Potiuk <[email protected]> wrote:

> This is a good point Philippe - that one comment clicked why I had a bit of
> struggle with the proposal and maybe this is something that we all think
> this proposal is about - but it's not clear without examples.
>
> So let me rephrase how I understand it and see if we can agree on it - with
> examples.
>
> For me `@task` is always synonymous with single "process" execution - even
> when mapped - the single execution of a method in @task clearly starts when
> the process starts and when it ends - the process exits.
> There is - I think - no particular reason why we should change that. One
> `@task` = one process.
>
> Now - currently, the task is just a simple synchronous callable run by
> PythonOperator in the main thread, so yes, indeed you cannot easily execute
> async coroutines in it without adding other async methods that you call
> explicitly with event loop, because you need to run it within an event
> loop. So you can do this now:
>
> class TheHookToRun:
>        async def method_to_run():
>
> async def async_part_of_the_task(conn_id, array_of_values):
>     parts = [ TheHookToRun(conn_id).method_to_run(val=value) for value in
> array_of_values]
>     results = await asyncio.gather(*parts)
>
> @task()
> def the_task(array_of_values):
>     conn_id = "a_connection"
>         asyncio.run(async_part_of_the_task(conn_id, array_of_values)
>
>
> And I think what we could easily do and **all** that we need is to change
> the @task decorated methods to be able to run as coroutine, so conceptually
> we want to do this, I think:
>
> class TheHookToRun:
>        async def method_to_run():
>
> @task()
> async def the_task(array_of_values):
>     conn_id = "a_connection"
>     parts = [ TheHookToRun(conn_id).method_to_run(val=value) for value in
> array_of_values]
>     results = await asyncio.gather(*parts)
>
> So - what we want to do is to mark `the_task` as async - using standard
> python semantics and make sure that when callable is passed to
> PythonOperator is async, we run:
>
> asyncio.run(callable())
>
> instead of
>
> callable()
>
> We only need to check if the callable is sync or async (something that for
> example fast_api does and we had recent problems with in a recent version
> when they got it wrong).
>
> I think this is basically all we need to achieve what you wanted David.
> Maybe (I am not sure) you also wanted to do looping  somehow outside of the
> `@task` callable, do something more sophisticated here, but IMHO just
> explicitly allowing async task-decorated methods and making sure they run
> in an event loop addresses exactly what you wanted to achieve.
>
> But maybe you thought about something else David - and can provide an
> example how it would look like in Dag ?
>
> J.
>
>
>
> On Mon, Dec 15, 2025 at 6:09 PM Philippe Gagnon via dev <
> [email protected]> wrote:
>
> > Hi,
> >
> > > There async tasks will benefit from the multi threading, as they share
> > the same event loop and everything is run within the same Celery worker,
> > but that's another solution.
> >
> > I'm not sure that's accurate; every task runs within its own process.
> That
> > being said, I think the ability to run async callables via PythonOperator
> > (or any other operator for that matter) is valuable in and of itself,
> and I
> > don't think there would be much pushback if we adopted interfaces to
> > facilitate this. WDYT about reducing the scope of your proposal (and PR)
> to
> > this framing, that way it would be easier to gain consensus?
> >
> > (I think that's essentially what you are pitching 🙂)
> >
> > BR,
> >
> > *✨ **Philippe Gagnon*
> > *Meet for 30 mins 📅* <https://calendar.app.google/5qzgD9SadybUvSCv8>
> >
> >
> > On Fri, Dec 5, 2025 at 2:49 AM Blain David <[email protected]>
> > wrote:
> >
> > > Hello Jense,
> > >
> > > Thanks for your time and answer.  I just granted you and Zhe-You access
> > to
> > > the document.
> > >
> > > In the article I explained why we did the iteration ourselves within
> the
> > > async @task decorated function, as this was way faster than doing it
> with
> > > dynamic task mapping.
> > > Not that you cannot use dynamic task mapping with async PythonOperator,
> > it
> > > just works as with any other operator, it's just doesn't make sense as
> it
> > > won't give you any performance benefits due to the fact that you don't
> > > share the same event loop (at least when using the CeleryExecutor).
> > >
> > > You could for example on big streams of data use dynamic tsk mapping to
> > > chunk the stream in multiple pieces and then each task would process
> the
> > > chunk within the async operator for example, a bit like partition if
> you
> > > like.
> > >
> > > In the example I used for the article, we once again don't care about
> > > individual download state of the FTP-file, we just want to know if the
> > > directory was successfully downloaded or not, ofc we added some logging
> > > statements to show which file was downloaded.
> > > I also know Jarek wants individual state tracking, but that' not the
> > > solution I presented here, for micro batching we have our
> > IterableOperator,
> > > which instead of doing partial/expand we do partial/iterate, which
> > actually
> > > does the same as the for loop of the example in the article but then
> > > managed for
> > > you in a multi threaded way for sync as async tasks as well.  There
> async
> > > tasks will benefit from the multi threading, as they share the same
> event
> > > loop and everything is run within the same Celery worker, but that's
> > > another solution.
> > >
> > > Still with the dynamic task mapping or IterableOperator, you wouldn't
> be
> > > able to use the SFTPClientPool (before name AsyncSFTPConnectionPool as
> in
> > > the article), so you wouldn't benefit of the performance gain you get
> > from
> > > the pool, that why here in this example,
> > > we do the looping ourselves.
> > >
> > > And I completely agree for triggerers, we also use it a lot, and it is
> > > indeed cool for long running tasks in which have lot's of waiting times
> > > (dead time), and you're just monitoring a state, that the purpose of
> > > triggerers!
> > > But with some operators, triggers are misused as they are the "only"
> way
> > > to run async code which returns a lot of data which have to come back
> to
> > > the operator so it can be exposed as an XCom, there you'll see that you
> > > trigger table in the Airflow database will explode fast,
> > > As each yielded response is being stored in the database, as it can't
> > make
> > > use of a custom XCom backend like operators do, so in practise, each
> > result
> > > yielded by the trigger, will first end up in your tirgger table before
> > > being stored as an XCom.
> > >
> > > Also, I'm not telling here to replace anything, I just propose an
> > > alternative solution in Airflow so you're not purely tied to deferrable
> > > operators.  Also at the Summit, we experienced during a lot of
> > > presentations, that people tend to use more the PythonOperators (or
> > @task)
> > > with hook than
> > > using the operators themselves, which in my opinion makes sense as you
> > > have more flexibility, you still benefit from the Airflow integration
> > with
> > > Hooks but you aren't tight to the operator implementation, which might
> > > offer limited operations, look for example at the SFTPOperator and
> you'll
> > > understand immediately.  That's why I propose to allow running async
> code
> > > in PythonOperators natively, that way, you can directly interact with
> > async
> > > hooks and you don't need a triggerer to do that.  Triggers are great
> for
> > > polling and listening, but not for processing huge amounts of data,
> > > that's where celery workers shine, thus allowing PythonOperators to
> > > natively run async code in you celery workers, you can do so.
> > >
> > > For you last example, in our case DB calls are still sync, as of my
> > > knowledge, we don’t have any DB hook based on DBApiHook which supports
> > > async operations?  Also the db operation can be seen as another sync
> > task,
> > > so they don't need to run in the same async task, you just pass the
> XCom
> > > returned from the async task to the sync task.  But having async DB
> hooks
> > > could be cool, I also though about it, but this would also depend on
> the
> > > driver if it supports it, still it also something I would like to test
> in
> > > the near future.
> > >
> > > I hope this answered most of your questions Jens 😉
> > >
> > > -----Original Message-----
> > > From: Jens Scheffler <[email protected]>
> > > Sent: 04 December 2025 21:12
> > > To: [email protected]
> > > Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks
> and
> > > performance in Airflow 3 by supporting native async PythonOperator
> > >
> > > EXTERNAL MAIL: Indien je de afzender van deze e-mail niet kent en deze
> > > niet vertrouwt, klik niet op een link of open geen bijlages. Bij
> twijfel,
> > > stuur deze e-mail als bijlage naar [email protected]<mailto:
> > > [email protected]>.
> > >
> > > Requested access to Google doc to read more details. Am interested and
> > > also as Daniel what the difference is/would be.
> > >
> > > Especially also as progress tracking might be important. Yes, Task
> > Mapping
> > > is very expensive if you want to download 17k XML files, but also when
> > > running Async and you are at 5000 files, if you resume would you know
> > what
> > > was complete or would it start from scratch all times?
> > >
> > > I think such micro-batching is cool but some state tracking is
> important
> > > - which might if it is in the DB also overload the DB or add very many
> > > transactions.
> > >
> > > Trioggerer though I think still is cool for long running tasks where
> you
> > > just wait for response, e.g. you triggered another job remote or you
> > > started a Pod that runs for an hour. We have Pods runnign for 10h
> > sometimes
> > > and then it is important to be able to roll new SW to workers and with
> > > triggerers we cann de-couple this.
> > >
> > > So maybe - without missing details - I would judge such micro-batching
> as
> > > a third execution option but most probably would not replace the
> others.
> > >
> > > Also knowin from own experience, writing async code is more complex and
> > > error prone, so if you would request all normal code being async you
> > might
> > > scare users away. Proper review needed to ensure all IO is async (also
> DB
> > > calls!)
> > >
> > > On 12/4/25 18:08, Daniel Standish via dev wrote:
> > > > Here's what I'm hearing from this
> > > >
> > > > 1. not using task mapping, but just looping instead, can be much more
> > > > efficient.
> > > > Yes, of course it can.
> > > >
> > > > 2. there are ways in which triggerer / deferrable operators are not
> > > > fully complete, or do not fully have feature parity with regular
> > > > operators (such as the custom xcom backend example) I believe it.
> But
> > > > this could certainly be worked on.
> > > >
> > > > Question for you:
> > > >
> > > > How is your proposal different / better than say, just calling
> > > > `asyncio.run(...)` in a python task?
> > > >
> > > >
> > > > On Thu, Dec 4, 2025 at 8:38 AM Blain David <[email protected]>
> > > wrote:
> > > >
> > > >> As I already discussed with Jarek in the past but also with Hussein
> > > >> during the Airflow Summit, we at a certain moment encountered
> > > >> performance issues when using a lot of deferred operators.
> > > >>
> > > >> Allowing PythonOperators (and thus also @task decorated methods) to
> > > >> natively execute async Python code in Airflow solved our performance
> > > issues.
> > > >> And yes, you could argue if that’s really necessary and also what’s
> > > >> the added value? And at first you would indeed think it doesn’t make
> > > >> sense at all do so, right?
> > > >> But please bear with me first and hear me out first why we did it
> > > >> that way and how it solved our performance issues and it will become
> > > >> crystal clear 😉
> > > >> So below is the article I wrote, which is also publicly available
> > > >> here<
> > > >> https://doc/
> > > >> s.google.com
> %2Fdocument%2Fd%2F1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ7yiY
> > > >> 89qok&data=05%7C02%7Cdavid.blain%40infrabel.be
> %7Ca90fcc33ab604f91948f
> > > >>
> 08de33719fc0%7Cb82bc314ab8e4d6fb18946f02e1f27f2%7C0%7C0%7C63900476028
> > > >>
> 6869534%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAuM
> > > >>
> DAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C%7C&
> > > >> sdata=%2FNbnXz%2BWTH0WDp8lTic8sraokWDojaNYfr51I2ohy58%3D&reserved=0>
> > > >> on Google Docs  which makes it easier to read than through the
> > devlist.
> > > >>
> > > >> Here is my article:
> > > >>
> > > >> Rethinking deferrable operators, async hooks and performance in
> > > >> Airflow 3
> > > >>
> > > >> At our company, we strive to avoid custom code in Airflow as much as
> > > >> possible to improve maintainability.
> > > >> For years this meant favouring dedicated Airflow operators over
> > > >> Python operators.
> > > >> However, in Airflow 3, as the number of deferred operators in our
> > > >> DAGs continued to grow, we began facing severe performance issues
> > > >> with deferrable operators, which forced us to re-evaluate that
> > approach.
> > > >>
> > > >> Initially we expected deferrable operators to improve performance
> for
> > > >> I/O-related tasks—such as REST API calls—because triggerers follow
> an
> > > >> async producer/consumer pattern. But in practice we discovered the
> > > opposite.
> > > >>
> > > >> Why Deferrable Operators Became the Bottleneck?
> > > >>
> > > >> Deferrable operators and sensors delegate async work to triggerers.
> > > >> This is perfectly fine for lightweight tasks such as polling or
> > > >> waiting for messages on a queue.
> > > >>
> > > >> But in our case:
> > > >>
> > > >>
> > > >>    *   MSGraphAsyncOperator performs long-running async operations.
> > > >>    *   HttpOperator in deferrable mode can perform long-running HTTP
> > > >> interactions, especially if pagination is involved.
> > > >>    *   There is no native deferrable SFTPOperator, so if we want to
> > use
> > > the
> > > >> SFTPHookAsync, we must use the PythonOperator which natively doesn’t
> > > >> support async code (not that big of challenge).
> > > >>    *   Both can return large payloads.
> > > >>    *   Triggerers must store yielded events directly into the
> Airflow
> > > >> metadata database.
> > > >>
> > > >> Triggerers are not designed for sustained high-load async execution
> > > >> or large data transfers. Unlike Celery workers, triggerers scale
> > > >> poorly and quickly become the bottleneck.
> > > >>
> > > >> Yielded events from triggers are stored directly in the Airflow
> > > >> metadata database because, unlike workers, triggers cannot leverage
> a
> > > >> custom XCom backend to offload large payloads, which can lead to
> > > >> increased database load and potential performance bottlenecks.
> > > >>
> > > >> Dynamic task mapping with deferrable operators amplifies the problem
> > > >> even further which AIP‑88 partially solves.
> > > >> Triggerers also cannot be run on the Edge Executor as triggerers are
> > > >> still tightly coupled with the Airflow metadata database (possibly
> > > >> addressed in AIP‑92).
> > > >>
> > > >> Rethinking the approach: Async hooks + Python tasks
> > > >>
> > > >> These limitations led us to reconsider calling async hooks directly
> > > >> from Python @task decorated functions or PythonOperators, thus
> > > >> avoiding deferrable operators and thus triggerers entirely.
> > > >> Operators are wrappers around hooks. Well‑written operators should
> > > >> contain little logic and delegate all the work to the hooks which do
> > > >> the real work,so  why not call them directly?
> > > >> This idea is also a bit in line with what Bolke already presented<
> > > >> https://airflowsummit.org/slides/2023/ab1-1400-Operators.pdf> in
> > 2023.
> > > >>
> > > >> Advantages of this approach include:
> > > >>
> > > >>
> > > >>    *   No dynamic task mapping needed when iterating—just loop in
> > > Python,
> > > >> unless you really need to track each individual step but that comes
> > > >> with a cost.
> > > >>    *   Massive reduction in scheduler load.
> > > >>    *   No triggerers involved.
> > > >>    *   Async code can run on Edge Workers.
> > > >>    *   Celery workers scale far much better than triggerers, so by
> > > moving
> > > >> from deferred operators and thus triggerers to async operators on
> > > >> celery workers, our performance issues on the triggerer were gone
> and
> > > >> run times were much shorter probably because the trigger mechanism
> > > >> also puts more load on the scheduler.
> > > >>    *   Sync or async doesn’t make any difference in performance,
> > unless
> > > you
> > > >> have to execute the same async function multiple times, that’s when
> > > >> async shines compared to sync especially with I/O related
> operations.
> > > >>
> > > >> Concrete Example: Async SFTP Downloads
> > > >>
> > > >> Below is an example comparing the download of ~17,000 XML-files and
> > > >> storing into our Datawarehouse.
> > > >> A single Celery worker can orchestrate many concurrent downloads
> > > >> using asyncio.
> > > >> A semaphore (here used internally by the AsyncSFTPConnectionPool)
> > > >> protects the SFTP server from being overloaded.
> > > >> Benchmark results:
> > > >>
> > > >> Approach
> > > >>                              Environment                    Time
> > > >> Mapped SFTPOperator
> > > >>             production                       3h 25m 55s
> > > >> PythonOperator + SFTPHook
> > > >>       local laptop                     1h 21m 09s
> > > >> Async Python task + SFTPHookAsync (without pool)       local laptop
> > > >>               8m 29s
> > > >> Async Python task + AsyncSFTPConnectionPool              production
> > > >>                 3m32s
> > > >>
> > > >> As you all can conclude, DagRun time went down from more than 3
> hours
> > > >> to only 3 minutes and a half, which is huge!
> > > >>
> > > >> In the google docs there are 2 different code snippets on how it’s
> > > >> done sync and async which I will not put here.
> > > >>
> > > >> Conclusion
> > > >>
> > > >> Using async hooks inside async Python tasks provides better
> > > >> performance, scalability, and flexibility, and avoids reliance on
> > > triggerers entirely.
> > > >> This hybrid approach—'async where it matters, operators where they
> > > >> make sense'—may represent the future of high‑performance Airflow
> data
> > > >> processing workloads.
> > > >>
> > > >> What did I change in Airflow?
> > > >>
> > > >> Not that much, I only:
> > > >>
> > > >>
> > > >>    *   Introduced an async PythonOperator so you don’t have to
> handle
> > > the
> > > >> event loop yourself, not that special, but also natively supported
> on
> > > >> async @task decorated python methods, which is nice to read.
> > > >>    *   Did some improvements on the SFTPHookAsync to fully take
> > > advantage
> > > >> of the async.
> > > >>    *   Introduced a SFTPHookPool so multiple asyncio tasks can
> re-use
> > > >> connection instance to gain even more performance, in this case it
> > > >> meant a reduction of 5 minutes in processing time, so we went from 8
> > to
> > > 3 minutes.
> > > >>
> > > >>
> > > >>
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: [email protected]
> > > For additional commands, e-mail: [email protected]
> > >
> > >
> >
>

Reply via email to