ferruzzi commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2893084458
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -240,10 +242,18 @@ def queue_workload(self, workload: workloads.All, session: Session) -> None:
                     f"See LocalExecutor or CeleryExecutor for reference implementation."
                 )
             self.queued_callbacks[workload.callback.id] = workload
+        elif isinstance(workload, workloads.TestConnection):
+            if not self.supports_connection_test:
+                raise NotImplementedError(
+                    f"{type(self).__name__} does not support TestConnection workloads. "
+                    f"Set supports_connection_test = True and implement connection test handling "
+                    f"in _process_workloads(). See LocalExecutor for reference implementation."
+                )
+            self.queued_connection_tests[str(workload.connection_test_id)] = workload
         else:
             raise ValueError(
                 f"Un-handled workload type {type(workload).__name__!r} in {type(self).__name__}. "
-                f"Workload must be one of: ExecuteTask, ExecuteCallback."
+                f"Workload must be one of: ExecuteTask, ExecuteCallback, TestConnection."
Review Comment:
Non-blocking future work, unless you are already making more changes here: we'll eventually want to generate this list by unpacking ExecutorWorkload (or something similar) instead of manually adding each new workload type.
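To sketch the reviewer's suggestion: if the set of workload types lives in one place (for example, a `Union` alias like `workloads.All`), the "must be one of" list in the error message can be derived from it instead of being maintained by hand. The class names and the assumption that `All` is a plain `Union` are illustrative, not the actual Airflow definitions.

```python
from typing import Union, get_args

# Hypothetical stand-ins for the real airflow.executors.workloads classes.
class ExecuteTask: ...
class ExecuteCallback: ...
class TestConnection: ...

# Assumption: workloads.All is (or could be) a Union alias over the
# concrete workload types.
All = Union[ExecuteTask, ExecuteCallback, TestConnection]

def supported_workload_names(union_alias) -> str:
    """Derive the 'must be one of' list from the union itself, so the
    error message never goes stale when a new workload type is added."""
    return ", ".join(t.__name__ for t in get_args(union_alias))

print(supported_workload_names(All))
```

With this, adding a fourth workload type to `All` automatically updates the error message.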
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -573,13 +592,24 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task
     @property
     def slots_available(self):
-        """Number of new workloads (tasks and callbacks) this executor instance can accept."""
-        return self.parallelism - len(self.running) - len(self.queued_tasks) - len(self.queued_callbacks)
+        """Number of new workloads (tasks, callbacks, and connection tests) this executor instance can accept."""
+        return (
+            self.parallelism
+            - len(self.running)
+            - len(self.queued_tasks)
+            - len(self.queued_callbacks)
+            - len(self.queued_connection_tests)
+        )
Review Comment:
Non-blocking future work idea: I considered this when I was in here adding the callback workload, but it didn't seem worth the refactor at the time; maybe we're getting close to that time now. I wonder if we should consider (again, not now) an alternative data structure for the queues.
If we did something like `executor_queue: dict[workload_type, workload_queue_type]`, and added the `workload_queue_type` field and type hint to each workload the same way as the `type` field, then we could do `executor_queue[workload.type].update(new_entry)`, and `slots_available` and `slots_occupied` would simplify down to iterating over `executor_queue` and summing the queue lengths.
Anyway, like I said, absolutely not blocking, but I wanted to get the idea out since it was starting to look relevant.
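A minimal sketch of that shape, assuming a single mapping from workload type name to that type's queue (names like `executor_queue` and the string keys are illustrative, not the actual Airflow API):

```python
# Hypothetical: one dict of queues keyed by workload type name, replacing
# the separate queued_tasks / queued_callbacks / queued_connection_tests
# attributes.
executor_queue: dict[str, dict[str, object]] = {
    "ExecuteTask": {},
    "ExecuteCallback": {},
    "TestConnection": {},
}

def queue_workload(workload_type: str, key: str, workload: object) -> None:
    # Equivalent of executor_queue[workload.type].update(new_entry):
    # every workload type routes through the same code path.
    executor_queue[workload_type][key] = workload

def slots_available(parallelism: int, running: int) -> int:
    # Slot accounting collapses to one sum over all queues, so adding a
    # new workload type needs no change here.
    queued = sum(len(queue) for queue in executor_queue.values())
    return parallelism - running - queued
```

The payoff is that `queue_workload`, `slots_available`, and `slots_occupied` stop growing with each new workload type; only the dict's initial keys do.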
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]