Dear Flink Community,

I am using AsyncDataStream.unorderedWaitWithRetry together with a
RichAsyncFunction to perform asynchronous processing in Flink 1.19. I have
configured a retry strategy using FixedDelayRetryStrategy with a maximum of
3 retries and a delay of 1000 milliseconds.

*Issue:*
When the initial call to asyncInvoke() times out, the overridden timeout()
method is invoked as expected. However, when a retry is triggered (either
due to a timeout or another exception), and that retry also experiences a
timeout, the timeout() method is not called again. It seems that the
timeout is not re-registered for retries, which causes those retry attempts
to hang indefinitely.

*Observed Output:*

CustomSend AsyncIO start...!
Trying document at 1752058980697 --> customDoc://korea.test.test/12345
Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
Trying document at 1752058983701 --> customDoc://korea.test.test/12345
// No further timeout messages appear

*Expected Output:*
Each retry should also be subject to timeout handling, and the timeout()
method should be called if the retry does not complete within the timeout
period. For example:

Trying document...
Timeout...
Trying document again...
Timeout...
(repeated up to max retries)
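
To make the expected behaviour concrete, here is a minimal standalone sketch
that uses only java.util.concurrent. It is not Flink code and does not
reproduce AsyncWaitOperator; the class name PerAttemptTimeoutSketch and its
run(...) method are hypothetical. It assumes that "3 retries" means one
initial attempt followed by up to three retries, and it arms a fresh timeout
for every attempt, which is the behaviour I expected from the operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of "every attempt gets its own timeout". Not Flink code.
public class PerAttemptTimeoutSketch {

    /** Runs one initial attempt plus up to maxRetries retries and returns the event log. */
    public static List<String> run(int maxRetries, long timeoutMs, long retryDelayMs) {
        List<String> log = new ArrayList<>();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            log.add("Trying document (attempt " + attempt + ")");

            // Stand-in for asyncInvoke(): a future that never completes on its own.
            CompletableFuture<String> result = new CompletableFuture<>();

            // Key point: the timeout is armed again for each attempt (Java 9+).
            result.orTimeout(timeoutMs, TimeUnit.MILLISECONDS);

            try {
                result.join();
                log.add("Completed (attempt " + attempt + ")");
                return log;
            } catch (CompletionException e) {
                if (e.getCause() instanceof TimeoutException) {
                    log.add("Timeout (attempt " + attempt + ")");
                } else {
                    log.add("Failed (attempt " + attempt + "): " + e.getCause());
                }
            }

            // Fixed delay before the next retry, skipped after the last attempt.
            if (attempt < maxRetries) {
                pause(retryDelayMs);
            }
        }
        return log;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same values as my test configuration: 3 retries, 1000 ms timeout, 1000 ms delay.
        run(3, 1000, 1000).forEach(System.out::println);
    }
}
```

In this model every "Trying document" line is followed by a matching
"Timeout" line, because no attempt is left without a timer.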

*Test Configuration Summary:*

   - In asyncInvoke(), I simulate long processing using Thread.sleep(150000)
     inside a CompletableFuture to force timeouts.
   - The timeout() method calls resultFuture.completeExceptionally(...).
   - unorderedWaitWithRetry(...) configuration:
      - Timeout: 1000 ms
      - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
      - Parallelism: 1

*Workaround / Potential Fix:*
I tried modifying the AsyncWaitOperator#tryOnce(...) method to manually
call resultHandlerDelegator.registerTimeout(timeout) before each retry
attempt. This seems to resolve the issue and ensures that all retries
respect the timeout.

private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
        throws Exception {
    resultHandlerDelegator.currentAttempts++;
    resultHandlerDelegator.registerTimeout(timeout); // Fix?
    userFunction.asyncInvoke(
        resultHandlerDelegator.resultHandler.inputRecord.getValue(),
        resultHandlerDelegator);
}

*My Questions:*

   - Is this a known issue?
   - Is it valid to call registerTimeout() inside tryOnce()?
   - Could this introduce any side effects, especially when called via
     finishInFlightDelayedRetry()?

If this is confirmed to be a bug, I am happy to create a JIRA ticket and
submit a fix.

Thank you for your time and support.

Best regards,
Honggeun Ji
