Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5589#discussion_r171229327 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java --- @@ -223,6 +223,78 @@ } } + /** + * Retry the given operation with the given delay in between successful completions where the + * result does not match a given predicate. + * + * @param operation to retry + * @param retries number of retries + * @param retryDelay delay between retries + * @param retryPredicate Predicate to test whether the result is acceptable + * @param scheduledExecutor executor to be used for the retry operation + * @param <T> type of the result + * @return Future which retries the given operation a given amount of times and delays the retry + * in case the predicate isn't matched + */ + public static <T> CompletableFuture<T> retrySuccesfulWithDelay( + final Supplier<CompletableFuture<T>> operation, + final long retries, + final Time retryDelay, + final Predicate<T> retryPredicate, + final ScheduledExecutor scheduledExecutor) { + + final CompletableFuture<T> resultFuture = new CompletableFuture<>(); + + retrySuccessfulOperationWithDelay( + resultFuture, + operation, + retries, + retryDelay, + retryPredicate, + scheduledExecutor); + + return resultFuture; + } + + private static <T> void retrySuccessfulOperationWithDelay( + final CompletableFuture<T> resultFuture, + final Supplier<CompletableFuture<T>> operation, + final long retries, + final Time retryDelay, + final Predicate<T> retryPredicate, + final ScheduledExecutor scheduledExecutor) { + + if (!resultFuture.isDone()) { + final CompletableFuture<T> operationResultFuture = operation.get(); + + operationResultFuture.whenComplete( + (t, throwable) -> { + if (throwable != null) { + if (throwable instanceof CancellationException) { + resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable)); + } else { + resultFuture.completeExceptionally(throwable); + } + } else { + if (retries > 0 && !retryPredicate.test(t)) { + final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule( + () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor), + retryDelay.toMilliseconds(), + TimeUnit.MILLISECONDS); + + resultFuture.whenComplete( + (innerT, innerThrowable) -> scheduledFuture.cancel(false)); + } else { + resultFuture.complete(t); --- End diff -- We can change it to throw an exception if the predicate doesn't match on the final try.
---