[ https://issues.apache.org/jira/browse/FLINK-8797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380203#comment-16380203 ]
ASF GitHub Bot commented on FLINK-8797:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5589#discussion_r171228120

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -223,6 +223,78 @@
             }
         }
    +
    +    /**
    +     * Retry the given operation with the given delay in between successful completions where the
    +     * result does not match a given predicate.
    +     *
    +     * @param operation to retry
    +     * @param retries number of retries
    +     * @param retryDelay delay between retries
    +     * @param retryPredicate Predicate to test whether the result is acceptable
    +     * @param scheduledExecutor executor to be used for the retry operation
    +     * @param <T> type of the result
    +     * @return Future which retries the given operation a given amount of times and delays the retry
    +     * in case the predicate isn't matched
    +     */
    +    public static <T> CompletableFuture<T> retrySuccesfulWithDelay(
    +            final Supplier<CompletableFuture<T>> operation,
    +            final long retries,
    +            final Time retryDelay,
    +            final Predicate<T> retryPredicate,
    +            final ScheduledExecutor scheduledExecutor) {
    +
    +        final CompletableFuture<T> resultFuture = new CompletableFuture<>();
    +
    +        retrySuccessfulOperationWithDelay(
    +            resultFuture,
    +            operation,
    +            retries,
    +            retryDelay,
    +            retryPredicate,
    +            scheduledExecutor);
    +
    +        return resultFuture;
    +    }
    +
    +    private static <T> void retrySuccessfulOperationWithDelay(
    +            final CompletableFuture<T> resultFuture,
    +            final Supplier<CompletableFuture<T>> operation,
    +            final long retries,
    +            final Time retryDelay,
    +            final Predicate<T> retryPredicate,
    +            final ScheduledExecutor scheduledExecutor) {
    +
    +        if (!resultFuture.isDone()) {
    +            final CompletableFuture<T> operationResultFuture = operation.get();
    +
    +            operationResultFuture.whenComplete(
    +                (t, throwable) -> {
    +                    if (throwable != null) {
    +                        if (throwable instanceof CancellationException) {
    +                            resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
    +                        } else {
    +                            resultFuture.completeExceptionally(throwable);
    +                        }
    +                    } else {
    +                        if (retries > 0 && !retryPredicate.test(t)) {
    +                            final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
    +                                () -> retrySuccessfulOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, retryPredicate, scheduledExecutor),
    +                                retryDelay.toMilliseconds(),
    +                                TimeUnit.MILLISECONDS);
    +
    +                            resultFuture.whenComplete(
    +                                (innerT, innerThrowable) -> scheduledFuture.cancel(false));
    +                        } else {
    +                            resultFuture.complete(t);
    --- End diff --

This seems rather unfortunate. In the code sample below I wait for a job to be RUNNING. Since this branch also completes the future with the last, non-matching result once the retries are exhausted, I can only replace the loop, but not the final check, which really limits the usefulness:

```
JobStatus jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
while (jobStatus != JobStatus.RUNNING && submissionDeadLine.hasTimeLeft()) {
	Thread.sleep(50);
	jobStatus = client.getJobStatus(jobSubmissionResult.getJobID()).get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
}
if (jobStatus != JobStatus.RUNNING) {
	Assert.fail("Job not in state RUNNING.");
}

// ================== Comparison ===============================

CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
	() -> client.getJobStatus(jobSubmissionResult.getJobID()),
	submissionDeadLine.timeLeft().toMillis() / 50, // crappy retry count calculation
	Time.milliseconds(50),
	status -> status == JobStatus.RUNNING,
	TestingUtils.defaultScheduledExecutor()
);

if (jobStatusFuture.get(submissionDeadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS) != JobStatus.RUNNING) {
	Assert.fail("Job not in state RUNNING.");
}
```

In its current form this doesn't really provide value imo. (Yes, it's asynchronous, but in which tests do we even really need that?)
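The objection rests on the final `else` branch: `resultFuture.complete(t)` runs both when the predicate accepts the result and when the retries run out, so the caller has to re-check the value. Below is a minimal sketch of the alternative behaviour, in which exhausted retries complete the future exceptionally. It assumes plain JDK types (`ScheduledExecutorService`, a delay in milliseconds) in place of Flink's `ScheduledExecutor` and `Time`. The names `RetrySketch`, `retryUntilAccepted` and `RetriesExhaustedException` are hypothetical and belong neither to Flink nor to the PR. The PR's special handling of `CancellationException` is omitted for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetrySketch {

    /** Hypothetical exception: the predicate never accepted a result. */
    public static class RetriesExhaustedException extends Exception {
        public RetriesExhaustedException(String message) {
            super(message);
        }
    }

    /**
     * Hypothetical variant of retrySuccesfulWithDelay: retries while the predicate rejects
     * the result, and fails (instead of returning the rejected value) once retries run out.
     */
    static <T> CompletableFuture<T> retryUntilAccepted(
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long retryDelayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        attempt(resultFuture, operation, retries, retryDelayMillis, acceptPredicate, executor);
        return resultFuture;
    }

    private static <T> void attempt(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            long retries,
            long retryDelayMillis,
            Predicate<T> acceptPredicate,
            ScheduledExecutorService executor) {
        if (resultFuture.isDone()) {
            return; // completed or cancelled elsewhere: stop retrying
        }
        operation.get().whenComplete((t, throwable) -> {
            if (throwable != null) {
                // The operation itself failed: propagate the failure.
                resultFuture.completeExceptionally(throwable);
            } else if (acceptPredicate.test(t)) {
                // Only an accepted result completes the future normally.
                resultFuture.complete(t);
            } else if (retries > 0) {
                ScheduledFuture<?> scheduled = executor.schedule(
                        () -> attempt(resultFuture, operation, retries - 1,
                                retryDelayMillis, acceptPredicate, executor),
                        retryDelayMillis,
                        TimeUnit.MILLISECONDS);
                // Drop the pending retry if the result future completes first.
                resultFuture.whenComplete((ignoredValue, ignoredError) -> scheduled.cancel(false));
            } else {
                // Key difference to the reviewed code: running out of retries is a failure.
                resultFuture.completeExceptionally(new RetriesExhaustedException(
                        "Result " + t + " still rejected after all retries."));
            }
        });
    }
}
```

With this variant the comparison in the review would shrink to a single `get(...)` call. A job that never reaches RUNNING would surface as an `ExecutionException` whose cause is the hypothetical `RetriesExhaustedException`, so the trailing `Assert.fail` check would no longer be needed.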
> Port AbstractOperatorRestoreTestBase to MiniClusterResource
> -----------------------------------------------------------
>
>                 Key: FLINK-8797
>                 URL: https://issues.apache.org/jira/browse/FLINK-8797
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.5.0
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)