shahar1 commented on code in PR #61951:
URL: https://github.com/apache/airflow/pull/61951#discussion_r2812519912
##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -820,37 +853,43 @@ def execute(self, context: Context) -> dict:
)
try:
- # First try to create a new cluster
operation = self._create_cluster(hook)
- if not self.deferrable and type(operation) is not str:
- cluster = hook.wait_for_operation(
- timeout=self.timeout, result_retry=self.retry,
operation=operation
+
+ if not self.deferrable and not isinstance(operation, str):
+ hook.wait_for_operation(
+ timeout=self.timeout,
+ result_retry=self.retry,
+ operation=operation,
)
- self.log.info("Cluster created.")
- return Cluster.to_dict(cluster)
- cluster = hook.get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
- )
- if cluster.status.state == cluster.status.State.RUNNING:
- self.log.info("Cluster created.")
- return Cluster.to_dict(cluster)
- self.defer(
- trigger=DataprocClusterTrigger(
- cluster_name=self.cluster_name,
- project_id=self.project_id,
- region=self.region,
- gcp_conn_id=self.gcp_conn_id,
- impersonation_chain=self.impersonation_chain,
- polling_interval_seconds=self.polling_interval_seconds,
- delete_on_error=self.delete_on_error,
- ),
- method_name="execute_complete",
- )
+
+ # Fetch current state.
+ try:
+ cluster = self._get_cluster(hook)
+ except NotFound:
+ raise AirflowException(f"Cluster {self.cluster_name} could not
be found.")
Review Comment:
Nested `try..except` blocks are usually discouraged, as they could lead to unintended
behavior. In this case, the raised exception will be caught by the outer handler
(`except AirflowException as outer_airflow_exception`), which will rerun
`cluster = self._get_cluster(hook)`.
You might need to rework the logic here.
##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -870,39 +909,28 @@ def execute(self, context: Context) -> dict:
self._delete_cluster(hook)
self._wait_for_cluster_in_deleting_state(hook)
raise resource_not_ready_error
- except AirflowException as ae:
- # There still could be a cluster created here in an ERROR state
which
- # should be deleted immediately rather than consuming another
retry attempt
- # (assuming delete_on_error is true (default))
- # This reduces overall the number of task attempts from 3 to 2 to
successful cluster creation
- # assuming the underlying GCE issues have resolved within that
window. Users can configure
- # a higher number of retry attempts in powers of two with 30s-60s
wait interval
+
+ except AirflowException as outer_airflow_exception:
+ # A cluster may have been created but entered ERROR state.
+ # If delete_on_error is enabled, delete it immediately so that
+ # the next retry attempt starts from a clean state.
try:
cluster = self._get_cluster(hook)
self._handle_error_state(hook, cluster)
- except AirflowException as ae_inner:
- # We could get any number of failures here, including cluster
not found and we
- # can just ignore to ensure we surface the original cluster
create failure
- self.log.exception(ae_inner)
+ except AirflowException as inner_airflow_exception:
+ # Cleanup logic may raise secondary exceptions (e.g., cluster
not found).
+ # Suppress those so that the original cluster creation failure
is surfaced.
+ self.log.exception(inner_airflow_exception)
finally:
- raise ae
+ raise outer_airflow_exception
- # Check if cluster is not in ERROR state
+ # Check if cluster is not in ERROR state.
self._handle_error_state(hook, cluster)
- if cluster.status.state == cluster.status.State.CREATING:
- # Wait for cluster to be created
- cluster = self._wait_for_cluster_in_creating_state(hook)
- self._handle_error_state(hook, cluster)
- elif cluster.status.state == cluster.status.State.DELETING:
- # Wait for cluster to be deleted
- self._wait_for_cluster_in_deleting_state(hook)
- # Create new cluster
- cluster = self._create_cluster(hook)
- self._handle_error_state(hook, cluster)
- elif cluster.status.state == cluster.status.State.STOPPED:
- # if the cluster exists and already stopped, then start the cluster
- self._start_cluster(hook)
+ # If cluster is not in RUNNING state, reconcile.
+ cluster = self._reconcile_cluster_state(hook, cluster)
+
+ self.log.info("Cluster %s is RUNNING.", self.cluster_name)
return Cluster.to_dict(cluster)
Review Comment:
1. Can there be a situation where a cluster is actually still stopped after the
reconciliation attempt? In that case, the log info might be confusing.
2. Does the `cluster` dictionary get updated after being reconciled from a
stopped state? That might be even more confusing :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]