Hi Lincoln,

Thank you for looking into this issue. I appreciate your effort, and I’d be happy to review your PR.

On Thu, Jul 10, 2025 at 10:38 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Good question! The current implementation has the unexpected extra retries
> in your case.
>
> After some thought, we can actually avoid unnecessary retries here without
> changing the current behavior where the timeout covers the total duration.
>
> To add some context, this was indeed a missing piece in the original
> design. Back when we were first discussing the FLIP[1], we had considered
> tracking execution time as part of the state. However, for various reasons,
> we ultimately went with the simpler stateless approach (which may lead to
> additional retries from the user's perspective after a job restart). You’ve
> hit on the two key issues we debated back then; this clearly shows there's
> still room for improvement.
>
> I've created a PR: https://github.com/apache/flink/pull/26779
> Could you help review it? We can continue the discussion there.
>
> 1. https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
>
> Best,
> Lincoln Lee
>
> HONGGEUN JI <gihon...@gmail.com> wrote on Thu, Jul 10, 2025 at 09:29:
>
> > Thank you for the clarification. I now understand that the timeout()
> > method is intended to cover all async I/O attempts, including retries.
> > Apologies for the confusion in my previous message.
> >
> > If you don’t mind, I’d like to ask one more question to make sure I fully
> > understand the behavior.
> >
> > Let’s say a timeout occurs during any async operation (either the initial
> > call or during the retry). Inside my overridden timeout() method, I call
> > resultFuture.completeExceptionally(...).
> >
> > From what I can see in the code, after that completeExceptionally(), the
> > RetryableResultHandlerDelegator#processRetry method is eventually invoked.
> >
> > Inside processRetry, a retry can happen if the following condition is
> > true:
> >
> >     if (satisfy && asyncRetryStrategy.canRetry(currentAttempts)
> >             && !retryDisabledOnFinish.get())
> >
> > So my question is:
> > If completeExceptionally(...) is called inside the timeout() method, and
> > the retry conditions are met, will the retry still be executed even after
> > the timeout? Or is the timeout supposed to be the final failure point?
> >
> > I want to make sure I’m not misunderstanding how timeout() interacts with
> > the retry mechanism.
> >
> > Thank you again for your help!
> >
> > Best regards,
> > Honggeun Ji
> >
> > On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
> > wrote:
> >
> > > Hi Honggeun,
> > >
> > > It looks like the confusion stems from a documentation issue.
> > >
> > > As the code comment explains:
> > >
> > >     * @param timeout from first invoke to final completion of asynchronous
> > >     operation, may include
> > >     * multiple retries, and will be reset in case of restart
> > >
> > > This means the timeout covers the entire duration of the async
> > > operation, including any retries. You could try increasing the timeout
> > > value to ensure it accommodates the maximum expected duration, including
> > > retries.
> > >
> > > This detail should definitely be made clearer in the documentation:
> > >
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> > >
> > > Will address it.
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > > HONGGEUN JI <gihon...@gmail.com> wrote on Wed, Jul 9, 2025 at 19:25:
> > >
> > > > Dear Flink Community,
> > > >
> > > > I am using AsyncDataStream.unorderedWaitWithRetry together with a
> > > > RichAsyncFunction to perform asynchronous processing in Flink 1.19. I
> > > > have configured a retry strategy using FixedDelayRetryStrategy with a
> > > > maximum of 3 retries and a delay of 1000 milliseconds.
> > > >
> > > > *Issue:*
> > > > When the initial call to asyncInvoke() times out, the overridden
> > > > timeout() method is invoked as expected. However, when a retry is
> > > > triggered (either due to a timeout or another exception), and that
> > > > retry also experiences a timeout, the timeout() method is not called
> > > > again. It seems that the timeout is not re-registered for retries,
> > > > which causes those retry attempts to hang indefinitely.
> > > >
> > > > *Observed Output:*
> > > >
> > > >     CustomSend AsyncIO start...!
> > > >     Trying document at 1752058980697 --> customDoc://korea.test.test/12345
> > > >     Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
> > > >     Trying document at 1752058983701 --> customDoc://korea.test.test/12345
> > > >     // No further timeout messages appear
> > > >
> > > > *Expected Output:*
> > > > Each retry should also be subject to timeout handling, and the
> > > > timeout() method should be called if the retry does not complete
> > > > within the timeout period. For example:
> > > >
> > > >     Trying document...
> > > >     Timeout...
> > > >     Trying document again...
> > > >     Timeout...
> > > >     (repeated up to max retries)
> > > >
> > > > *Test Configuration Summary:*
> > > >
> > > > - In asyncInvoke(), I simulate long processing using
> > > >   Thread.sleep(150000) inside a CompletableFuture to force timeouts.
> > > > - The timeout() method calls resultFuture.completeExceptionally(...).
> > > > - unorderedWaitWithRetry(...) configuration:
> > > >   - Timeout: 1000 ms
> > > >   - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> > > >   - Parallelism: 1
> > > >
> > > > *Workaround / Potential Fix:*
> > > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to
> > > > manually call resultHandlerDelegator.registerTimeout(timeout) before
> > > > each retry attempt. This seems to resolve the issue and ensures that
> > > > all retries respect the timeout.
> > > >
> > > >     private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
> > > >             throws Exception {
> > > >         resultHandlerDelegator.currentAttempts++;
> > > >         resultHandlerDelegator.registerTimeout(timeout); // Fix?
> > > >         userFunction.asyncInvoke(
> > > >                 resultHandlerDelegator.resultHandler.inputRecord.getValue(),
> > > >                 resultHandlerDelegator);
> > > >     }
> > > >
> > > > *My Questions:*
> > > >
> > > > - Is this a known issue?
> > > > - Is it valid to call registerTimeout() inside tryOnce()?
> > > > - Could this introduce any side effects, especially when called via
> > > >   finishInFlightDelayedRetry()?
> > > >
> > > > If this is confirmed to be a bug, I am happy to create a JIRA ticket
> > > > and submit a fix.
> > > >
> > > > Thank you for your time and support.
> > > >
> > > > Best regards,
> > > > Honggeun Ji
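
For anyone following the thread, the semantics described above (one timeout covering the first invocation and all retries, with the timeout as the final failure point) can be sketched in plain Java without any Flink dependency. This is only a simplified model under stated assumptions, not Flink's implementation: the class TotalTimeoutSketch, the method simulate, and the fixed per-attempt failure time are hypothetical, and time is simulated rather than measured.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: a single deadline covers the first attempt and every retry. */
final class TotalTimeoutSketch {

    /**
     * Simulates attempts that each fail after attemptDurationMs (an assumption
     * made purely for illustration) and are retried after retryDelayMs, until
     * either the shared deadline or the attempt limit is reached.
     * Assumes maxAttempts >= 1 (initial call plus retries).
     */
    static List<String> simulate(
            long timeoutMs, long attemptDurationMs, long retryDelayMs, int maxAttempts) {
        List<String> events = new ArrayList<>();
        long deadline = timeoutMs; // registered once at first invoke, not per retry
        long now = 0;
        for (int attempt = 1; ; attempt++) {
            events.add("attempt " + attempt + " at " + now + " ms");
            long finish = now + attemptDurationMs;
            if (finish >= deadline) {
                // The deadline fires while this attempt is in flight: final failure.
                events.add("timeout at " + deadline + " ms");
                return events;
            }
            if (attempt == maxAttempts) {
                // The attempt failed in time, but no retries are left.
                events.add("attempts exhausted at " + finish + " ms");
                return events;
            }
            now = finish + retryDelayMs;
            if (now >= deadline) {
                // The deadline fires while waiting for the delayed retry.
                events.add("timeout at " + deadline + " ms");
                return events;
            }
        }
    }

    public static void main(String[] args) {
        // Values from the original report: 1000 ms timeout, 150000 ms of work,
        // 1000 ms fixed delay, 1 initial call + 3 retries.
        simulate(1000, 150_000, 1000, 4).forEach(System.out::println);
    }
}
```

With the values from the original report, this model records a single attempt followed by the timeout and no further retries, which is the behavior the thread describes as intended once the extra retries are removed.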