ferruzzi commented on code in PR #63035:
URL: https://github.com/apache/airflow/pull/63035#discussion_r2943205548


##########
providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py:
##########
@@ -202,90 +212,144 @@ def sync(self):
                     "AWS credentials are either missing or expired: %s.\nRetrying connection", error
                 )
         except Exception:
-            self.log.exception("An error occurred while syncing tasks")
+            self.log.exception("An error occurred while syncing workloads.")
 
     def queue_workload(self, workload: workloads.All, session: Session | None) -> None:
         from airflow.executors import workloads
 
-        if not isinstance(workload, workloads.ExecuteTask):
-            raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
-        ti = workload.ti
-        self.queued_tasks[ti.key] = workload
+        if isinstance(workload, workloads.ExecuteTask):
+            ti = workload.ti
+            self.queued_tasks[ti.key] = workload
+            return
+
+        if AIRFLOW_V_3_2_PLUS and isinstance(workload, workloads.ExecuteCallback):
+            self.queued_callbacks[workload.callback.id] = workload
+            return
+
+        raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(workload)}")
+
+    def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None:
+        from airflow.executors import workloads
+

Review Comment:
   Sorry I wasn't clear: I was referring to the variable name.  I see that at
least some of it is existing code, but we should clean that up as well, not
just fix your new changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

