This is why I asked, "how is this different from just putting asyncio.run() in the operator / task?"
Just wondering if it's really worth supporting async functions when that's all it does.

On Mon, Dec 15, 2025 at 3:55 PM Jarek Potiuk <[email protected]> wrote:

> This is a good point Philippe - that one comment made it click why I had a bit of a struggle with the proposal, and maybe this is something we all think this proposal is about - but it's not clear without examples.
>
> So let me rephrase how I understand it and see if we can agree on it - with examples.
>
> For me, `@task` is always synonymous with single-"process" execution - even when mapped. A single execution of a @task-decorated method clearly starts when the process starts, and when it ends, the process exits.
> There is - I think - no particular reason why we should change that. One `@task` = one process.
>
> Now - currently, the task is just a simple synchronous callable run by PythonOperator in the main thread, so yes, indeed you cannot easily execute async coroutines in it without adding separate async methods that you explicitly run in an event loop, because coroutines need one to run in. So you can do this now:
>
> import asyncio
>
> from airflow.decorators import task
>
> class TheHookToRun:
>     def __init__(self, conn_id):
>         self.conn_id = conn_id
>
>     async def method_to_run(self, val):
>         ...
>
> async def async_part_of_the_task(conn_id, array_of_values):
>     parts = [TheHookToRun(conn_id).method_to_run(val=value) for value in array_of_values]
>     return await asyncio.gather(*parts)
>
> @task()
> def the_task(array_of_values):
>     conn_id = "a_connection"
>     return asyncio.run(async_part_of_the_task(conn_id, array_of_values))
>
> And I think what we could easily do - and **all** that we need - is to change the @task-decorated methods to be able to run as coroutines, so conceptually we want to do this, I think:
>
> class TheHookToRun:  # as above
>     async def method_to_run(self, val):
>         ...
>
> @task()
> async def the_task(array_of_values):
>     conn_id = "a_connection"
>     parts = [TheHookToRun(conn_id).method_to_run(val=value) for value in array_of_values]
>     return await asyncio.gather(*parts)
>
> So - what we want to do is mark `the_task` as async - using standard Python semantics - and make sure that when the callable passed to PythonOperator is async, we run:
>
> asyncio.run(callable())
>
> instead of
>
> callable()
>
> We only need to check whether the callable is sync or async (something that FastAPI does, for example - and which caused problems when they got it wrong in a recent version).
>
> I think this is basically all we need to achieve what you wanted, David. Maybe (I am not sure) you also wanted to do the looping somehow outside of the `@task` callable, or do something more sophisticated here, but IMHO just explicitly allowing async task-decorated methods and making sure they run in an event loop addresses exactly what you wanted to achieve.
>
> But maybe you were thinking about something else, David - can you provide an example of how it would look in a DAG?
>
> J.
>
> On Mon, Dec 15, 2025 at 6:09 PM Philippe Gagnon via dev <[email protected]> wrote:
>
> > Hi,
> >
> > > There, async tasks will benefit from the multithreading, as they share the same event loop and everything runs within the same Celery worker, but that's another solution.
> >
> > I'm not sure that's accurate; every task runs within its own process. That being said, I think the ability to run async callables via PythonOperator (or any other operator for that matter) is valuable in and of itself, and I don't think there would be much pushback if we adopted interfaces to facilitate this.
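> > For illustration, a minimal sketch of the sync/async dispatch Jarek describes above - `run_callable` is a made-up name, not the actual PythonOperator internals:
> >
> > import asyncio
> > import inspect
> >
> > def run_callable(python_callable, *args, **kwargs):
> >     # Coroutine functions get their own event loop via asyncio.run();
> >     # plain callables run exactly as before.
> >     if inspect.iscoroutinefunction(python_callable):
> >         return asyncio.run(python_callable(*args, **kwargs))
> >     return python_callable(*args, **kwargs)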
> > WDYT about reducing the scope of your proposal (and PR) to this framing? That way it would be easier to gain consensus.
> >
> > (I think that's essentially what you are pitching 🙂)
> >
> > BR,
> >
> > ✨ Philippe Gagnon
> > Meet for 30 mins 📅 <https://calendar.app.google/5qzgD9SadybUvSCv8>
> >
> > On Fri, Dec 5, 2025 at 2:49 AM Blain David <[email protected]> wrote:
> >
> > > Hello Jens,
> > >
> > > Thanks for your time and answer. I just granted you and Zhe-You access to the document.
> > >
> > > In the article I explained why we did the iteration ourselves within the async @task decorated function, as this was way faster than doing it with dynamic task mapping.
> > > It's not that you cannot use dynamic task mapping with the async PythonOperator - it works just as with any other operator - it just doesn't make sense, because it won't give you any performance benefits: the mapped tasks don't share the same event loop (at least when using the CeleryExecutor).
> > >
> > > On big streams of data you could, for example, use dynamic task mapping to chunk the stream into multiple pieces, and then each task would process its chunk within the async operator - a bit like partitioning, if you like (a sketch follows below).
> > >
> > > In the example I used for the article, we once again don't care about the individual download state of each FTP file; we just want to know whether the directory was successfully downloaded or not. Of course, we added some logging statements to show which file was downloaded.
> > > I also know Jarek wants individual state tracking, but that's not the solution I presented here. For micro-batching we have our IterableOperator, where instead of partial/expand we do partial/iterate, which actually does the same as the for loop in the article's example, but managed for you in a multithreaded way, for sync as well as async tasks. There, async tasks will benefit from the multithreading, as they share the same event loop and everything runs within the same Celery worker - but that's another solution.
> > >
> > > Still, with dynamic task mapping or the IterableOperator, you wouldn't be able to use the SFTPClientPool (previously named AsyncSFTPConnectionPool, as in the article), so you wouldn't benefit from the performance gain you get from the pool. That's why in this example we do the looping ourselves.
> > >
> > > And I completely agree about triggerers - we also use them a lot, and they are indeed cool for long-running tasks with lots of waiting time (dead time), where you're just monitoring a state. That's the purpose of triggerers!
> > > But with some operators, triggers are misused as the "only" way to run async code that returns a lot of data which has to come back to the operator so it can be exposed as an XCom. There you'll see that your trigger table in the Airflow database explodes fast, as each yielded response is stored in the database; a trigger can't make use of a custom XCom backend like operators do, so in practice each result yielded by the trigger first ends up in your trigger table before being stored as an XCom.
> > >
> > > Also, I'm not saying we should replace anything here; I just propose an alternative solution in Airflow so you're not purely tied to deferrable operators.
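> > > A hypothetical sketch of that chunking idea - `download_chunk`, `download_one` and `chunked` are made-up names, and this assumes the proposed support for async @task functions:
> > >
> > > import asyncio
> > >
> > > from airflow.decorators import task
> > >
> > > async def download_one(name: str) -> None:
> > >     ...  # stand-in for an async hook call, e.g. via SFTPHookAsync
> > >
> > > def chunked(items: list[str], size: int) -> list[list[str]]:
> > >     # Split the full list into fixed-size chunks, one per mapped task.
> > >     return [items[i : i + size] for i in range(0, len(items), size)]
> > >
> > > @task()
> > > async def download_chunk(chunk: list[str]) -> None:
> > >     # Each mapped task instance runs its own event loop and downloads its chunk concurrently.
> > >     await asyncio.gather(*(download_one(name) for name in chunk))
> > >
> > > # Inside a DAG definition: one mapped task instance per chunk of file_names.
> > > download_chunk.expand(chunk=chunked(file_names, 100))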
> > > Also, at the Summit we saw during a lot of presentations that people tend to use PythonOperators (or @task) with hooks more than the operators themselves, which in my opinion makes sense, as you have more flexibility: you still benefit from the Airflow integration with hooks, but you aren't tied to the operator implementation, which might offer limited operations - look for example at the SFTPOperator and you'll understand immediately. That's why I propose allowing PythonOperators to run async code natively; that way, you can directly interact with async hooks and you don't need a triggerer to do it. Triggers are great for polling and listening, but not for processing huge amounts of data - that's where Celery workers shine - so by allowing PythonOperators to natively run async code on your Celery workers, you can do exactly that.
> > >
> > > For your last example: in our case the DB calls are still sync, as to my knowledge we don't have any DB hook based on DBApiHook which supports async operations. Also, the DB operation can be seen as another sync task, so it doesn't need to run in the same async task - you just pass the XCom returned from the async task to the sync task (see the sketch further below). But having async DB hooks could be cool; I also thought about it, but this would depend on whether the driver supports it. It's something I would like to test in the near future.
> > >
> > > I hope this answered most of your questions, Jens 😉
> > >
> > > -----Original Message-----
> > > From: Jens Scheffler <[email protected]>
> > > Sent: 04 December 2025 21:12
> > > To: [email protected]
> > > Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks and performance in Airflow 3 by supporting native async PythonOperator
> > >
> > > Requested access to the Google doc to read more details. I am interested, and I also wonder, like Daniel, what the difference is/would be.
> > >
> > > Especially as progress tracking might be important. Yes, task mapping is very expensive if you want to download 17k XML files, but also when running async: if you are at 5,000 files and you resume, would you know what was complete, or would it start from scratch every time?
> > >
> > > I think such micro-batching is cool, but some state tracking is important - which, if it is in the DB, might also overload the DB or add very many transactions.
> > >
> > > The triggerer, though, I think is still cool for long-running tasks where you just wait for a response, e.g. you triggered another job remotely or you started a Pod that runs for an hour. We have Pods running for 10h sometimes, and then it is important to be able to roll new SW to the workers; with triggerers we can de-couple this.
> > >
> > > So maybe - without knowing the missing details - I would judge such micro-batching as a third execution option, but it most probably would not replace the others.
> > >
> > > Also, knowing from my own experience, writing async code is more complex and error-prone, so if you required all normal code to be async you might scare users away.
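> > > To illustrate the handoff David describes above - the async task's return value becomes an XCom that a plain sync task consumes - here is a minimal sketch with hypothetical names, assuming the proposed async @task support:
> > >
> > > from airflow.decorators import task
> > >
> > > @task()
> > > async def fetch_files() -> list[str]:
> > >     ...  # async part: download concurrently, return the results as an XCom
> > >
> > > @task()
> > > def load_into_db(files: list[str]) -> None:
> > >     ...  # sync part: a plain DBApiHook-style call, no event loop needed
> > >
> > > # Inside a DAG definition: the XCom flows from the async task to the sync one.
> > > load_into_db(fetch_files())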
> > > Proper review is needed to ensure all I/O is async (DB calls too!).
> > >
> > > On 12/4/25 18:08, Daniel Standish via dev wrote:
> > > > Here's what I'm hearing from this:
> > > >
> > > > 1. Not using task mapping, but just looping instead, can be much more efficient.
> > > > Yes, of course it can.
> > > >
> > > > 2. There are ways in which the triggerer / deferrable operators are not fully complete, or do not have full feature parity with regular operators (such as the custom XCom backend example). I believe it. But this could certainly be worked on.
> > > >
> > > > Question for you:
> > > >
> > > > How is your proposal different from / better than, say, just calling `asyncio.run(...)` in a Python task?
> > > >
> > > > On Thu, Dec 4, 2025 at 8:38 AM Blain David <[email protected]> wrote:
> > > >
> > > >> As I already discussed with Jarek in the past, and also with Hussein during the Airflow Summit, at a certain moment we encountered performance issues when using a lot of deferred operators.
> > > >>
> > > >> Allowing PythonOperators (and thus also @task decorated methods) to natively execute async Python code in Airflow solved our performance issues.
> > > >> And yes, you could ask whether that's really necessary and what the added value is. At first you would indeed think it doesn't make sense at all to do so, right?
> > > >> But please bear with me and hear me out on why we did it that way and how it solved our performance issues, and it will become crystal clear 😉
> > > >> Below is the article I wrote, which is also publicly available here <https://docs.google.com/document/d/1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ7yiY89qok> on Google Docs, which makes it easier to read than through the devlist.
> > > >>
> > > >> Here is my article:
> > > >>
> > > >> Rethinking deferrable operators, async hooks and performance in Airflow 3
> > > >>
> > > >> At our company, we strive to avoid custom code in Airflow as much as possible to improve maintainability.
> > > >> For years this meant favouring dedicated Airflow operators over Python operators.
> > > >> However, in Airflow 3, as the number of deferred operators in our DAGs continued to grow, we began facing severe performance issues with deferrable operators, which forced us to re-evaluate that approach.
> > > >>
> > > >> Initially we expected deferrable operators to improve performance for I/O-related tasks (such as REST API calls), because triggerers follow an async producer/consumer pattern. But in practice we discovered the opposite.
> > > >>
> > > >> Why Deferrable Operators Became the Bottleneck
> > > >>
> > > >> Deferrable operators and sensors delegate async work to triggerers.
> > > >> This is perfectly fine for lightweight tasks such as polling or waiting for messages on a queue.
> > >> But in our case:
> > >>
> > >> * MSGraphAsyncOperator performs long-running async operations.
> > >> * HttpOperator in deferrable mode can perform long-running HTTP interactions, especially if pagination is involved.
> > >> * There is no native deferrable SFTPOperator, so if we want to use the SFTPHookAsync, we must use the PythonOperator, which natively doesn't support async code (not that big of a challenge).
> > >> * Both can return large payloads.
> > >> * Triggerers must store yielded events directly in the Airflow metadata database.
> > >>
> > >> Triggerers are not designed for sustained high-load async execution or large data transfers. Unlike Celery workers, triggerers scale poorly and quickly become the bottleneck.
> > >>
> > >> Yielded events from triggers are stored directly in the Airflow metadata database because, unlike workers, triggers cannot leverage a custom XCom backend to offload large payloads, which can lead to increased database load and potential performance bottlenecks.
> > >>
> > >> Dynamic task mapping with deferrable operators amplifies the problem even further, which AIP-88 partially solves.
> > >> Triggerers also cannot be run on the Edge Executor, as triggerers are still tightly coupled to the Airflow metadata database (possibly addressed in AIP-92).
> > >>
> > >> Rethinking the approach: Async hooks + Python tasks
> > >>
> > >> These limitations led us to reconsider calling async hooks directly from Python @task decorated functions or PythonOperators, thus avoiding deferrable operators, and therefore triggerers, entirely.
> > >> Operators are wrappers around hooks. Well-written operators should contain little logic and delegate all the work to the hooks, which do the real work - so why not call the hooks directly?
> > >> This idea is also somewhat in line with what Bolke already presented <https://airflowsummit.org/slides/2023/ab1-1400-Operators.pdf> in 2023.
> > >>
> > >> Advantages of this approach include:
> > >>
> > >> * No dynamic task mapping needed when iterating - just loop in Python, unless you really need to track each individual step, but that comes with a cost.
> > >> * Massive reduction in scheduler load.
> > >> * No triggerers involved.
> > >> * Async code can run on Edge Workers.
> > >> * Celery workers scale far better than triggerers, so by moving from deferred operators (and thus triggerers) to async operators on Celery workers, our performance issues on the triggerer were gone, and run times were much shorter - probably because the trigger mechanism also puts extra load on the scheduler.
> > >> * Sync or async doesn't make any difference in performance, unless you have to execute the same async function multiple times; that's when async shines compared to sync, especially with I/O-related operations.
> > >>
> > >> Concrete Example: Async SFTP Downloads
> > >>
> > >> Below is an example comparing approaches for downloading ~17,000 XML files and storing them in our data warehouse.
> > >> A single Celery worker can orchestrate many concurrent downloads using asyncio.
> > >> A semaphore (here used internally by the AsyncSFTPConnectionPool) protects the SFTP server from being overloaded.
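> > >> A rough sketch of this pattern, with a plain asyncio.Semaphore standing in for the pool's internal one (illustrative only, not the actual AsyncSFTPConnectionPool code):
> > >>
> > >> import asyncio
> > >>
> > >> MAX_CONCURRENCY = 10  # cap on concurrent SFTP sessions
> > >>
> > >> async def download_all(file_names: list[str]) -> None:
> > >>     # The semaphore bounds how many downloads run at once,
> > >>     # protecting the SFTP server from being overloaded.
> > >>     semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
> > >>
> > >>     async def download_one(name: str) -> None:
> > >>         async with semaphore:
> > >>             ...  # stand-in for the actual async SFTP download
> > >>
> > >>     await asyncio.gather(*(download_one(name) for name in file_names))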
> > >> Benchmark results:
> > >>
> > >> Approach                                         | Environment  | Time
> > >> Mapped SFTPOperator                              | production   | 3h 25m 55s
> > >> PythonOperator + SFTPHook                        | local laptop | 1h 21m 09s
> > >> Async Python task + SFTPHookAsync (without pool) | local laptop | 8m 29s
> > >> Async Python task + AsyncSFTPConnectionPool      | production   | 3m 32s
> > >>
> > >> As you can all conclude, the DagRun time went down from more than 3 hours to only 3 and a half minutes, which is huge!
> > >>
> > >> In the Google doc there are 2 different code snippets showing how it's done sync and async, which I will not put here.
> > >>
> > >> Conclusion
> > >>
> > >> Using async hooks inside async Python tasks provides better performance, scalability, and flexibility, and avoids reliance on triggerers entirely.
> > >> This hybrid approach - 'async where it matters, operators where they make sense' - may represent the future of high-performance Airflow data processing workloads.
> > >>
> > >> What did I change in Airflow?
> > >>
> > >> Not that much. I only:
> > >>
> > >> * Introduced an async PythonOperator so you don't have to handle the event loop yourself - not that special, but it is also natively supported on async @task decorated Python methods, which is nice to read.
> > >> * Made some improvements to the SFTPHookAsync to take full advantage of async.
> > >> * Introduced an SFTPHookPool so multiple asyncio tasks can re-use a connection instance to gain even more performance; in this case it meant a reduction of 5 minutes in processing time, so we went from 8 to 3 minutes.
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: [email protected]
> > > For additional commands, e-mail: [email protected]
