Thank you for the clarification. I now understand that the timeout() method is intended to cover all async I/O attempts, including retries. Apologies for the confusion in my previous message.
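To illustrate what a single timeout spanning all attempts implies when choosing its value, here is a minimal Java sketch. It is not Flink code: TimeoutBudget, overallTimeoutMs, and the perAttemptMs bound are hypothetical names introduced only for illustration, and it assumes that attempts run one after another with a fixed delay between them.

```java
// Hypothetical helper, not part of Flink: estimates an overall timeout that
// leaves room for the first attempt plus every retry.
final class TimeoutBudget {
    private TimeoutBudget() {}

    /**
     * @param maxRetries   number of retries after the initial attempt
     * @param retryDelayMs fixed delay before each retry, in milliseconds
     * @param perAttemptMs assumed upper bound for one attempt (an assumption
     *                     the caller must enforce, e.g. with a client-side timeout)
     */
    static long overallTimeoutMs(int maxRetries, long retryDelayMs, long perAttemptMs) {
        if (maxRetries < 0 || retryDelayMs < 0 || perAttemptMs < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        long attempts = maxRetries + 1L; // initial call plus retries
        return attempts * perAttemptMs + maxRetries * retryDelayMs;
    }

    public static void main(String[] args) {
        // 3 retries with a 1000 ms delay, assuming each attempt takes at most 2000 ms
        System.out.println(overallTimeoutMs(3, 1000L, 2000L));
    }
}
```

With 3 retries, a 1000 ms delay, and an assumed 2000 ms per attempt, the estimate is 4 * 2000 + 3 * 1000 = 11000 ms, well above the 1000 ms timeout used in the original test.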
If you don’t mind, I’d like to ask one more question to make sure I fully understand the behavior.

Let’s say a timeout occurs during any async operation (either the initial call or a retry). Inside my overridden timeout() method, I call resultFuture.completeExceptionally(...).

From what I can see in the code, after that completeExceptionally() call, the RetryableResultHandlerDelegator#processRetry method is eventually invoked. Inside processRetry, a retry can happen if the following condition is true:

    if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) && !retryDisabledOnFinish.get())

So my question is: if completeExceptionally(...) is called inside the timeout() method and the retry conditions are met, will the retry still be executed even after the timeout? Or is the timeout supposed to be the final failure point?

I want to make sure I’m not misunderstanding how timeout() interacts with the retry mechanism.

Thank you again for your help!

Best regards,
Honggeun Ji

On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Honggeun,
>
> It looks like the confusion stems from a documentation issue.
>
> As the code comment explains:
>
> * @param timeout from first invoke to final completion of asynchronous operation, may include
> * multiple retries, and will be reset in case of restart
>
> This means the timeout covers the entire duration of the async operation,
> including any retries. You could try increasing the timeout value to ensure
> it accommodates the maximum expected duration, including retries.
>
> This detail should definitely be made clearer in the documentation:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>
> Will address it.
>
> Best,
> Lincoln Lee
>
>
> HONGGEUN JI <gihon...@gmail.com> wrote on Wed, Jul 9, 2025 at 19:25:
>
> > Dear Flink Community,
> >
> > I am using AsyncDataStream.unorderedWaitWithRetry together with a
> > RichAsyncFunction to perform asynchronous processing in Flink 1.19. I have
> > configured a retry strategy using FixedDelayRetryStrategy with a maximum of
> > 3 retries and a delay of 1000 milliseconds.
> >
> > *Issue:*
> > When the initial call to asyncInvoke() times out, the overridden timeout()
> > method is invoked as expected. However, when a retry is triggered (either
> > due to a timeout or another exception), and that retry also experiences a
> > timeout, the timeout() method is not called again. It seems that the
> > timeout is not re-registered for retries, which causes those retry attempts
> > to hang indefinitely.
> >
> > *Observed Output:*
> >
> > CustomSend AsyncIO start...!
> > Trying document at 1752058980697 --> customDoc://korea.test.test/12345
> > Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
> > Trying document at 1752058983701 --> customDoc://korea.test.test/12345
> > // No further timeout messages appear
> >
> > *Expected Output:*
> > Each retry should also be subject to timeout handling, and the timeout()
> > method should be called if the retry does not complete within the timeout
> > period. For example:
> >
> > Trying document...
> > Timeout...
> > Trying document again...
> > Timeout...
> > (repeated up to max retries)
> >
> > *Test Configuration Summary:*
> >
> > - In asyncInvoke(), I simulate long processing using Thread.sleep(150000)
> >   inside a CompletableFuture to force timeouts.
> > - The timeout() method calls resultFuture.completeExceptionally(...).
> > - unorderedWaitWithRetry(...) configuration:
> >   - Timeout: 1000 ms
> >   - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> >   - Parallelism: 1
> >
> > *Workaround / Potential Fix:*
> > I tried modifying the AsyncWaitOperator#tryOnce(...) method to manually
> > call resultHandlerDelegator.registerTimeout(timeout) before each retry
> > attempt. This seems to resolve the issue and ensures that all retries
> > respect the timeout.
> >
> > private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
> >     resultHandlerDelegator.currentAttempts++;
> >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
> >     userFunction.asyncInvoke(
> >             resultHandlerDelegator.resultHandler.inputRecord.getValue(),
> >             resultHandlerDelegator);
> > }
> >
> > *My Questions:*
> >
> > - Is this a known issue?
> > - Is it valid to call registerTimeout() inside tryOnce()?
> > - Could this introduce any side effects, especially when called via
> >   finishInFlightDelayedRetry()?
> >
> > If this is confirmed to be a bug, I am happy to create a JIRA ticket and
> > submit a fix.
> >
> > Thank you for your time and support.
> >
> > Best regards,
> > Honggeun Ji