dabla commented on code in PR #62922:
URL: https://github.com/apache/airflow/pull/62922#discussion_r2964890237


##########
task-sdk/src/airflow/sdk/bases/operator.py:
##########
@@ -1731,6 +1747,50 @@ def execute(self, context):
             return loop.run_until_complete(self.aexecute(context))
 
 
+class DecoratedDeferredAsyncOperator(BaseAsyncOperator):
+    """
+    A decorator operator that wraps another deferred BaseOperator instance.
+
+    Implements the async aexecute() method while delegating all other behavior.
+    """
+
+    def __init__(self, *, operator: BaseOperator, task_deferred: TaskDeferred, 
**kwargs: Any):
+        super().__init__(task_id=operator.task_id, **kwargs)
+        self._operator = operator
+        self._task_deferred = task_deferred
+
+    async def aexecute(self, context):
+        from airflow.sdk.execution_time.callback_runner import 
create_executable_runner
+        from airflow.sdk.execution_time.context import 
context_get_outlet_events
+
+        event = await run_trigger(self._task_deferred.trigger)
+
+        self.log.debug("event: %s", event)
+
+        if event:
+            self.log.debug("next_method: %s", self._task_deferred.method_name)
+
+            if self._task_deferred.method_name:
+                try:
+                    next_method = self._operator.next_callable(
+                        self._task_deferred.method_name,
+                        self._task_deferred.kwargs,
+                    )
+
+                    outlet_events = context_get_outlet_events(context)
+                    runner = create_executable_runner(
+                        func=next_method,
+                        outlet_events=outlet_events,
+                        logger=self.log,
+                    )
+                    return runner.run(context, event.payload)
+                except TaskDeferred as task_deferred:
+                    self._task_deferred = task_deferred
+                    # Recursively handle nested deferrals
+                    return await self.aexecute(context=context)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to