This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8b925b31106 Add bounded best-effort cluster deletion when
PermissionDenied occurs after cluster creation has been initiated in
non-deferrable mode. Deletion is triggered with wait_to_complete=False and
retried on FailedPrecondition until cleanup_timeout_seconds is reached, and the
original exception is always re-raised. Add unit tests covering cleanup
initiation, retry behavior, and error propagation. (#62302)
8b925b31106 is described below
commit 8b925b31106a94792dd9fd7a587070141ac9e8a5
Author: SameerMesiah97 <[email protected]>
AuthorDate: Tue Mar 3 20:02:45 2026 +0000
Add bounded best-effort cluster deletion when PermissionDenied occurs after
cluster creation has been initiated in non-deferrable mode. Deletion is
triggered with wait_to_complete=False and retried on FailedPrecondition until
cleanup_timeout_seconds is reached, and the original exception is always
re-raised. Add unit tests covering cleanup initiation, retry behavior, and
error propagation. (#62302)
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../google/cloud/operators/kubernetes_engine.py | 92 +++++++++++++-
.../cloud/operators/test_kubernetes_engine.py | 136 ++++++++++++++++++++-
2 files changed, 226 insertions(+), 2 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
index f66b142cd51..a49849a7084 100644
---
a/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
+++
b/providers/google/src/airflow/providers/google/cloud/operators/kubernetes_engine.py
@@ -19,12 +19,13 @@
from __future__ import annotations
+import time
import warnings
from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, FailedPrecondition,
PermissionDenied
from kubernetes.client import V1JobList, models as k8s
from packaging.version import parse as parse_version
@@ -354,6 +355,13 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
:param api_version: The api version to use
:param deferrable: Run operator in the deferrable mode.
:param poll_interval: Interval size which defines how often operation
status is checked.
+ :param delete_cluster_on_failure: If True, attempt best-effort deletion of the
+ cluster when a PermissionDenied error occurs after creation has started.
+ Cleanup failures are logged and do not mask the original exception.
+ Default is True.
+ :param cleanup_timeout_seconds: Maximum number of seconds to keep retrying
+ best-effort cluster deletion when cleanup is triggered. Deletion retries
+ stop once this timeout is reached. Default is 600 seconds.
"""
template_fields: Sequence[str] = tuple(
@@ -373,6 +381,8 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
api_version: str = "v2",
deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
poll_interval: int = 10,
+ delete_cluster_on_failure: bool = True,
+ cleanup_timeout_seconds: int = 600,
*args,
**kwargs,
) -> None:
@@ -387,6 +397,8 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.api_version = api_version
self.poll_interval = poll_interval
self.deferrable = deferrable
+ self.delete_cluster_on_failure = delete_cluster_on_failure
+ self.cleanup_timeout_seconds = cleanup_timeout_seconds
self._validate_input()
super().__init__(*args, **kwargs)
@@ -452,6 +464,72 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
stacklevel=2,
)
+ def _attempt_cleanup_with_retry(self) -> None:
+ """
+ Attempt bounded best-effort deletion of the cluster.
+
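+ This method is only invoked during task failure handling.
+ It does not wait for deletion to complete, but it may sleep and retry
+ on FailedPrecondition until cleanup_timeout_seconds is exceeded.
+ FailedPrecondition and PermissionDenied raised by the delete call are
+ logged rather than re-raised, so they do not mask the original exception.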
+ """
+ # Fixed retry interval for semantic retry (cluster still processing
+ # a previous operation). We intentionally avoid using SDK Retry here
+ # to keep behavior explicit and bounded.
+ RETRY_INTERVAL_SECONDS = 60
+
+ # Bound cleanup attempts to avoid indefinitely occupying a worker slot.
+ deadline = time.monotonic() + self.cleanup_timeout_seconds
+ attempt = 1
+
+ while True:
+ try:
+ self.log.info(
+ "Attempt %s: Deleting GKE cluster %s.",
+ attempt,
+ self.cluster_name,
+ )
+
+ # Do not wait for deletion to complete; cleanup is best-effort
+ # and should not delay failure propagation.
+ self.cluster_hook.delete_cluster(
+ name=self.cluster_name,
+ project_id=self.project_id,
+ wait_to_complete=False,
+ )
+
+ self.log.info(
+ "Successfully initiated deletion of GKE cluster %s.",
+ self.cluster_name,
+ )
+ return
+
+ except FailedPrecondition:
+ # Cluster likely still has an active operation (e.g. creation
+ # still in progress). Retry until bounded deadline.
+ if time.monotonic() >= deadline:
+ self.log.exception(
+ "Timed out after %s seconds while trying to delete GKE
cluster %s.",
+ self.cleanup_timeout_seconds,
+ self.cluster_name,
+ )
+ return
+
+ self.log.warning(
+ "Cluster %s still has active operation. Retrying deletion
in %s seconds.",
+ self.cluster_name,
+ RETRY_INTERVAL_SECONDS,
+ )
+ time.sleep(RETRY_INTERVAL_SECONDS)
+ attempt += 1
+ continue
+
+ except PermissionDenied:
+ self.log.exception(
+ "Permission denied while attempting to delete GKE cluster
%s.",
+ self.cluster_name,
+ )
+ return
+
@property
def extra_links_params(self) -> dict[str, Any]:
return {
@@ -472,6 +550,18 @@ class GKECreateClusterOperator(GKEOperatorMixin,
GoogleCloudBaseOperator):
self.log.info("Assuming Success: %s", error.message)
return self.cluster_hook.get_cluster(name=self.cluster_name,
project_id=self.project_id).self_link
+ except PermissionDenied:
+ # Handle cleanup for non-deferrable mode.
+ if not self.deferrable:
+ self.log.warning(
+ "Execution failed after GKE cluster %s was started by this
task instance.",
+ self.cluster_name,
+ )
+
+ if self.delete_cluster_on_failure:
+ self._attempt_cleanup_with_retry()
+ raise
+
if self.deferrable:
self.defer(
trigger=GKEOperationTrigger(
diff --git
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
index 442f158a6e2..654ddd8b45b 100644
---
a/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
+++
b/providers/google/tests/unit/google/cloud/operators/test_kubernetes_engine.py
@@ -22,7 +22,7 @@ from unittest import mock
from unittest.mock import PropertyMock, call
import pytest
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, FailedPrecondition,
PermissionDenied
from google.cloud.container_v1.types import Cluster, NodePool
from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -522,6 +522,140 @@ class TestGKECreateClusterOperator:
mock_log.info.assert_called_once_with("Assuming Success: %s",
expected_error_message)
assert result == TEST_SELF_LINK
+ @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
+ def test_execute_cleanup_on_permission_denied(
+ self,
+ mock_link,
+ mock_cluster_hook,
+ ):
+
+ # Grab the mocked hook methods; create_cluster is made to raise PermissionDenied below.
+ mock_create_cluster = mock_cluster_hook.return_value.create_cluster
+
+ mock_delete_cluster = mock_cluster_hook.return_value.delete_cluster
+
+ permission_error = PermissionDenied("Missing container.operations.get")
+ mock_create_cluster.side_effect = permission_error
+
+ operator = GKECreateClusterOperator(
+ task_id=TEST_TASK_ID,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ body=GKE_CLUSTER_CREATE_BODY_DICT,
+ gcp_conn_id=TEST_CONN_ID,
+ impersonation_chain=TEST_IMPERSONATION_CHAIN,
+ deferrable=False,
+ delete_cluster_on_failure=True,
+ )
+
+ with pytest.raises(PermissionDenied):
+ operator.execute({})
+
+ # Cluster creation attempted.
+ mock_create_cluster.assert_called_once_with(
+ cluster=GKE_CLUSTER_CREATE_BODY_DICT,
+ project_id=TEST_PROJECT_ID,
+ wait_to_complete=True,
+ )
+
+ # Cleanup attempted.
+ mock_delete_cluster.assert_called_once_with(
+ name=GKE_CLUSTER_NAME,
+ project_id=TEST_PROJECT_ID,
+ wait_to_complete=False,
+ )
+
+ @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
+ def test_execute_cleanup_failure_does_not_mask_original_error(
+ self,
+ mock_link,
+ mock_cluster_hook,
+ ):
+
+ # Grab the mocked hook methods; both are configured to raise PermissionDenied below.
+ mock_create_cluster = mock_cluster_hook.return_value.create_cluster
+
+ mock_delete_cluster = mock_cluster_hook.return_value.delete_cluster
+
+ # Simulate wait failure due to missing operations.get, and cleanup failure due to missing clusters.delete.
+ permission_error = PermissionDenied("Missing container.operations.get")
+ cleanup_error = PermissionDenied("Missing container.clusters.delete")
+
+ mock_create_cluster.side_effect = permission_error
+ mock_delete_cluster.side_effect = cleanup_error
+
+ operator = GKECreateClusterOperator(
+ task_id=TEST_TASK_ID,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ body=GKE_CLUSTER_CREATE_BODY_DICT,
+ gcp_conn_id=TEST_CONN_ID,
+ impersonation_chain=TEST_IMPERSONATION_CHAIN,
+ deferrable=False,
+ delete_cluster_on_failure=True,
+ )
+
+ with pytest.raises(PermissionDenied) as exc:
+ operator.execute({})
+
+ # Original exception preserved.
+ assert exc.value is permission_error
+
+ # Creation attempted.
+ mock_create_cluster.assert_called_once()
+
+ # Cleanup attempted despite failure.
+ mock_delete_cluster.assert_called_once_with(
+ name=GKE_CLUSTER_NAME,
+ project_id=TEST_PROJECT_ID,
+ wait_to_complete=False,
+ )
+
+ @mock.patch(GKE_OPERATORS_PATH.format("time.sleep"), return_value=None)
+ @mock.patch(GKE_OPERATORS_PATH.format("GKEHook"))
+ @mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
+ def test_execute_cleanup_retries_on_active_operation(self, mock_link,
mock_cluster_hook, mock_sleep):
+
+ # Grab the mocked hook methods; create_cluster is made to raise PermissionDenied below.
+ mock_create_cluster = mock_cluster_hook.return_value.create_cluster
+ mock_delete_cluster = mock_cluster_hook.return_value.delete_cluster
+
+ # Simulate wait failure due to missing operations.get.
+ permission_error = PermissionDenied("Missing container.operations.get")
+ mock_create_cluster.side_effect = permission_error
+
+ # First delete attempt hits an active operation and raises FailedPrecondition.
+ active_op_error = FailedPrecondition(
+ message="Cluster is running incompatible operation.",
+ errors={
+ "reason": "CLUSTER_ALREADY_HAS_OPERATION",
+ },
+ )
+
+ # Second delete attempt is successful.
+ mock_delete_cluster.side_effect = [
+ active_op_error,
+ None,
+ ]
+
+ operator = GKECreateClusterOperator(
+ task_id=TEST_TASK_ID,
+ project_id=TEST_PROJECT_ID,
+ location=TEST_LOCATION,
+ body=GKE_CLUSTER_CREATE_BODY_DICT,
+ delete_cluster_on_failure=True,
+ cleanup_timeout_seconds=120,
+ )
+
+ # Simulate PermissionDenied during execution.
+ with pytest.raises(PermissionDenied):
+ operator.execute({})
+
+ # Should retry once.
+ assert mock_delete_cluster.call_count == 2
+
@mock.patch(GKE_OPERATORS_PATH.format("GKEOperationTrigger"))
@mock.patch(GKE_OPERATORS_PATH.format("KubernetesEngineClusterLink"))
@mock.patch(GKE_OPERATORS_PATH.format("GKECreateClusterOperator.defer"))