ferruzzi commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2897389432


##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -573,13 +592,24 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
 
     @property
     def slots_available(self):
-        """Number of new workloads (tasks and callbacks) this executor instance can accept."""
-        return self.parallelism - len(self.running) - len(self.queued_tasks) - len(self.queued_callbacks)
+        """Number of new workloads (tasks, callbacks, and connection tests) this executor instance can accept."""
+        return (
+            self.parallelism
+            - len(self.running)
+            - len(self.queued_tasks)
+            - len(self.queued_callbacks)
+            - len(self.queued_connection_tests)
+        )
 

Review Comment:
   Maybe the queue type should also include something to indicate queue priority and whether that queue is intended to be consumed FIFO or by priority?
   
   Something vaguely like this:
   
   Suppose we add a NamedTuple called `WorkloadQueueDef(scheduling_tier: int, sort_key: int)` to the base workload, and the individual workloads define it like this:
   
   ```python
   from typing import NamedTuple


   class WorkloadQueueDef(NamedTuple):
       scheduling_tier: int
       sort_key: int


   class TestConnection:
       @property
       def queue_def(self) -> WorkloadQueueDef:
           return WorkloadQueueDef(
               scheduling_tier=-1,  # connection tests are super fast, so always get them out of the way first
               sort_key=0,  # FIFO order
           )


   class ExecuteCallback:
       @property
       def queue_def(self) -> WorkloadQueueDef:
           return WorkloadQueueDef(
               # always finish a callback before starting a new task, but
               # don't block the much faster connection tests for a callback
               scheduling_tier=0,
               sort_key=0,  # FIFO order
           )


   class ExecuteTask:
       @property
       def queue_def(self) -> WorkloadQueueDef:
           return WorkloadQueueDef(
               # after all connection tests and callbacks are running, take
               # the task with the highest priority
               scheduling_tier=1,
               sort_key=self.ti.priority_weight,
           )
   ```
   
   (In fact, we could default both fields to 0 and this gets even simpler... but you get the idea.)
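   A quick standalone illustration (hypothetical values, just a sketch) of the intended ordering: tiers ascending, and within a tier the larger `sort_key` (i.e. `priority_weight`) first, which a sort key of `(scheduling_tier, -sort_key)` gives us; Python's stable sort keeps ties in FIFO order:

```python
from typing import NamedTuple


class WorkloadQueueDef(NamedTuple):
    scheduling_tier: int
    sort_key: int


# Hypothetical mix of queued workloads: two tasks with different priorities,
# a callback, and a connection test.
queue_defs = [
    WorkloadQueueDef(scheduling_tier=1, sort_key=5),   # low-priority task
    WorkloadQueueDef(scheduling_tier=0, sort_key=0),   # callback
    WorkloadQueueDef(scheduling_tier=1, sort_key=10),  # high-priority task
    WorkloadQueueDef(scheduling_tier=-1, sort_key=0),  # connection test
]

# Tier ascending, then sort_key descending so higher priority_weight wins.
ordered = sorted(queue_defs, key=lambda d: (d.scheduling_tier, -d.sort_key))

assert [d.scheduling_tier for d in ordered] == [-1, 0, 1, 1]
assert ordered[2].sort_key == 10  # high-priority task before the low-priority one
```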
   
   All of the `supports_callbacks`, `supports_connection_test`, etc. flags in each executor could then be replaced by a single `supported_workload_types: set`. In LocalExecutor it might look like `supported_workload_types = {"ExecuteTask", "ExecuteCallback", "TestConnection"}`.
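   A rough sketch of what that generic capability check could look like (all names here are hypothetical, mirroring the idea above):

```python
# Hypothetical per-executor declaration replacing the supports_* flags.
SUPPORTED_WORKLOAD_TYPES = {"ExecuteTask", "ExecuteCallback", "TestConnection"}


def supports(workload: object) -> bool:
    # One generic membership test instead of a flag per workload type.
    return type(workload).__name__ in SUPPORTED_WORKLOAD_TYPES


class ExecuteTask:  # stand-in for the real workload class
    pass


class SomeFutureWorkload:  # a type this executor doesn't support
    pass


assert supports(ExecuteTask())
assert not supports(SomeFutureWorkload())
```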
   
   Then the entire chunk of code in `_get_workloads_to_schedule` that sorts out which jobs to pick up could be something really simple like:
   
   ```python
   def _get_workloads_to_schedule(self, open_slots: int) -> list[tuple[WorkloadKey, ExecutorWorkload]]:
       # Build a meta-queue out of every sub-queue, ordered by scheduling tier
       # and then by each workload's sort_key within its tier, and take the
       # top `open_slots` workloads off the list.
       all_workloads: list[ExecutorWorkload] = [
           workload
           for queue in self.executor_queues.values()
           for workload in queue.values()
       ]
       # Sort by scheduling tier first (ascending), then by sort_key within
       # each tier; negate sort_key so a higher priority_weight schedules
       # earlier. Python's sort is stable, so equal keys stay in FIFO order.
       all_workloads.sort(key=lambda w: (w.queue_def.scheduling_tier, -w.queue_def.sort_key))

       return [(workload.key, workload) for workload in all_workloads[:open_slots]]
   ```
   
   As a bonus, this also ensures that future workload types honor slots by default; if some future workload type wanted to break that, it would have to be done intentionally. It also means `slots_available` and `slots_occupied` turn into simple comprehensions along the lines of `sum(len(q) for q in self.executor_queues.values())` that don't have to be updated for every new workload. The code overall gets much cleaner.
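   Roughly, those slot properties could collapse to something like this (a sketch with hypothetical attribute names, following the comment above):

```python
class _SlotsSketch:
    """Hypothetical stand-in for the slot-accounting part of BaseExecutor."""

    def __init__(self, parallelism: int, running: set, executor_queues: dict):
        self.parallelism = parallelism
        self.running = running
        self.executor_queues = executor_queues

    @property
    def slots_occupied(self) -> int:
        # Everything queued across all workload-type queues, plus running work;
        # no per-workload-type terms to keep in sync.
        return len(self.running) + sum(len(q) for q in self.executor_queues.values())

    @property
    def slots_available(self) -> int:
        return self.parallelism - self.slots_occupied


ex = _SlotsSketch(
    parallelism=8,
    running={"ti1", "ti2"},
    executor_queues={"ExecuteTask": {"k1": object()}, "TestConnection": {}},
)
assert ex.slots_occupied == 3
assert ex.slots_available == 5
```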
   
   I'm not sure; I haven't thought this through fully, but as we add more workload types we may need to get it sorted out before long. I have some rough ideas, but they were all overkill when we only had two types, so I figured this was a good place to get those thoughts "on paper". Feel free to resolve this comment once you have seen it so nobody sees it as blocking, but also feel free to message me on Slack and/or tag me in the PR if you get to it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
