[ https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650995#comment-15650995 ]
ASF GitHub Bot commented on FLINK-4272: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r87052872 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/JobClientEager.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.program; + +import org.apache.flink.api.common.JobClient; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClientActorUtils; +import org.apache.flink.runtime.client.JobClientActor; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; +import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; +import org.apache.flink.util.SerializedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * A client to interact with a running Flink job. + */ +public class JobClientEager implements JobClient { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** The Job's listening context for monitoring and job interaction */ + private final JobListeningContext jobListeningContext; + + /** Finalization code to run upon shutting down the JobClient */ + private final List<Runnable> finalizers; + + public JobClientEager(JobListeningContext jobListeningContext) { + this.jobListeningContext = jobListeningContext; + this.finalizers = new LinkedList<>(); + } + + /** + * Blocks until the job finishes and returns the {@link JobExecutionResult} + * @return the result of the job execution + */ + @Override + public JobExecutionResult waitForResult() throws JobExecutionException { + LOG.info("Waiting for results of Job {}", jobListeningContext.getJobID()); + JobExecutionResult result = JobClientActorUtils.awaitJobResult(jobListeningContext); + shutdown(); + return result; + } + + /** + * Gets the job id that this client is bound to + * @return The JobID of this JobClient + */ + public JobID getJobID() { + return jobListeningContext.getJobID(); + } + + @Override + public boolean hasFinished() { + return jobListeningContext.getJobResultFuture().isCompleted(); + } + + /** + * Cancels a job identified by the job id. + * @throws Exception In case an error occurred. + */ + @Override + public void cancel() throws Exception { + final ActorGateway jobClient = jobListeningContext.getJobClientGateway(); + + final Future<Object> response; + try { + response = jobClient.ask( + new JobClientActor.ClientMessage( + new JobManagerMessages.CancelJob(getJobID())), + AkkaUtils.getDefaultTimeoutAsFiniteDuration()); + } catch (final Exception e) { + throw new ProgramInvocationException("Failed to query the job manager gateway.", e); --- End diff -- Why is this a `ProgramInvocationException`? Should be rather something like a `JobClientOperationException`. > Create a JobClient for job control and monitoring > -------------------------------------------------- > > Key: FLINK-4272 > URL: https://issues.apache.org/jira/browse/FLINK-4272 > Project: Flink > Issue Type: New Feature > Components: Client > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Priority: Minor > Fix For: 1.2.0 > > > The aim of this new features is to expose a client to the user which allows > to cancel a running job, retrieve accumulators for a running job, or perform > other actions in the future. Let's call it {{JobClient}} for now (although > this clashes with the existing JobClient class which could be renamed to > JobClientActorUtils instead). > The new client should be returned from the {{ClusterClient}} class upon job > submission. The client should also be instantiatable by the users to retrieve > the JobClient with a JobID. > We should expose the new JobClient to the Java and Scala APIs using a new > method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}} > called {{executeWithControl()}} (perhaps we can find a better name). -- This message was sent by Atlassian JIRA (v6.3.4#6332)