This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 53085cd1d07 fix: Unhandled Exception in remote logging if connection 
doesn't exist(#59801) (#62979)
53085cd1d07 is described below

commit 53085cd1d07332f30bf823daaa2592e8e2e59cc4
Author: Dev-iL <[email protected]>
AuthorDate: Fri Mar 6 09:38:51 2026 +0200

    fix: Unhandled Exception in remote logging if connection doesn't 
exist(#59801) (#62979)
    
    Cherry-picked from 3428dc9 with conflict resolution:
    - context.py: Added `import inspect` (skipped `import functools` because `from 
functools import cache` already exists)
    - supervisor.py: Adopted the early-return pattern and explicit `del` for GC; 
kept the simpler env var handling (no `_AIRFLOW_PROCESS_CONTEXT`, which doesn't 
exist in v3-1-test)
    - test_supervisor.py: Replaced the `@pytest.mark.xfail` workaround with a proper 
`use_real_secrets_backends` fixture
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 task-sdk/src/airflow/sdk/api/client.py             |  3 +-
 task-sdk/src/airflow/sdk/execution_time/context.py |  8 +++--
 .../src/airflow/sdk/execution_time/supervisor.py   | 37 ++++++++++++----------
 .../tests/task_sdk/execution_time/test_secrets.py  | 22 ++++++++++++-
 .../task_sdk/execution_time/test_supervisor.py     | 31 ++++++++++++++++--
 5 files changed, 79 insertions(+), 22 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/api/client.py 
b/task-sdk/src/airflow/sdk/api/client.py
index 2ef50a72ff7..8d9e48d6129 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -848,7 +848,8 @@ class Client(httpx.Client):
             kwargs.setdefault("base_url", "dry-run://server")
         else:
             kwargs["base_url"] = base_url
-            kwargs["verify"] = self._get_ssl_context_cached(certifi.where(), 
API_SSL_CERT_PATH)
+            # Call via the class to avoid binding lru_cache wires to this 
instance.
+            kwargs["verify"] = 
type(self)._get_ssl_context_cached(certifi.where(), API_SSL_CERT_PATH)
 
         # Set timeout if not explicitly provided
         kwargs.setdefault("timeout", API_TIMEOUT)
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py 
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 379a13a930e..512468bb486 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import collections
 import contextlib
+import inspect
 from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
 from functools import cache
 from typing import TYPE_CHECKING, Any, Generic, TypeVar, overload
@@ -197,8 +198,11 @@ async def _async_get_connection(conn_id: str) -> 
Connection:
     for secrets_backend in backends:
         try:
             # Use async method if available, otherwise wrap sync method
-            if hasattr(secrets_backend, "aget_connection"):
-                conn = await secrets_backend.aget_connection(conn_id)  # type: 
ignore[assignment]
+            # getattr avoids triggering AsyncMock coroutine creation under 
Python 3.13
+            async_method = getattr(secrets_backend, "aget_connection", None)
+            if async_method is not None:
+                maybe_awaitable = async_method(conn_id)
+                conn = await maybe_awaitable if 
inspect.isawaitable(maybe_awaitable) else maybe_awaitable
             else:
                 conn = await 
sync_to_async(secrets_backend.get_connection)(conn_id)  # type: 
ignore[assignment]
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py 
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 206bf3b6878..0b02d215f27 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -36,14 +36,7 @@ from contextlib import contextmanager, suppress
 from datetime import datetime, timezone
 from http import HTTPStatus
 from socket import socket, socketpair
-from typing import (
-    TYPE_CHECKING,
-    BinaryIO,
-    ClassVar,
-    NoReturn,
-    TextIO,
-    cast,
-)
+from typing import TYPE_CHECKING, BinaryIO, ClassVar, NoReturn, TextIO, cast
 from urllib.parse import urlparse
 from uuid import UUID
 
@@ -898,17 +891,29 @@ def _remote_logging_conn(client: Client):
     # Fetch connection details on-demand without caching the entire API client 
instance
     conn = _fetch_remote_logging_conn(conn_id, client)
 
-    if conn:
-        key = f"AIRFLOW_CONN_{conn_id.upper()}"
-        old = os.getenv(key)
-        os.environ[key] = conn.get_uri()
+    if not conn:
         try:
             yield
         finally:
-            if old is None:
-                del os.environ[key]
-            else:
-                os.environ[key] = old
+            # Ensure we don't leak the caller's client when no connection was 
fetched.
+            del conn
+            del client
+        return
+
+    key = f"AIRFLOW_CONN_{conn_id.upper()}"
+    old = os.getenv(key)
+    os.environ[key] = conn.get_uri()
+    try:
+        yield
+    finally:
+        if old is None:
+            del os.environ[key]
+        else:
+            os.environ[key] = old
+
+        # Explicitly drop local references so the caller's client can be 
garbage collected.
+        del conn
+        del client
 
 
 @attrs.define(kw_only=True)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py 
b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
index b19ab3b1003..8f9745b0ffe 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py
@@ -144,9 +144,29 @@ class TestExecutionAPISecretsBackend:
 
         # Mock the greenback and asyncio modules that are imported inside the 
exception handler
         mocker.patch("greenback.has_portal", return_value=True)
-        mock_greenback_await = mocker.patch("greenback.await_", 
return_value=expected_conn)
         mocker.patch("asyncio.current_task")
 
+        # Mock greenback.await_ to actually await the coroutine it receives.
+        # This prevents Python 3.13 RuntimeWarning about unawaited coroutines.
+        import asyncio
+
+        def greenback_await_side_effect(coro):
+            loop = asyncio.new_event_loop()
+            try:
+                return loop.run_until_complete(coro)
+            finally:
+                loop.close()
+
+        mock_greenback_await = mocker.patch("greenback.await_", 
side_effect=greenback_await_side_effect)
+
+        # Mock aget_connection to return the expected connection directly.
+        # We need to mock this because the real aget_connection would try to
+        # use SUPERVISOR_COMMS.asend which is not set up for this test.
+        async def mock_aget_connection(self, conn_id):
+            return expected_conn
+
+        mocker.patch.object(ExecutionAPISecretsBackend, "aget_connection", 
mock_aget_connection)
+
         backend = ExecutionAPISecretsBackend()
         conn = backend.get_connection("databricks_default")
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index ba3c5166931..c7b54666a1e 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -225,8 +225,35 @@ class TestWatchedSubprocess:
     def disable_log_upload(self, spy_agency):
         spy_agency.spy_on(ActivitySubprocess._upload_logs, call_original=False)
 
-    # TODO: Investigate and fix it after 3.1.0
-    @pytest.mark.xfail(reason="Fails on Py 3.12 with multi-threading error 
only in tests.")
+    @pytest.fixture(autouse=True)
+    def use_real_secrets_backends(self, monkeypatch):
+        """
+        Ensure that real secrets backend instances are used instead of mocks.
+
+        This prevents Python 3.13 RuntimeWarning when hasattr checks async 
methods
+        on mocked backends. The warning occurs because hasattr on AsyncMock 
creates
+        unawaited coroutines.
+
+        This fixture ensures test isolation when running in parallel with 
pytest-xdist,
+        regardless of what other tests patch.
+        """
+        import importlib
+
+        import airflow.sdk.execution_time.secrets.execution_api as 
execution_api_module
+        from airflow.secrets.environment_variables import 
EnvironmentVariablesBackend
+
+        fresh_execution_backend = 
importlib.reload(execution_api_module).ExecutionAPISecretsBackend
+
+        # Ensure downstream imports see the restored class instead of any 
AsyncMock left by other tests
+        import airflow.sdk.execution_time.secrets as secrets_package
+
+        monkeypatch.setattr(secrets_package, "ExecutionAPISecretsBackend", 
fresh_execution_backend)
+
+        monkeypatch.setattr(
+            
"airflow.sdk.execution_time.supervisor.ensure_secrets_backend_loaded",
+            lambda: [EnvironmentVariablesBackend(), fresh_execution_backend()],
+        )
+
     def test_reading_from_pipes(self, captured_logs, time_machine, 
client_with_ti_start):
         def subprocess_main():
             # This is run in the subprocess!

Reply via email to