Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r164074804 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -258,89 +312,96 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto final CompletableFuture<SavepointTriggerResponseBody> responseFuture; - try { - responseFuture = restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - savepointTriggerHeaders, - savepointTriggerMessageParameters, - new SavepointTriggerRequestBody(savepointDirectory)); - } catch (IOException e) { - throw new FlinkException("Could not send trigger savepoint request to Flink cluster.", e); - } + responseFuture = sendRequest( + savepointTriggerHeaders, + savepointTriggerMessageParameters, + new SavepointTriggerRequestBody(savepointDirectory)); - return responseFuture.thenApply(savepointTriggerResponseBody -> { + return responseFuture.thenCompose(savepointTriggerResponseBody -> { final SavepointTriggerId savepointTriggerId = savepointTriggerResponseBody.getSavepointTriggerId(); - final SavepointInfo savepointInfo; - try { - savepointInfo = waitForSavepointCompletion(jobId, savepointTriggerId); - } catch (Exception e) { - throw new CompletionException(e); - } + return waitForSavepointCompletion(jobId, savepointTriggerId); + }).thenApply(savepointInfo -> { if (savepointInfo.getFailureCause() != null) { throw new CompletionException(savepointInfo.getFailureCause()); } return savepointInfo.getLocation(); }); } - private SavepointInfo waitForSavepointCompletion( + private CompletableFuture<SavepointInfo> waitForSavepointCompletion( final JobID jobId, - final SavepointTriggerId savepointTriggerId) throws Exception { + final SavepointTriggerId savepointTriggerId) { return waitForResource(() -> { final SavepointStatusHeaders savepointStatusHeaders = 
SavepointStatusHeaders.getInstance(); final SavepointStatusMessageParameters savepointStatusMessageParameters = savepointStatusHeaders.getUnresolvedMessageParameters(); savepointStatusMessageParameters.jobIdPathParameter.resolve(jobId); savepointStatusMessageParameters.savepointTriggerIdPathParameter.resolve(savepointTriggerId); - return restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), + return sendRetryableRequest( savepointStatusHeaders, - savepointStatusMessageParameters - ); + savepointStatusMessageParameters, + EmptyRequestBody.getInstance(), + isConnectionProblemException()); }); } @Override public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception { - JobsOverviewHeaders headers = JobsOverviewHeaders.getInstance(); - CompletableFuture<MultipleJobsDetails> jobDetailsFuture = restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - headers - ); - return jobDetailsFuture + return sendRequest(JobsOverviewHeaders.getInstance()) .thenApply( - (MultipleJobsDetails multipleJobsDetails) -> { - final Collection<JobDetails> jobDetails = multipleJobsDetails.getJobs(); - Collection<JobStatusMessage> flattenedDetails = new ArrayList<>(jobDetails.size()); - jobDetails.forEach(detail -> flattenedDetails.add(new JobStatusMessage(detail.getJobId(), detail.getJobName(), detail.getStatus(), detail.getStartTime()))); - - return flattenedDetails; - }); + (multipleJobsDetails) -> multipleJobsDetails + .getJobs() + .stream() + .map(detail -> new JobStatusMessage( + detail.getJobId(), + detail.getJobName(), + detail.getStatus(), + detail.getStartTime())) + .collect(Collectors.toList())); } @Override public T getClusterId() { return clusterId; } - private <R, A extends AsynchronouslyCreatedResource<R>> R waitForResource( - final SupplierWithException<CompletableFuture<A>, IOException> 
resourceFutureSupplier)
-     throws IOException, InterruptedException, ExecutionException, TimeoutException {
-   A asynchronouslyCreatedResource;
-   long attempt = 0;
-   while (true) {
-     final CompletableFuture<A> responseFuture = resourceFutureSupplier.get();
-     asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-     if (asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED) {
-       break;
+ /**
+  * Polls a {@code AsynchronouslyCreatedResource} until its
+  * {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes
+  * {@link QueueStatus.Id#COMPLETED COMPLETED}. This method returns a {@code CompletableFuture}
+  * which completes with {@link AsynchronouslyCreatedResource#resource()}.
+  *
+  * @param resourceFutureSupplier The operation which polls for the
+  *                               {@code AsynchronouslyCreatedResource}.
+  * @param <R> The type of the resource.
+  * @param <A> The type of the {@code AsynchronouslyCreatedResource}.
+  * @return A {@code CompletableFuture} delivering the resource.
+  */
+ private <R, A extends AsynchronouslyCreatedResource<R>> CompletableFuture<R> waitForResource(
--- End diff --

It's now `pollResourceAsync`. This is consistent with the methods in `CompletableFuture`.
---