SameerMesiah97 commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2936018521


##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -202,90 +212,144 @@ def sync(self):
                     "AWS credentials are either missing or expired: %s.\nRetrying connection", error
                 )
         except Exception:
-            self.log.exception("An error occurred while syncing tasks")
+            self.log.exception("An error occurred while syncing workloads.")
 
     def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
         from airflow.executors import workloads
 
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+            return
+
+        if AIRFLOW_V_3_2_PLUS and isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+            return
+
+        raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
+
+    def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None:
+        from airflow.executors import workloads
+

Review Comment:
   @ferruzzi
   
   Can you be more specific about what you are agreeing with Niko on? I am not sure what action this comment requires from me besides changing the variable name.


