Hi Honggeun,

It looks like the confusion stems from a documentation issue.
As the code comment explains:

 * @param timeout from first invoke to final completion of asynchronous operation, may include
 *     multiple retries, and will be reset in case of restart

This means the timeout covers the entire duration of the async operation, including any retries, rather than each individual attempt. You could try increasing the timeout value so that it accommodates the maximum expected duration, including retries and the delays between them.

This detail should definitely be made clearer in the documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api

I will address it.

Best,
Lincoln Lee

HONGGEUN JI <gihon...@gmail.com> wrote on Wed, Jul 9, 2025 at 19:25:

> Dear Flink Community,
>
> I am using AsyncDataStream.unorderedWaitWithRetry together with a
> RichAsyncFunction to perform asynchronous processing in Flink 1.19. I have
> configured a retry strategy using FixedDelayRetryStrategy with a maximum of
> 3 retries and a delay of 1000 milliseconds.
>
> *Issue:*
> When the initial call to asyncInvoke() times out, the overridden timeout()
> method is invoked as expected. However, when a retry is triggered (either
> due to a timeout or another exception), and that retry also experiences a
> timeout, the timeout() method is not called again. It seems that the
> timeout is not re-registered for retries, which causes those retry attempts
> to hang indefinitely.
>
> *Observed Output:*
>
> CustomSend AsyncIO start...!
> Trying document at 1752058980697 --> customDoc://korea.test.test/12345
> Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
> Trying document at 1752058983701 --> customDoc://korea.test.test/12345
> // No further timeout messages appear
>
> *Expected Output:*
> Each retry should also be subject to timeout handling, and the timeout()
> method should be called if the retry does not complete within the timeout
> period. For example:
>
> Trying document...
> Timeout...
> Trying document again...
> Timeout...
> (repeated up to max retries)
>
> *Test Configuration Summary:*
>
> - In asyncInvoke(), I simulate long processing using Thread.sleep(150000)
>   inside a CompletableFuture to force timeouts.
> - The timeout() method calls resultFuture.completeExceptionally(...).
> - unorderedWaitWithRetry(...) configuration:
>   - Timeout: 1000 ms
>   - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> - Parallelism: 1
>
> *Workaround / Potential Fix:*
> I tried modifying the AsyncWaitOperator#tryOnce(...) method to manually
> call resultHandlerDelegator.registerTimeout(timeout) before each retry
> attempt. This seems to resolve the issue and ensures that all retries
> respect the timeout.
>
> private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
>         throws Exception {
>     resultHandlerDelegator.currentAttempts++;
>     resultHandlerDelegator.registerTimeout(timeout); // Fix?
>     userFunction.asyncInvoke(
>             resultHandlerDelegator.resultHandler.inputRecord.getValue(),
>             resultHandlerDelegator);
> }
>
> *My Questions:*
>
> - Is this a known issue?
> - Is it valid to call registerTimeout() inside tryOnce()?
> - Could this introduce any side effects, especially when called via
>   finishInFlightDelayedRetry()?
>
> If this is confirmed to be a bug, I am happy to create a JIRA ticket and
> submit a fix.
>
> Thank you for your time and support.
>
> Best regards,
> Honggeun Ji
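
On the per-attempt timeout expected in the message above: since the operator timeout spans all attempts, one possible way to bound each attempt is to enforce the limit on the future inside the user function. The following is only a plain-JDK sketch of that idea and uses no Flink classes. The names slowLookup, attempt, and runWithRetries are hypothetical, the loop merely imitates a fixed-delay retry strategy, and CompletableFuture#orTimeout requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PerAttemptTimeoutSketch {

    /** Hypothetical slow call: the returned future is never completed by anyone. */
    static CompletableFuture<String> slowLookup(String key) {
        return new CompletableFuture<>();
    }

    /** One attempt, bounded by its own timeout via CompletableFuture#orTimeout. */
    static CompletableFuture<String> attempt(String key, long perAttemptMs) {
        return slowLookup(key).orTimeout(perAttemptMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Imitates a fixed-delay retry loop under one overall deadline.
     * The overall budget is checked only before an attempt starts.
     * Returns the number of attempts that were started.
     */
    static int runWithRetries(
            String key, int maxRetries, long perAttemptMs, long delayMs, long overallMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(overallMs);
        int attempts = 0;
        // One initial attempt plus up to maxRetries retries, while the budget lasts.
        while (attempts <= maxRetries && System.nanoTime() < deadline) {
            attempts++;
            try {
                attempt(key, perAttemptMs).join();
                return attempts; // completed normally, no further retries
            } catch (CompletionException e) {
                if (!(e.getCause() instanceof TimeoutException)) {
                    throw e; // only per-attempt timeouts are retried in this sketch
                }
                System.out.println("Timeout on attempt " + attempts);
                try {
                    Thread.sleep(delayMs); // fixed delay before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        // 3 retries, 20 ms per attempt, 10 ms delay, generous 10 s overall budget.
        int attempts = runWithRetries("customDoc://korea.test.test/12345", 3, 20L, 10L, 10_000L);
        System.out.println("Attempts started: " + attempts);
    }
}
```

In an actual job, the analogous step would presumably be to apply orTimeout to the future created in asyncInvoke() and hand the failure to resultFuture.completeExceptionally(...), provided the retry strategy is configured to retry on exceptions. The operator timeout would then need to cover at least all attempts plus the delays between them, for example 4 attempts and 3 delays of 1000 ms each with the configuration quoted above. This is untested against Flink 1.19.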