[ https://issues.apache.org/jira/browse/FLINK-8609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364162#comment-16364162 ]
ASF GitHub Bot commented on FLINK-8609: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5432#discussion_r168195242 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java --- @@ -210,51 +225,72 @@ protected void run(String[] args) throws Exception { final ClusterClient<T> client; - if (clusterId != null) { - client = clusterDescriptor.retrieve(clusterId); - } else { - final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); - client = clusterDescriptor.deploySessionCluster(clusterSpecification); - } + // directly deploy the job if the cluster is started in job mode and detached + if (flip6 && clusterId == null && runOptions.getDetachedMode()) { + int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism(); - try { - client.setPrintStatusDuringExecution(runOptions.getStdoutLogging()); - client.setDetached(runOptions.getDetachedMode()); - LOG.debug("Client slots is set to {}", client.getMaxSlots()); - - LOG.debug(runOptions.getSavepointRestoreSettings().toString()); - - int userParallelism = runOptions.getParallelism(); - LOG.debug("User parallelism is set to {}", userParallelism); - if (client.getMaxSlots() != -1 && userParallelism == -1) { - logAndSysout("Using the parallelism provided by the remote cluster (" - + client.getMaxSlots() + "). 
" - + "To use another parallelism, set it at the ./bin/flink client."); - userParallelism = client.getMaxSlots(); - } else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) { - userParallelism = defaultParallelism; - } + final JobGraph jobGraph = createJobGraph(configuration, program, parallelism); - executeProgram(program, client, userParallelism); - } finally { - if (clusterId == null && !client.isDetached()) { - // terminate the cluster only if we have started it before and if it's not detached - try { - clusterDescriptor.terminateCluster(client.getClusterId()); - } catch (FlinkException e) { - LOG.info("Could not properly terminate the Flink cluster.", e); - } - } + final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); + client = clusterDescriptor.deployJobCluster( + clusterSpecification, + jobGraph, + runOptions.getDetachedMode()); + + logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID()); try { client.shutdown(); } catch (Exception e) { LOG.info("Could not properly shut down the client.", e); } + } else { + if (clusterId != null) { + client = clusterDescriptor.retrieve(clusterId); + } else { + // also in job mode we have to deploy a session cluster because the job + // might consist of multiple parts (e.g. when using collect) + final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); + client = clusterDescriptor.deploySessionCluster(clusterSpecification); + } + + try { + client.setPrintStatusDuringExecution(runOptions.getStdoutLogging()); + client.setDetached(runOptions.getDetachedMode()); + LOG.debug("Client slots is set to {}", client.getMaxSlots()); + + LOG.debug(runOptions.getSavepointRestoreSettings().toString()); --- End diff -- True, will improve it. 
> Add support to deploy detached job mode clusters
> ------------------------------------------------
>
>                 Key: FLINK-8609
>                 URL: https://issues.apache.org/jira/browse/FLINK-8609
>             Project: Flink
>          Issue Type: New Feature
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> After adding FLINK-8608, we can add support to the {{CliFrontend}} to deploy
> detached job mode clusters.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)