This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 53085cd1d07 fix: Unhandled Exception in remote logging if connection
doesn't exist (#59801) (#62979)
53085cd1d07 is described below
commit 53085cd1d07332f30bf823daaa2592e8e2e59cc4
Author: Dev-iL <[email protected]>
AuthorDate: Fri Mar 6 09:38:51 2026 +0200
fix: Unhandled Exception in remote logging if connection doesn't
exist (#59801) (#62979)
Cherry-picked from 3428dc9 with conflict resolution:
- context.py: Added `import inspect` (skip `import functools` as `from
functools import cache` already exists)
- supervisor.py: Adopted early-return pattern and explicit `del` for GC,
kept simpler env var handling (no `_AIRFLOW_PROCESS_CONTEXT` which doesn't
exist in v3-1-test)
- test_supervisor.py: Replaced `@pytest.mark.xfail` workaround with proper
`use_real_secrets_backends` fixture
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
task-sdk/src/airflow/sdk/api/client.py | 3 +-
task-sdk/src/airflow/sdk/execution_time/context.py | 8 +++--
.../src/airflow/sdk/execution_time/supervisor.py | 37 ++++++++++++----------
.../tests/task_sdk/execution_time/test_secrets.py | 22 ++++++++++++-
.../task_sdk/execution_time/test_supervisor.py | 31 ++++++++++++++++--
5 files changed, 79 insertions(+), 22 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index 2ef50a72ff7..8d9e48d6129 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -848,7 +848,8 @@ class Client(httpx.Client):
kwargs.setdefault("base_url", "dry-run://server")
else:
kwargs["base_url"] = base_url
- kwargs["verify"] = self._get_ssl_context_cached(certifi.where(),
API_SSL_CERT_PATH)
+ # Call via the class to avoid binding lru_cache wires to this
instance.
+ kwargs["verify"] =
type(self)._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH)
# Set timeout if not explicitly provided
kwargs.setdefault("timeout", API_TIMEOUT)
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 379a13a930e..512468bb486 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import collections
import contextlib
+import inspect
from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
from functools import cache
from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload
@@ -197,8 +198,11 @@ async def _async_get_connection(conn_id: str) ->
Connection:
for secrets_backend in backends:
try:
# Use async method if available, otherwise wrap sync method
- if hasattr(secrets_backend, "aget_connection"):
- conn = await secrets_backend.aget_connection(conn_id) # type:
ignore[assignment]
+ # getattr avoids triggering AsyncMock coroutine creation under
Python 3.13
+ async_method = getattr(secrets_backend, "aget_connection", None)
+ if async_method is not None:
+ maybe_awaitable = async_method(conn_id)
+ conn = await maybe_awaitable if
inspect.isawaitable(maybe_awaitable) else maybe_awaitable
else:
conn = await
sync_to_async(secrets_backend.get_connection)(conn_id) # type:
ignore[assignment]
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 206bf3b6878..0b02d215f27 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -36,14 +36,7 @@ from contextlib import contextmanager, suppress
from datetime import datetime, timezone
from http import HTTPStatus
from socket import socket, socketpair
-from typing import (
- TYPE_CHECKING,
- BinaryIO,
- ClassVar,
- NoReturn,
- TextIO,
- cast,
-)
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, NoReturn, TextIO, cast
from urllib.parse import urlparse
from uuid import UUID
@@ -898,17 +891,29 @@ def _remote_logging_conn(client: Client):
# Fetch connection details on-demand without caching the entire API client
instance
conn = _fetch_remote_logging_conn(conn_id, client)
- if conn:
- key = f"AIRFLOW_CONN_{conn_id.upper()}"
- old = os.getenv(key)
- os.environ[key] = conn.get_uri()
+ if not conn:
try:
yield
finally:
- if old is None:
- del os.environ[key]
- else:
- os.environ[key] = old
+ # Ensure we don't leak the caller's client when no connection was
fetched.
+ del conn
+ del client
+ return
+
+ key = f"AIRFLOW_CONN_{conn_id.upper()}"
+ old = os.getenv(key)
+ os.environ[key] = conn.get_uri()
+ try:
+ yield
+ finally:
+ if old is None:
+ del os.environ[key]
+ else:
+ os.environ[key] = old
+
+ # Explicitly drop local references so the caller's client can be
garbage collected.
+ del conn
+ del client
@attrs.define(kw_only=True)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
index b19ab3b1003..8f9745b0ffe 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
@@ -144,9 +144,29 @@ class TestExecutionAPISecretsBackend:
# Mock the greenback and asyncio modules that are imported inside the
exception handler
mocker.patch("greenback.has_portal", return_value=True)
- mock_greenback_await = mocker.patch("greenback.await_",
return_value=expected_conn)
mocker.patch("asyncio.current_task")
+ # Mock greenback.await_ to actually await the coroutine it receives.
+ # This prevents Python 3.13 RuntimeWarning about unawaited coroutines.
+ import asyncio
+
+ def greenback_await_side_effect(coro):
+ loop = asyncio.new_event_loop()
+ try:
+ return loop.run_until_complete(coro)
+ finally:
+ loop.close()
+
+ mock_greenback_await = mocker.patch("greenback.await_",
side_effect=greenback_await_side_effect)
+
+ # Mock aget_connection to return the expected connection directly.
+ # We need to mock this because the real aget_connection would try to
+ # use SUPERVISOR_COMMS.asend which is not set up for this test.
+ async def mock_aget_connection(self, conn_id):
+ return expected_conn
+
+ mocker.patch.object(ExecutionAPISecretsBackend, "aget_connection",
mock_aget_connection)
+
backend = ExecutionAPISecretsBackend()
conn = backend.get_connection("databricks_default")
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index ba3c5166931..c7b54666a1e 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -225,8 +225,35 @@ class TestWatchedSubprocess:
def disable_log_upload(self, spy_agency):
spy_agency.spy_on(ActivitySubprocess._upload_logs, call_original=False)
- # TODO: Investigate and fix it after 3.1.0
- @pytest.mark.xfail(reason="Fails on Py 3.12 with multi-threading error
only in tests.")
+ @pytest.fixture(autouse=True)
+ def use_real_secrets_backends(self, monkeypatch):
+ """
+ Ensure that real secrets backend instances are used instead of mocks.
+
+ This prevents Python 3.13 RuntimeWarning when hasattr checks async
methods
+ on mocked backends. The warning occurs because hasattr on AsyncMock
creates
+ unawaited coroutines.
+
+ This fixture ensures test isolation when running in parallel with
pytest-xdist,
+ regardless of what other tests patch.
+ """
+ import importlib
+
+ import airflow.sdk.execution_time.secrets.execution_api as
execution_api_module
+ from airflow.secrets.environment_variables import
EnvironmentVariablesBackend
+
+ fresh_execution_backend =
importlib.reload(execution_api_module).ExecutionAPISecretsBackend
+
+ # Ensure downstream imports see the restored class instead of any
AsyncMock left by other tests
+ import airflow.sdk.execution_time.secrets as secrets_package
+
+ monkeypatch.setattr(secrets_package, "ExecutionAPISecretsBackend",
fresh_execution_backend)
+
+ monkeypatch.setattr(
+
"airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded",
+ lambda: [EnvironmentVariablesBackend(), fresh_execution_backend()],
+ )
+
def test_reading_from_pipes(self, captured_logs, time_machine,
client_with_ti_start):
def subprocess_main():
# This is run in the subprocess!