SameerMesiah97 commented on code in PR #61951:
URL: https://github.com/apache/airflow/pull/61951#discussion_r2813106061


##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -820,37 +853,43 @@ def execute(self, context: Context) -> dict:
             )
 
         try:
-            # First try to create a new cluster
             operation = self._create_cluster(hook)
-            if not self.deferrable and type(operation) is not str:
-                cluster = hook.wait_for_operation(
-                    timeout=self.timeout, result_retry=self.retry, 
operation=operation
+
+            if not self.deferrable and not isinstance(operation, str):
+                hook.wait_for_operation(
+                    timeout=self.timeout,
+                    result_retry=self.retry,
+                    operation=operation,
                 )
-                self.log.info("Cluster created.")
-                return Cluster.to_dict(cluster)
-            cluster = hook.get_cluster(
-                project_id=self.project_id, region=self.region, 
cluster_name=self.cluster_name
-            )
-            if cluster.status.state == cluster.status.State.RUNNING:
-                self.log.info("Cluster created.")
-                return Cluster.to_dict(cluster)
-            self.defer(
-                trigger=DataprocClusterTrigger(
-                    cluster_name=self.cluster_name,
-                    project_id=self.project_id,
-                    region=self.region,
-                    gcp_conn_id=self.gcp_conn_id,
-                    impersonation_chain=self.impersonation_chain,
-                    polling_interval_seconds=self.polling_interval_seconds,
-                    delete_on_error=self.delete_on_error,
-                ),
-                method_name="execute_complete",
-            )
+
+            # Fetch current state.
+            try:
+                cluster = self._get_cluster(hook)
+            except NotFound:
+                raise AirflowException(f"Cluster {self.cluster_name} could not 
be found.")

Review Comment:
   Agreed. Looks like it will just get caught by the exception handling 
block on lines 913-925. Perhaps it is best to just let `NotFound` propagate. It 
is informative enough. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to