Good question! In your case, the current implementation does trigger unexpected extra retries.
After some thought, we can actually avoid the unnecessary retries here without
changing the current behavior, in which the timeout covers the total duration.

To add some context, this was indeed a missing piece in the original design.
Back when we were first discussing the FLIP[1], we had considered tracking
execution time as part of the state. However, for various reasons, we
ultimately went with the simpler stateless approach (which may lead to
additional retries from the user's perspective after a job restart).

You've hit on the two key issues we debated back then, which clearly shows
there's still room for improvement.

I've created a PR: https://github.com/apache/flink/pull/26779
Could you help review it? We can continue the discussion there.

[1] https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g

Best,
Lincoln Lee

HONGGEUN JI <gihon...@gmail.com> wrote on Thu, Jul 10, 2025 at 09:29:

> Thank you for the clarification. I now understand that the timeout() method
> is intended to cover all async I/O attempts, including retries. Apologies
> for the confusion in my previous message.
>
> If you don't mind, I'd like to ask one more question to make sure I fully
> understand the behavior.
>
> Let's say a timeout occurs during any async operation (either the initial
> call or a retry). Inside my overridden timeout() method, I call
> resultFuture.completeExceptionally(...).
>
> From what I can see in the code, after that completeExceptionally(), the
> RetryableResultHandlerDelegator#processRetry method is eventually invoked.
>
> Inside processRetry, the retry can happen if the following condition is
> true:
>
>     if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) &&
>             !retryDisabledOnFinish.get())
>
> So my question is:
> If completeExceptionally(...) is called inside the timeout() method, and
> the retry conditions are met, will the retry still be executed even after
> the timeout? Or is the timeout supposed to be the final failure point?
>
> I want to make sure I'm not misunderstanding how timeout() interacts with
> the retry mechanism.
>
> Thank you again for your help!
>
> Best regards,
> Honggeun Ji
>
> On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
>
> > Hi Honggeun,
> >
> > It looks like the confusion stems from a documentation issue.
> >
> > As the code comment explains:
> >
> >   * @param timeout from first invoke to final completion of asynchronous
> >   *     operation, may include multiple retries, and will be reset in case
> >   *     of restart
> >
> > This means the timeout covers the entire duration of the async operation,
> > including any retries. You could try increasing the timeout value to
> > ensure it accommodates the maximum expected duration, including retries.
> >
> > This detail should definitely be made clearer in the documentation:
> >
> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> >
> > Will address it.
> >
> > Best,
> > Lincoln Lee
> >
> > HONGGEUN JI <gihon...@gmail.com> wrote on Wed, Jul 9, 2025 at 19:25:
> >
> > > Dear Flink Community,
> > >
> > > I am using AsyncDataStream.unorderedWaitWithRetry together with a
> > > RichAsyncFunction to perform asynchronous processing in Flink 1.19. I
> > > have configured a retry strategy using FixedDelayRetryStrategy with a
> > > maximum of 3 retries and a delay of 1000 milliseconds.
> > >
> > > *Issue:*
> > > When the initial call to asyncInvoke() times out, the overridden
> > > timeout() method is invoked as expected. However, when a retry is
> > > triggered (either due to a timeout or another exception), and that
> > > retry also experiences a timeout, the timeout() method is not called
> > > again. It seems that the timeout is not re-registered for retries,
> > > which causes those retry attempts to hang indefinitely.
> > >
> > > *Observed Output:*
> > >
> > > CustomSend AsyncIO start...!
> > > Trying document at 1752058980697 --> customDoc://korea.test.test/12345
> > > Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
> > > Trying document at 1752058983701 --> customDoc://korea.test.test/12345
> > > // No further timeout messages appear
> > >
> > > *Expected Output:*
> > > Each retry should also be subject to timeout handling, and the timeout()
> > > method should be called if the retry does not complete within the
> > > timeout period. For example:
> > >
> > > Trying document...
> > > Timeout...
> > > Trying document again...
> > > Timeout...
> > > (repeated up to max retries)
> > >
> > > *Test Configuration Summary:*
> > >
> > > - In asyncInvoke(), I simulate long processing using
> > >   Thread.sleep(150000) inside a CompletableFuture to force timeouts.
> > > - The timeout() method calls resultFuture.completeExceptionally(...).
> > > - unorderedWaitWithRetry(...) configuration:
> > >   - Timeout: 1000 ms
> > >   - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> > >   - Parallelism: 1
> > >
> > > *Workaround / Potential Fix:*
> > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to manually
> > > call resultHandlerDelegator.registerTimeout(timeout) before each retry
> > > attempt. This seems to resolve the issue and ensures that all retries
> > > respect the timeout.
> > >
> > > private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
> > >         throws Exception {
> > >     resultHandlerDelegator.currentAttempts++;
> > >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
> > >     userFunction.asyncInvoke(
> > >             resultHandlerDelegator.resultHandler.inputRecord.getValue(),
> > >             resultHandlerDelegator);
> > > }
> > >
> > > *My Questions:*
> > >
> > > - Is this a known issue?
> > > - Is it valid to call registerTimeout() inside tryOnce()?
> > > - Could this introduce any side effects, especially when called via
> > >   finishInFlightDelayedRetry()?
> > >
> > > If this is confirmed to be a bug, I am happy to create a JIRA ticket and
> > > submit a fix.
> > >
> > > Thank you for your time and support.
> > >
> > > Best regards,
> > > Honggeun Ji
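
The timeout semantics discussed in this thread (a single timeout that runs
from the first invoke to final completion, retries included) can be modeled
with a small, Flink-free Java sketch. Everything in it is hypothetical: the
class TotalTimeoutSketch, its two methods, and the simplifying assumption
that every attempt fails after a fixed duration are for illustration only.
It is not Flink's implementation and not the change proposed in the PR
mentioned earlier in the thread. It only shows why a 1000 ms timeout leaves
no room for retries of a 150000 ms call, and how a total timeout might be
sized so that all attempts fit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of a timeout that spans all attempts. */
final class TotalTimeoutSketch {

    /**
     * Start times (ms since the first invoke) of every attempt that begins
     * before the total timeout expires. Assumes each attempt fails after
     * attemptDurationMs and is retried after a fixed retryDelayMs.
     */
    static List<Long> attemptStartTimes(
            long timeoutMs, long attemptDurationMs, long retryDelayMs, int maxRetries) {
        List<Long> starts = new ArrayList<>();
        long now = 0L;
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (now >= timeoutMs) {
                break; // total budget spent: no further attempt is started
            }
            starts.add(now);
            now += attemptDurationMs + retryDelayMs;
        }
        return starts;
    }

    /** Total duration of all attempts plus the delays between them. */
    static long minimumTotalTimeoutMs(
            long attemptDurationMs, long retryDelayMs, int maxRetries) {
        return (maxRetries + 1L) * attemptDurationMs + (long) maxRetries * retryDelayMs;
    }

    public static void main(String[] args) {
        // Settings from the thread: 1000 ms timeout, 150000 ms sleep,
        // 3 retries, 1000 ms delay. Only the initial attempt fits.
        System.out.println(attemptStartTimes(1000L, 150000L, 1000L, 3)); // [0]

        // Assumed 2000 ms attempts, with the timeout sized for all four attempts.
        long timeout = minimumTotalTimeoutMs(2000L, 1000L, 3);
        System.out.println(timeout); // 11000
        System.out.println(attemptStartTimes(timeout, 2000L, 1000L, 3)); // [0, 3000, 6000, 9000]
    }
}
```

Under this model, the value passed as the timeout to unorderedWaitWithRetry
would need to be at least the sum of all attempt durations and retry delays
for every retry to get a chance to run, which matches the suggestion in the
thread to increase the timeout so that it covers retries.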