This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b925b31106 Add bounded best-effort cluster deletion when 
PermissionDenied occurs after cluster creation has been initiated in 
non-deferrable mode. Deletion is triggered with wait_to_complete=False and 
retried on FailedPrecondition until cleanup_timeout_seconds is reached, and the 
original exception is always re-raised. Add unit tests covering cleanup 
initiation, retry behavior, and error propagation. (#62302)
8b925b31106 is described below

commit 8b925b31106a94792dd9fd7a587070141ac9e8a5
Author: SameerMesiah97 <[email protected]>
AuthorDate: Tue Mar 3 20:02:45 2026 +0000

    Add bounded best-effort cluster deletion when PermissionDenied occurs after 
cluster creation has been initiated in non-deferrable mode. Deletion is 
triggered with wait_to_complete=False and retried on FailedPrecondition until 
cleanup_timeout_seconds is reached, and the original exception is always 
re-raised. Add unit tests covering cleanup initiation, retry behavior, and 
error propagation. (#62302)
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../google/cloud/operators/kubernetes_engine.py    |  92 +++++++++++++-
 .../cloud/operators/test_kubernetes_engine.py      | 136 ++++++++++++++++++++-
 2 files changed, 226 insertions(+), 2 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index f66b142cd51..a49849a7084 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -19,12 +19,13 @@
 
 from __future__ import annotations
 
+import time
 import warnings
 from collections.abc import Sequence
 from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, FailedPrecondition, 
PermissionDenied
 from kubernetes.client import V1JobList, models as k8s
 from packaging.version import parse as parse_version
 
@@ -354,6 +355,13 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
     :param api_version: The api version to use
     :param deferrable: Run operator in the deferrable mode.
     :param poll_interval: Interval size which defines how often operation 
status is checked.
+    :param delete_cluster_on_failure: If True, attempt best-effort deletion of 
the
+        cluster when a PermissionDenied error occurs after creation has 
started.
+        Cleanup failures are logged and do not mask the original exception.
+        Default is True.
+    :param cleanup_timeout_seconds: Maximum number of seconds to keep retrying
+        best-effort cluster deletion when cleanup is triggered. Deletion 
retries
+        stop once this timeout is reached. Default is 600 seconds.
     """
 
     template_fields: Sequence[str] = tuple(
@@ -373,6 +381,8 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         api_version: str = "v2",
         deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
         poll_interval: int = 10,
+        delete_cluster_on_failure: bool = True,
+        cleanup_timeout_seconds: int = 600,
         *args,
         **kwargs,
     ) -> None:
@@ -387,6 +397,8 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
         self.api_version = api_version
         self.poll_interval = poll_interval
         self.deferrable = deferrable
+        self.delete_cluster_on_failure = delete_cluster_on_failure
+        self.cleanup_timeout_seconds = cleanup_timeout_seconds
         self._validate_input()
         super().__init__(*args, **kwargs)
 
@@ -452,6 +464,72 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
                     stacklevel=2,
                 )
 
+    def _attempt_cleanup_with_retry(self) -> None:
+        """
+        Attempt bounded best-effort deletion of the cluster.
+
+        This method is only invoked during task failure handling. Deletion is
+        initiated with ``wait_to_complete=False`` and retried while the API
+        answers ``FailedPrecondition`` (e.g. the creation operation is still
+        running), until ``cleanup_timeout_seconds`` have elapsed. Every error
+        raised while cleaning up is logged and swallowed, so this method never
+        masks the original exception that the caller re-raises.
+        """
+        # Fixed retry interval for semantic retry (cluster still processing
+        # a previous operation). We intentionally avoid using SDK Retry here
+        # to keep behavior explicit and bounded.
+        RETRY_INTERVAL_SECONDS = 60
+
+        # Bound cleanup attempts to avoid indefinitely occupying a worker slot.
+        deadline = time.monotonic() + self.cleanup_timeout_seconds
+        attempt = 1
+
+        while True:
+            try:
+                self.log.info(
+                    "Attempt %s: Deleting GKE cluster %s.",
+                    attempt,
+                    self.cluster_name,
+                )
+
+                # Do not wait for deletion to complete; cleanup is best-effort
+                # and should not delay failure propagation.
+                self.cluster_hook.delete_cluster(
+                    name=self.cluster_name,
+                    project_id=self.project_id,
+                    wait_to_complete=False,
+                )
+
+                self.log.info(
+                    "Successfully initiated deletion of GKE cluster %s.",
+                    self.cluster_name,
+                )
+                return
+
+            except FailedPrecondition:
+                # Cluster likely still has an active operation (e.g. creation
+                # still in progress). Retry until bounded deadline.
+                remaining = deadline - time.monotonic()
+                if remaining <= 0:
+                    self.log.exception(
+                        "Timed out after %s seconds while trying to delete GKE cluster %s.",
+                        self.cleanup_timeout_seconds,
+                        self.cluster_name,
+                    )
+                    return
+
+                # Never sleep past the deadline, so total cleanup time stays
+                # bounded by cleanup_timeout_seconds even when it is not a
+                # multiple of the retry interval.
+                sleep_seconds = min(RETRY_INTERVAL_SECONDS, remaining)
+                self.log.warning(
+                    "Cluster %s still has active operation. Retrying deletion in %s seconds.",
+                    self.cluster_name,
+                    round(sleep_seconds, 1),
+                )
+                time.sleep(sleep_seconds)
+                attempt += 1
+
+            except PermissionDenied:
+                self.log.exception(
+                    "Permission denied while attempting to delete GKE cluster %s.",
+                    self.cluster_name,
+                )
+                return
+
+            except Exception:
+                # Any other cleanup failure (API or transport error) must not
+                # propagate: raising here would replace the original exception
+                # that the caller is about to re-raise.
+                self.log.exception(
+                    "Unexpected error while attempting to delete GKE cluster %s.",
+                    self.cluster_name,
+                )
+                return
     @property
     def extra_links_params(self) -> dict[str, Any]:
         return {
@@ -472,6 +550,18 @@ class GKECreateClusterOperator(GKEOperatorMixin, 
GoogleCloudBaseOperator):
             self.log.info("Assuming Success: %s", error.message)
             return self.cluster_hook.get_cluster(name=self.cluster_name, 
project_id=self.project_id).self_link
 
+        except PermissionDenied:
+            # Handle cleanup for non-deferrable mode.
+            if not self.deferrable:
+                self.log.warning(
+                    "Execution failed after GKE cluster %s was started by this 
task instance.",
+                    self.cluster_name,
+                )
+
+                if self.delete_cluster_on_failure:
+                    self._attempt_cleanup_with_retry()
+            raise
+
         if self.deferrable:
             self.defer(
                 trigger=GKEOperationTrigger(
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py 
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 442f158a6e2..654ddd8b45b 100644
--- 
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++ 
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -22,7 +22,7 @@ from unittest import mock
 from unittest.mock import PropertyMock, call
 
 import pytest
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, FailedPrecondition, 
PermissionDenied
 from google.cloud.container_v1.types import Cluster, NodePool
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -522,6 +522,140 @@ class TestGKECreateClusterOperator:
         mock_log.info.assert_called_once_with("Assuming Success: %s", 
expected_error_message)
         assert result == TEST_SELF_LINK
 
+    @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
+    def test_execute_cleanup_on_permission_denied(
+        self,
+        mock_link,
+        mock_cluster_hook,
+    ):
+        mock_create_cluster = mock_cluster_hook.return_value.create_cluster
+        mock_delete_cluster = mock_cluster_hook.return_value.delete_cluster
+
+        # Simulate create_cluster failing with PermissionDenied (missing
+        # container.operations.get) after creation was requested.
+        permission_error = PermissionDenied("Missing container.operations.get")
+        mock_create_cluster.side_effect = permission_error
+
+        operator = GKECreateClusterOperator(
+            task_id=TEST_TASK_ID,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            body=GKE_CLUSTER_CREATE_BODY_DICT,
+            gcp_conn_id=TEST_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+            deferrable=False,
+            delete_cluster_on_failure=True,
+        )
+
+        with pytest.raises(PermissionDenied) as exc:
+            operator.execute({})
+
+        # Original exception re-raised unchanged.
+        assert exc.value is permission_error
+
+        # Cluster creation attempted.
+        mock_create_cluster.assert_called_once_with(
+            cluster=GKE_CLUSTER_CREATE_BODY_DICT,
+            project_id=TEST_PROJECT_ID,
+            wait_to_complete=True,
+        )
+
+        # Cleanup attempted without waiting for deletion to finish.
+        mock_delete_cluster.assert_called_once_with(
+            name=GKE_CLUSTER_NAME,
+            project_id=TEST_PROJECT_ID,
+            wait_to_complete=False,
+        )
+
+    @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
+    def test_execute_cleanup_failure_does_not_mask_original_error(
+        self,
+        mock_link,
+        mock_cluster_hook,
+    ):
+        mock_create_cluster = mock_cluster_hook.return_value.create_cluster
+        mock_delete_cluster = mock_cluster_hook.return_value.delete_cluster
+
+        # Creation fails because container.operations.get is missing, and the
+        # cleanup attempt then fails because container.clusters.delete is missing.
+        permission_error = PermissionDenied("Missing container.operations.get")
+        cleanup_error = PermissionDenied("Missing container.clusters.delete")
+
+        mock_create_cluster.side_effect = permission_error
+        mock_delete_cluster.side_effect = cleanup_error
+
+        operator = GKECreateClusterOperator(
+            task_id=TEST_TASK_ID,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            body=GKE_CLUSTER_CREATE_BODY_DICT,
+            gcp_conn_id=TEST_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+            deferrable=False,
+            delete_cluster_on_failure=True,
+        )
+
+        with pytest.raises(PermissionDenied) as exc:
+            operator.execute({})
+
+        # Original exception preserved, not the cleanup error.
+        assert exc.value is permission_error
+
+        # Creation attempted.
+        mock_create_cluster.assert_called_once()
+
+        # Cleanup attempted once despite failing; PermissionDenied is not retried.
+        mock_delete_cluster.assert_called_once_with(
+            name=GKE_CLUSTER_NAME,
+            project_id=TEST_PROJECT_ID,
+            wait_to_complete=False,
+        )
+
+    @mock.patch(GKE_OPERATORS_PATH.format("time.sleep"), return_value=None)
+    @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
+    @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
+    def test_execute_cleanup_retries_on_active_operation(
+        self,
+        mock_link,
+        mock_cluster_hook,
+        mock_sleep,
+    ):
+        mock_create_cluster = mock_cluster_hook.return_value.create_cluster
+        mock_delete_cluster = mock_cluster_hook.return_value.delete_cluster
+
+        # Simulate create_cluster failing with PermissionDenied (missing
+        # container.operations.get) after creation was requested.
+        permission_error = PermissionDenied("Missing container.operations.get")
+        mock_create_cluster.side_effect = permission_error
+
+        # First delete attempt hits the still-running creation operation and
+        # fails with FailedPrecondition; the second attempt succeeds.
+        active_op_error = FailedPrecondition(
+            message="Cluster is running incompatible operation.",
+            errors=[{"reason": "CLUSTER_ALREADY_HAS_OPERATION"}],
+        )
+        mock_delete_cluster.side_effect = [active_op_error, None]
+
+        # deferrable=False is explicit so the test does not depend on the
+        # [operators] default_deferrable configuration value.
+        operator = GKECreateClusterOperator(
+            task_id=TEST_TASK_ID,
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
+            body=GKE_CLUSTER_CREATE_BODY_DICT,
+            gcp_conn_id=TEST_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+            deferrable=False,
+            delete_cluster_on_failure=True,
+            cleanup_timeout_seconds=120,
+        )
+
+        with pytest.raises(PermissionDenied) as exc:
+            operator.execute({})
+
+        # Original exception re-raised unchanged.
+        assert exc.value is permission_error
+
+        # Deletion retried exactly once, after a single back-off sleep.
+        assert mock_delete_cluster.call_count == 2
+        mock_sleep.assert_called_once()
+        mock_delete_cluster.assert_called_with(
+            name=GKE_CLUSTER_NAME,
+            project_id=TEST_PROJECT_ID,
+            wait_to_complete=False,
+        )
+
     @mock.patch(GKE_OPERATORS_PATH.format("GKEOperationTrigger"))
     @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
     @mock.patch(GKE_OPERATORS_PATH.format("GKECreateClusterOperator.defer"))

Reply via email to