jason810496 commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r2883213524
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3243,6 +3253,92 @@ def _activate_assets_generate_warnings() ->
Iterator[tuple[str, str]]:
session.add(warning)
existing_warned_dag_ids.add(warning.dag_id)
+ @provide_session
+ def _dispatch_connection_tests(self, *, session: Session = NEW_SESSION) ->
None:
+ """Dispatch pending connection tests to executors that support them."""
+ max_concurrency = conf.getint("core",
"max_connection_test_concurrency", fallback=4)
+ timeout = conf.getint("core", "connection_test_timeout", fallback=60)
+
+ active_count = session.scalar(
+ select(func.count(ConnectionTest.id)).where(
+ ConnectionTest.state.in_([ConnectionTestState.QUEUED,
ConnectionTestState.RUNNING])
+ )
+ )
+ budget = max_concurrency - (active_count or 0)
+ if budget <= 0:
+ return
+
+ pending_stmt = (
+ select(ConnectionTest)
+ .where(ConnectionTest.state == ConnectionTestState.PENDING)
+ .order_by(ConnectionTest.created_at)
+ .limit(budget)
+ )
+ pending_stmt = with_row_locks(pending_stmt, session,
of=ConnectionTest, skip_locked=True)
+ pending_tests = session.scalars(pending_stmt).all()
+
+ if not pending_tests:
+ return
+
+ for ct in pending_tests:
+ executor = self._find_executor_for_connection_test(ct.executor)
+ if executor is None:
+ reason = (
+ f"No executor matches '{ct.executor}'"
+ if ct.executor
+ else "No executor supports connection testing"
+ )
+ ct.state = ConnectionTestState.FAILED
+ ct.result_message = reason
+ self.log.warning("Failing connection test %s: %s", ct.id,
reason)
+ continue
+
+ workload = workloads.TestConnection.make(
+ connection_test_id=ct.id,
+ connection_id=ct.connection_id,
+ timeout=timeout,
+ generator=executor.jwt_generator,
+ )
+ executor.queue_workload(workload, session=session)
+ ct.state = ConnectionTestState.QUEUED
+
+ session.flush()
+
+ @provide_session
+ def _reap_stale_connection_tests(self, *, session: Session = NEW_SESSION)
-> None:
+ """Mark connection tests that have exceeded their timeout as FAILED."""
+ timeout = conf.getint("core", "connection_test_timeout", fallback=60)
+ grace_period = max(30, timeout // 2)
+ cutoff = timezone.utcnow() - timedelta(seconds=timeout + grace_period)
+
+ stale_stmt = select(ConnectionTest).where(
+ ConnectionTest.state.in_([ConnectionTestState.QUEUED,
ConnectionTestState.RUNNING]),
+ ConnectionTest.updated_at < cutoff,
+ )
+ stale_stmt = with_row_locks(stale_stmt, session, of=ConnectionTest,
skip_locked=True)
+ stale_tests = session.scalars(stale_stmt).all()
+
+ for ct in stale_tests:
+ ct.state = ConnectionTestState.FAILED
+ ct.result_message = f"Connection test timed out (exceeded
{timeout}s + {grace_period}s grace)"
+ self.log.warning("Reaped stale connection test %s", ct.id)
+ if ct.connection_snapshot:
+ attempt_revert(ct, session=session)
+
+ session.flush()
+
+ def _find_executor_for_connection_test(self, executor_name: str | None) ->
BaseExecutor | None:
+ """Find an executor that supports connection testing, optionally
matching by team name."""
+ if executor_name is not None:
+ for executor in self.executors:
+ if executor.supports_connection_test and executor.team_name ==
executor_name:
+ return executor
+ return None
+ for executor in self.executors:
+ if executor.supports_connection_test:
+ return executor
+ return None
Review Comment:
The main problems pointed out in the last review don't seem to have been resolved yet.
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -168,6 +177,84 @@ def _execute_callback(log: Logger, workload:
workloads.ExecuteCallback, team_con
raise RuntimeError(error_msg or "Callback execution failed")
+def _execute_connection_test(log: Logger, workload: workloads.TestConnection,
team_conf) -> None:
Review Comment:
Additionally, it would be nice to verify the end-to-end flow with a real system setup
(perhaps via Breeze) when you have time. Thanks!
##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -314,10 +324,20 @@ def heartbeat(self) -> None:
self._emit_metrics(open_slots, num_running_workloads,
num_queued_workloads)
self.trigger_tasks(open_slots)
+ if self.supports_connection_test and self.queued_connection_tests:
Review Comment:
Could we move this check into the `trigger_connection_tests` function to
encapsulate the logic?
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3243,6 +3253,92 @@ def _activate_assets_generate_warnings() ->
Iterator[tuple[str, str]]:
session.add(warning)
existing_warned_dag_ids.add(warning.dag_id)
+ @provide_session
+ def _dispatch_connection_tests(self, *, session: Session = NEW_SESSION) ->
None:
+ """Dispatch pending connection tests to executors that support them."""
+ max_concurrency = conf.getint("core",
"max_connection_test_concurrency", fallback=4)
+ timeout = conf.getint("core", "connection_test_timeout", fallback=60)
+
+ active_count = session.scalar(
+ select(func.count(ConnectionTest.id)).where(
+ ConnectionTest.state.in_([ConnectionTestState.QUEUED,
ConnectionTestState.RUNNING])
Review Comment:
How about defining these active states (QUEUED, RUNNING) as a frozenset in `models.connection_test`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]