SameerMesiah97 commented on code in PR #61951:
URL: https://github.com/apache/airflow/pull/61951#discussion_r2813106061
##########
providers/google/src/airflow/providers/google/cloud/operators/dataproc.py:
##########
@@ -820,37 +853,43 @@ def execute(self, context: Context) -> dict:
)
try:
- # First try to create a new cluster
operation = self._create_cluster(hook)
- if not self.deferrable and type(operation) is not str:
- cluster = hook.wait_for_operation(
- timeout=self.timeout, result_retry=self.retry,
operation=operation
+
+ if not self.deferrable and not isinstance(operation, str):
+ hook.wait_for_operation(
+ timeout=self.timeout,
+ result_retry=self.retry,
+ operation=operation,
)
- self.log.info("Cluster created.")
- return Cluster.to_dict(cluster)
- cluster = hook.get_cluster(
- project_id=self.project_id, region=self.region,
cluster_name=self.cluster_name
- )
- if cluster.status.state == cluster.status.State.RUNNING:
- self.log.info("Cluster created.")
- return Cluster.to_dict(cluster)
- self.defer(
- trigger=DataprocClusterTrigger(
- cluster_name=self.cluster_name,
- project_id=self.project_id,
- region=self.region,
- gcp_conn_id=self.gcp_conn_id,
- impersonation_chain=self.impersonation_chain,
- polling_interval_seconds=self.polling_interval_seconds,
- delete_on_error=self.delete_on_error,
- ),
- method_name="execute_complete",
- )
+
+ # Fetch current state.
+ try:
+ cluster = self._get_cluster(hook)
+ except NotFound:
+ raise AirflowException(f"Cluster {self.cluster_name} could not
be found.")
Review Comment:
Agreed. Looks like it will get just get caught by the exception handling
block on lines 913-925. Perhaps it is best to just let `NotFound` propagate. It
is informative enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]