[ https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406175#comment-16406175 ]
ASF GitHub Bot commented on FLINK-8703: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175731427 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2<String, Integer>... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient<?> client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { // try and get a job result, this will fail if the job already failed. Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture<JobStatus> jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future<Object> future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); - - Object result = Await.result(future, timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); --- End diff -- For what do we need this sleep? > Migrate tests from LocalFlinkMiniCluster to MiniClusterResource > --------------------------------------------------------------- > > Key: FLINK-8703 > URL: https://issues.apache.org/jira/browse/FLINK-8703 > Project: Flink > Issue Type: Sub-task > Components: Tests > Reporter: Aljoscha Krettek > Assignee: Chesnay Schepler > Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)