[ https://issues.apache.org/jira/browse/FLINK-4272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15650983#comment-15650983 ]
ASF GitHub Bot commented on FLINK-4272: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2732#discussion_r87097717 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/JobClientTest.java --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.program; + +import akka.dispatch.Futures; +import org.apache.flink.api.common.JobClient; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.JobListeningContext; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.util.SerializedValue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Promise; + +import java.util.Collections; + + +/** + * Tests the JobClient implementations. 
+ * + * See also: JobRetrievalITCase + */ +public class JobClientTest { + + private static boolean finalizeCalled; + + private JobListeningContext listeningContext; + private JobID jobID; + private JobManagerMessages.JobResultSuccess successMessage; + + private Runnable finalizer = new Runnable() { + @Override + public void run() { + finalizeCalled = true; + } + }; + + private Promise<Object> resultPromise; + + @Before + public void beforeTest() throws Exception { + finalizeCalled = false; + + this.jobID = JobID.generate(); + this.listeningContext = Mockito.mock(JobListeningContext.class); + this.resultPromise = Futures.promise(); + ActorGateway mockActorClientGateway = Mockito.mock(ActorGateway.class); + Mockito.when(listeningContext.getJobID()).thenReturn(jobID); + Mockito.when(listeningContext.getJobClientGateway()).thenReturn(mockActorClientGateway); + Mockito.when(listeningContext.getJobResultFuture()).thenReturn(resultPromise.future()); + Mockito.when(listeningContext.getClassLoader()).thenReturn(JobClientTest.class.getClassLoader()); + + this.successMessage = new JobManagerMessages.JobResultSuccess( + new SerializedJobExecutionResult( + jobID, + 42, + Collections.singletonMap("key", new SerializedValue<Object>("value")))); + } + + @Test(timeout = 10000) --- End diff -- For what do we need the timeout? > Create a JobClient for job control and monitoring > -------------------------------------------------- > > Key: FLINK-4272 > URL: https://issues.apache.org/jira/browse/FLINK-4272 > Project: Flink > Issue Type: New Feature > Components: Client > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Priority: Minor > Fix For: 1.2.0 > > > The aim of this new feature is to expose a client to the user which allows > to cancel a running job, retrieve accumulators for a running job, or perform > other actions in the future. 
Let's call it {{JobClient}} for now (although > this clashes with the existing JobClient class which could be renamed to > JobClientActorUtils instead). > The new client should be returned from the {{ClusterClient}} class upon job > submission. The client should also be instantiatable by the users to retrieve > the JobClient with a JobID. > We should expose the new JobClient to the Java and Scala APIs using a new > method on the {{ExecutionEnvironment}} / {{StreamExecutionEnvironment}} > called {{executeWithControl()}} (perhaps we can find a better name). -- This message was sent by Atlassian JIRA (v6.3.4#6332)