wangyang0918 commented on a change in pull request #17554: URL: https://github.com/apache/flink/pull/17554#discussion_r752813610
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java ########## @@ -205,17 +211,23 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); } - final ClusterClientProvider<String> clusterClientProvider = - deployClusterInternal( - KubernetesApplicationClusterEntrypoint.class.getName(), - clusterSpecification, - false); + ClusterClientProvider<String> clusterClientProvider; + try { + clusterClientProvider = + deployClusterInternal( + KubernetesApplicationClusterEntrypoint.class.getName(), + clusterSpecification, + false); - try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { - LOG.info( - "Create flink application cluster {} successfully, JobManager Web Interface: {}", - clusterId, - clusterClient.getWebInterfaceURL()); + try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + } catch (Exception e) { Review comment: I am curious whether we could wrap the `try...catch {// clean up resources}` in a separate method. Just like following. WDYT? ``` private <T> ClusterClientProvider<T> safelyDeployCluster( SupplierWithException<ClusterClientProvider<T>, Exception> supplier) throws ClusterDeploymentException { try { return supplier.get(); } catch (Exception e) { try { LOG.warn( "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", clusterId); client.stopAndCleanupCluster(clusterId); } catch (Exception ex) { LOG.warn( "Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e); } throw new ClusterDeploymentException(e); } } ``` ########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClusterDescriptorTest.java ########## @@ -131,16 +131,14 @@ public void testKillCluster() throws Exception { } @Test - public void testDeployApplicationCluster() { + public void testDeployApplicationCluster() throws ClusterDeploymentException { flinkConfig.set( PipelineOptions.JARS, Collections.singletonList("local:///path/of/user.jar")); flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); - try { - descriptor.deployApplicationCluster(clusterSpecification, appConfig); - } catch (Exception ignored) { - } - mockExpectedServiceFromServerSide(loadBalancerSvc); + mockFirstEmptyFollowByExpectedServiceFromServerSide(new Service(), loadBalancerSvc); Review comment: I like this change. Great. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java ########## @@ -256,39 +268,35 @@ private String getWebMonitorAddress(Configuration configuration) throws Exceptio flinkConfig.get(JobManagerOptions.PORT)); } + final KubernetesJobManagerParameters kubernetesJobManagerParameters = + new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = + kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map( + file -> + KubernetesUtils.loadPodFromTemplateFile( + client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + + client.createJobManagerComponent(kubernetesJobManagerSpec); + + return createClusterClientProvider(clusterId); + } + + private void killClusterSilently(Throwable throwable) { try { - final KubernetesJobManagerParameters kubernetesJobManagerParameters = - new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); - - final FlinkPod podTemplate = - kubernetesJobManagerParameters - .getPodTemplateFilePath() - .map( - file -> - KubernetesUtils.loadPodFromTemplateFile( - client, file, Constants.MAIN_CONTAINER_NAME)) - .orElse(new FlinkPod.Builder().build()); - final KubernetesJobManagerSpecification kubernetesJobManagerSpec = - KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( - podTemplate, kubernetesJobManagerParameters); - - client.createJobManagerComponent(kubernetesJobManagerSpec); - - return createClusterClientProvider(clusterId); + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId, + throwable); Review comment: Do we need to log the throwable here? We might have duplicated exception stack since we also throw a new `ClusterDeploymentException`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org