wangyang0918 commented on a change in pull request #17554:
URL: https://github.com/apache/flink/pull/17554#discussion_r752813610



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -205,17 +211,23 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
             Preconditions.checkArgument(pipelineJars.size() == 1, "Should only 
have one jar");
         }
 
-        final ClusterClientProvider<String> clusterClientProvider =
-                deployClusterInternal(
-                        KubernetesApplicationClusterEntrypoint.class.getName(),
-                        clusterSpecification,
-                        false);
+        ClusterClientProvider<String> clusterClientProvider;
+        try {
+            clusterClientProvider =
+                    deployClusterInternal(
+                            
KubernetesApplicationClusterEntrypoint.class.getName(),
+                            clusterSpecification,
+                            false);
 
-        try (ClusterClient<String> clusterClient = 
clusterClientProvider.getClusterClient()) {
-            LOG.info(
-                    "Create flink application cluster {} successfully, 
JobManager Web Interface: {}",
-                    clusterId,
-                    clusterClient.getWebInterfaceURL());
+            try (ClusterClient<String> clusterClient = 
clusterClientProvider.getClusterClient()) {
+                LOG.info(
+                        "Create flink application cluster {} successfully, 
JobManager Web Interface: {}",
+                        clusterId,
+                        clusterClient.getWebInterfaceURL());
+            }
+        } catch (Exception e) {

Review comment:
       I am curious whether we could wrap the `try...catch {// clean up 
resources}` in a separate method. Just like following. WDYT?
   
   ```
       private <T> ClusterClientProvider<T> safelyDeployCluster(
               SupplierWithException<ClusterClientProvider<T>, Exception> 
supplier)
               throws ClusterDeploymentException {
           try {
               return supplier.get();
           } catch (Exception e) {
               try {
                   LOG.warn(
                           "Failed to create the Kubernetes cluster \"{}\", try 
to clean up the residual resources.",
                           clusterId);
                   client.stopAndCleanupCluster(clusterId);
               } catch (Exception ex) {
                   LOG.warn(
                           "Failed to stop and clean up the Kubernetes cluster 
\"{}\".", clusterId, e);
               }
               throw new ClusterDeploymentException(e);
           }
       }
   ```

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java
##########
@@ -131,16 +131,14 @@ public void testKillCluster() throws Exception {
     }
 
     @Test
-    public void testDeployApplicationCluster() {
+    public void testDeployApplicationCluster() throws 
ClusterDeploymentException {
         flinkConfig.set(
                 PipelineOptions.JARS, 
Collections.singletonList("local:///path/of/user.jar"));
         flinkConfig.set(DeploymentOptions.TARGET, 
KubernetesDeploymentTarget.APPLICATION.getName());
-        try {
-            descriptor.deployApplicationCluster(clusterSpecification, 
appConfig);
-        } catch (Exception ignored) {
-        }
 
-        mockExpectedServiceFromServerSide(loadBalancerSvc);
+        mockFirstEmptyFollowByExpectedServiceFromServerSide(new Service(), 
loadBalancerSvc);

Review comment:
       I like this change. Great.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
##########
@@ -256,39 +268,35 @@ private String getWebMonitorAddress(Configuration 
configuration) throws Exceptio
                     flinkConfig.get(JobManagerOptions.PORT));
         }
 
+        final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
+
+        final FlinkPod podTemplate =
+                kubernetesJobManagerParameters
+                        .getPodTemplateFilePath()
+                        .map(
+                                file ->
+                                        
KubernetesUtils.loadPodFromTemplateFile(
+                                                client, file, 
Constants.MAIN_CONTAINER_NAME))
+                        .orElse(new FlinkPod.Builder().build());
+        final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, kubernetesJobManagerParameters);
+
+        client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+        return createClusterClientProvider(clusterId);
+    }
+
+    private void killClusterSilently(Throwable throwable) {
         try {
-            final KubernetesJobManagerParameters 
kubernetesJobManagerParameters =
-                    new KubernetesJobManagerParameters(flinkConfig, 
clusterSpecification);
-
-            final FlinkPod podTemplate =
-                    kubernetesJobManagerParameters
-                            .getPodTemplateFilePath()
-                            .map(
-                                    file ->
-                                            
KubernetesUtils.loadPodFromTemplateFile(
-                                                    client, file, 
Constants.MAIN_CONTAINER_NAME))
-                            .orElse(new FlinkPod.Builder().build());
-            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
-                    
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-                            podTemplate, kubernetesJobManagerParameters);
-
-            client.createJobManagerComponent(kubernetesJobManagerSpec);
-
-            return createClusterClientProvider(clusterId);
+            LOG.warn(
+                    "Failed to create the Kubernetes cluster \"{}\", try to 
clean up the residual resources.",
+                    clusterId,
+                    throwable);

Review comment:
       Do we need to log the throwable here? We might have duplicated exception 
stack since we also throw a new `ClusterDeploymentException`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to