amoghrajesh commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2993314207
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2120,3 +2120,99 @@ def supervise(
if close_client and client:
with suppress(Exception):
client.close()
+
+
+def supervise_workload(
+    workload: ExecutorWorkload,
+    *,
+    server: str | None = None,
+    dry_run: bool = False,
+    client: Client | None = None,
+    subprocess_logs_to_stdout: bool = False,
+    proctitle: str | None = None,
+) -> int:
+    """
+    Run any workload type to completion in a supervised subprocess.
+
+    Dispatch to the appropriate supervisor based on workload type. Workload-specific
+    attributes (log_path, sentry_integration, bundle_info, etc.) are read from the
+    workload object itself.
+
+    :param workload: The ``ExecutorWorkload`` to execute.
+    :param server: Base URL of the API server (used by task workloads).
+    :param dry_run: If True, execute without actual task execution (simulate run).
+    :param client: Optional preconfigured client for communication with the server.
+    :param subprocess_logs_to_stdout: Should task logs also be sent to stdout via the main logger.
+    :param proctitle: Process title to set for this workload. If not provided, defaults to
+        ``"airflow supervisor: <workload.display_name>"``. Executors may pass a custom title
+        that includes executor-specific context (e.g. team name).
+    :return: Exit code of the process.
+    """
+    # Imports deferred to avoid an SDK/core dependency at module load time.
+    from airflow.executors.workloads.callback import ExecuteCallback
+    from airflow.executors.workloads.task import ExecuteTask
Review Comment:
You don't have to do it right now, but I think moving the `workload`
definitions to a shared library would be a good idea.
Executors mostly live in core (LocalExecutor) or in providers, and moving
these definitions to a shared library would let us advance the
client/server separation.
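A rough sketch of the idea, under stated assumptions and not the actual
Airflow code: the two dataclasses below are hypothetical stand-ins for
workload definitions living in a shared package. Only the class names
`ExecuteTask` and `ExecuteCallback` come from the diff; the `display_name`
field layout, the `dispatch_workload` helper, and its return values are
invented for illustration. With the definitions in a shared place, the
supervisor could import them at module level rather than deferring the
imports inside the function:

```python
from dataclasses import dataclass


# Hypothetical stand-ins for workload definitions in a shared library.
# The field shown is an illustrative assumption, not Airflow's real schema.
@dataclass
class ExecuteTask:
    display_name: str


@dataclass
class ExecuteCallback:
    display_name: str


def dispatch_workload(workload: object) -> str:
    """Pick a supervisor by workload type (returns a label instead of running)."""
    if isinstance(workload, ExecuteTask):
        return f"task:{workload.display_name}"
    if isinstance(workload, ExecuteCallback):
        return f"callback:{workload.display_name}"
    # Unknown workload types fail loudly rather than being silently ignored.
    raise TypeError(f"Unsupported workload type: {type(workload).__name__}")
```

Because both core and provider executors would depend only on the shared
definitions in this arrangement, neither side would need to import from
the other.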
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2000,7 +1999,7 @@ def _configure_logging(log_path: str, client: Client) -> tuple[FilteringBoundLog
return logger, log_file_descriptor
-def supervise(
+def supervise_task(
Review Comment:
Hmm, feel free to do as you deem right here, but we should avoid issuing
deprecation warnings from our own code where we can prevent it. My 2c.
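A minimal sketch of one way to do that, assuming the rename keeps the old
`supervise` name as a deprecated alias (the hunk shown here does not
confirm this). The `task_id` parameter and the placeholder return value
are hypothetical; the point is only that internal callers use the new
name directly, so the warning fires solely for external callers of the
old one:

```python
import warnings


def supervise_task(task_id: str) -> int:
    """New name; internal code calls this directly, so no warning is emitted."""
    # Placeholder body: a real implementation would run the task under supervision.
    return 0


def supervise(task_id: str) -> int:
    """Old name kept for external callers; warns, then delegates."""
    warnings.warn(
        "supervise() is deprecated; use supervise_task() instead.",
        DeprecationWarning,
        stacklevel=2,  # attribute the warning to the caller, not this shim
    )
    return supervise_task(task_id)
```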