Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5312#discussion_r163775424 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -258,89 +312,96 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto final CompletableFuture<SavepointTriggerResponseBody> responseFuture; - try { - responseFuture = restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - savepointTriggerHeaders, - savepointTriggerMessageParameters, - new SavepointTriggerRequestBody(savepointDirectory)); - } catch (IOException e) { - throw new FlinkException("Could not send trigger savepoint request to Flink cluster.", e); - } + responseFuture = sendRequest( + savepointTriggerHeaders, + savepointTriggerMessageParameters, + new SavepointTriggerRequestBody(savepointDirectory)); - return responseFuture.thenApply(savepointTriggerResponseBody -> { + return responseFuture.thenCompose(savepointTriggerResponseBody -> { final SavepointTriggerId savepointTriggerId = savepointTriggerResponseBody.getSavepointTriggerId(); - final SavepointInfo savepointInfo; - try { - savepointInfo = waitForSavepointCompletion(jobId, savepointTriggerId); - } catch (Exception e) { - throw new CompletionException(e); - } + return waitForSavepointCompletion(jobId, savepointTriggerId); + }).thenApply(savepointInfo -> { if (savepointInfo.getFailureCause() != null) { throw new CompletionException(savepointInfo.getFailureCause()); } return savepointInfo.getLocation(); }); } - private SavepointInfo waitForSavepointCompletion( + private CompletableFuture<SavepointInfo> waitForSavepointCompletion( --- End diff -- Isn't this more `getSavepointFuture` instead of waiting? Waiting indicates for me that I wait for the completion of a future.
---