Dear Flink Community,

I am using AsyncDataStream.unorderedWaitWithRetry together with a RichAsyncFunction to perform asynchronous processing in Flink 1.19. I have configured a retry strategy using FixedDelayRetryStrategy with a maximum of 3 retries and a delay of 1000 milliseconds.
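To make the intended semantics concrete, here is a plain-JDK sketch. It is not Flink code and does not reproduce AsyncWaitOperator. It models a fixed-delay retry loop in which every attempt, including each retry, gets its own timeout. The class `PerAttemptTimeoutRetry`, the method `callWithRetry`, and the `onTimeout` callback are hypothetical. `onTimeout` only stands in for the role `RichAsyncFunction#timeout()` plays. The sketch assumes Java 9+ for `CompletableFuture#orTimeout`, and its timeouts are scaled down from the 1000 ms used in the report.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical plain-JDK model, NOT Flink's AsyncWaitOperator: each attempt,
// including every retry, is bounded by its own freshly armed timeout.
final class PerAttemptTimeoutRetry {

    // Runs one initial attempt plus up to maxRetries retries, waiting delayMs between them.
    static <T> T callWithRetry(Supplier<CompletableFuture<T>> attempt,
                               long timeoutMs, int maxRetries, long delayMs,
                               Runnable onTimeout) throws Exception {
        Throwable lastFailure = null;
        for (int i = 0; i <= maxRetries; i++) {
            if (i > 0) {
                Thread.sleep(delayMs); // fixed delay before each retry
            }
            try {
                // Arm a new timeout for THIS attempt: orTimeout completes the
                // future with a TimeoutException if it is still pending.
                return attempt.get().orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
            } catch (CompletionException e) {
                lastFailure = e.getCause();
                if (lastFailure instanceof TimeoutException) {
                    onTimeout.run(); // stands in for RichAsyncFunction#timeout()
                }
                // Any failure (timeout or other exception) falls through to the next retry.
            }
        }
        throw new Exception("all " + (maxRetries + 1) + " attempts failed", lastFailure);
    }

    public static void main(String[] args) {
        int[] timeouts = {0};
        try {
            // Every attempt hangs forever, similar to the Thread.sleep(150000) in the report.
            callWithRetry(CompletableFuture::new, 100, 3, 100, () -> {
                timeouts[0]++;
                System.out.println("Timeout #" + timeouts[0]);
            });
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With every attempt hanging, this loop reports a timeout for the initial try and for each of the 3 retries (4 in total) before giving up. That is the pattern the report expects from Flink.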
*Issue:*
When the initial call to asyncInvoke() times out, the overridden timeout() method is invoked as expected. However, when a retry is triggered (either by a timeout or by another exception) and that retry also times out, timeout() is not called again. It appears that the timeout is not re-registered for retry attempts, so those attempts hang indefinitely.

*Observed Output:*

    CustomSend AsyncIO start...!
    Trying document at 1752058980697 --> customDoc://korea.test.test/12345
    Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
    Trying document at 1752058983701 --> customDoc://korea.test.test/12345
    // No further timeout messages appear

*Expected Output:*
Each retry should also be subject to timeout handling, and timeout() should be called whenever a retry does not complete within the timeout period. For example:

    Trying document...
    Timeout...
    Trying document again...
    Timeout...
    (repeated up to the maximum number of retries)

*Test Configuration Summary:*
- In asyncInvoke(), I simulate long-running processing with Thread.sleep(150000) inside a CompletableFuture to force timeouts.
- The timeout() method calls resultFuture.completeExceptionally(...).
- unorderedWaitWithRetry(...) configuration:
  - Timeout: 1000 ms
  - Retry strategy: fixed delay, 3 retries, 1000 ms delay
- Parallelism: 1

*Workaround / Potential Fix:*
I tried modifying AsyncWaitOperator#tryOnce(...) to call resultHandlerDelegator.registerTimeout(timeout) explicitly before each retry attempt. This appears to resolve the issue: all retries now respect the timeout.

    private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
        resultHandlerDelegator.currentAttempts++;
        // Proposed fix: re-register the timeout on every call, not only for the first attempt
        resultHandlerDelegator.registerTimeout(timeout);
        userFunction.asyncInvoke(
                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
                resultHandlerDelegator);
    }

*My Questions:*
- Is this a known issue?
- Is it valid to call registerTimeout() inside tryOnce()?
- Could this introduce any side effects, especially when tryOnce() is called via finishInFlightDelayedRetry()?

If this is confirmed to be a bug, I am happy to create a JIRA ticket and submit a fix.

Thank you for your time and support.

Best regards,
Honggeun Ji