I have one last question. As I understand it, the current timeout covers the total duration, including all retries. However, I’m wondering whether there have been any discussions or plans to support a timeout that applies only to a *single* try (not the total time across retries).
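To make the question concrete, here is a rough sketch in plain Java of the per-attempt behavior I have in mind. It does not use any Flink API: the class `PerAttemptTimeoutSketch` and its methods are hypothetical names of my own, it relies only on the JDK's `CompletableFuture.orTimeout` (Java 9+), and it leaves out the fixed delay between retries for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, NOT part of Flink: each attempt gets its own timeout,
// instead of one timeout shared by the first call and all retries.
final class PerAttemptTimeoutSketch {

    /** Starts up to maxAttempts attempts; each one may run for at most timeoutMs. */
    static <T> CompletableFuture<T> callWithPerAttemptTimeout(
            Supplier<CompletableFuture<T>> attempt, int maxAttempts, long timeoutMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        tryOnce(attempt, 1, maxAttempts, timeoutMs, result);
        return result;
    }

    private static <T> void tryOnce(
            Supplier<CompletableFuture<T>> attempt,
            int currentAttempt,
            int maxAttempts,
            long timeoutMs,
            CompletableFuture<T> result) {
        attempt.get()
                // Fails this attempt with a TimeoutException if it is still
                // incomplete after timeoutMs. The underlying work is not cancelled.
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .whenComplete((value, error) -> {
                    if (error == null) {
                        result.complete(value);
                    } else if (currentAttempt < maxAttempts) {
                        // Retry with a fresh timeout for the next attempt.
                        tryOnce(attempt, currentAttempt + 1, maxAttempts, timeoutMs, result);
                    } else {
                        // Attempts exhausted: surface the last failure.
                        result.completeExceptionally(error);
                    }
                });
    }
}
```

With this shape, the worst-case total time is roughly maxAttempts times the per-attempt timeout, rather than a single budget that the first slow attempt can use up.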
This might just be a special case for my use, but I was curious: is there a way to set a timeout for just one attempt, so that AsyncWaitOperator can retry without needing to restart the entire job?

Thanks again for your support.

On Fri, Jul 11, 2025 at 10:22 AM HONGGEUN JI <gihon...@gmail.com> wrote:

> Hi Lincoln,
> Thank you for looking into this issue.
>
> I appreciate your effort, and I’d be happy to review your PR.
>
> On Thu, Jul 10, 2025 at 10:38 PM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
>
>> Good question! The current implementation has the unexpected extra
>> retries in your case.
>>
>> After some thought, we can actually avoid unnecessary retries here
>> without changing the current behavior where the timeout covers the
>> total duration.
>>
>> To add some context, this was indeed a missing piece in the original
>> design. Back when we were first discussing the FLIP[1], we had
>> considered tracking execution time as part of the state. However, for
>> various reasons, we ultimately went with the simpler stateless
>> approach (which may lead to additional retries from the user's
>> perspective after a job restart). You’ve hit on the two key issues we
>> debated back then; this clearly shows there's still room for
>> improvement.
>>
>> I've created a PR https://github.com/apache/flink/pull/26779
>> Could you help review it? We can continue the discussion there.
>>
>> 1. https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
>>
>> Best,
>> Lincoln Lee
>>
>> HONGGEUN JI <gihon...@gmail.com> wrote on Thu, Jul 10, 2025 at 09:29:
>>
>> > Thank you for the clarification. I now understand that the
>> > timeout() method is intended to cover all async I/O attempts,
>> > including retries. Apologies for the confusion in my previous
>> > message.
>> >
>> > If you don’t mind, I’d like to ask one more question to make sure I
>> > fully understand the behavior.
>> >
>> > Let’s say a timeout occurs during any async operation (either the
>> > initial call or during the retry). Inside my overridden timeout()
>> > method, I call resultFuture.completeExceptionally(...).
>> >
>> > From what I can see in the code, after that
>> > completeExceptionally(), the
>> > RetryableResultHandlerDelegator#processRetry method is eventually
>> > invoked.
>> >
>> > Inside processRetry, the retry can happen if the following
>> > condition is true:
>> >
>> > if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) &&
>> > !retryDisabledOnFinish.get())
>> >
>> > So my question is:
>> > If completeExceptionally(...) is called inside the timeout() method,
>> > and the retry conditions are met, will the retry still be executed
>> > even after the timeout? Or is the timeout supposed to be the final
>> > failure point?
>> >
>> > I want to make sure I’m not misunderstanding how timeout() interacts
>> > with the retry mechanism.
>> >
>> > Thank you again for your help!
>> >
>> > Best regards,
>> > Honggeun Ji
>> >
>> > On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
>> > wrote:
>> >
>> > > Hi Honggeun,
>> > >
>> > > It looks like the confusion stems from a documentation issue.
>> > >
>> > > As the code comment explains:
>> > >
>> > > * @param timeout from first invoke to final completion of
>> > > asynchronous operation, may include
>> > > * multiple retries, and will be reset in case of restart
>> > >
>> > > This means the timeout covers the entire duration of the async
>> > > operation, including any retries. You could try increasing the
>> > > timeout value to ensure it accommodates the maximum expected
>> > > duration, including retries.
>> > >
>> > > This detail should definitely be made clearer in the
>> > > documentation:
>> > >
>> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>> > >
>> > > Will address it.
>> > >
>> > > Best,
>> > > Lincoln Lee
>> > >
>> > > HONGGEUN JI <gihon...@gmail.com> wrote on Wed, Jul 9, 2025 at 19:25:
>> > >
>> > > > Dear Flink Community,
>> > > >
>> > > > I am using AsyncDataStream.unorderedWaitWithRetry together with
>> > > > a RichAsyncFunction to perform asynchronous processing in Flink
>> > > > 1.19. I have configured a retry strategy using
>> > > > FixedDelayRetryStrategy with a maximum of 3 retries and a delay
>> > > > of 1000 milliseconds.
>> > > >
>> > > > *Issue:*
>> > > > When the initial call to asyncInvoke() times out, the overridden
>> > > > timeout() method is invoked as expected. However, when a retry
>> > > > is triggered (either due to a timeout or another exception), and
>> > > > that retry also experiences a timeout, the timeout() method is
>> > > > not called again. It seems that the timeout is not re-registered
>> > > > for retries, which causes those retry attempts to hang
>> > > > indefinitely.
>> > > >
>> > > > *Observed Output:*
>> > > >
>> > > > CustomSend AsyncIO start...!
>> > > > Trying document at 1752058980697 --> customDoc://korea.test.test/12345
>> > > > Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
>> > > > Trying document at 1752058983701 --> customDoc://korea.test.test/12345
>> > > > // No further timeout messages appear
>> > > >
>> > > > *Expected Output:*
>> > > > Each retry should also be subject to timeout handling, and the
>> > > > timeout() method should be called if the retry does not complete
>> > > > within the timeout period. For example:
>> > > >
>> > > > Trying document...
>> > > > Timeout...
>> > > > Trying document again...
>> > > > Timeout...
>> > > > (repeated up to max retries)
>> > > >
>> > > > *Test Configuration Summary:*
>> > > >
>> > > > - In asyncInvoke(), I simulate long processing using
>> > > >   Thread.sleep(150000) inside a CompletableFuture to force
>> > > >   timeouts.
>> > > > - The timeout() method calls
>> > > >   resultFuture.completeExceptionally(...).
>> > > > - unorderedWaitWithRetry(...) configuration:
>> > > >   - Timeout: 1000 ms
>> > > >   - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
>> > > >   - Parallelism: 1
>> > > >
>> > > > *Workaround / Potential Fix:*
>> > > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to
>> > > > manually call resultHandlerDelegator.registerTimeout(timeout)
>> > > > before each retry attempt. This seems to resolve the issue and
>> > > > ensures that all retries respect the timeout.
>> > > >
>> > > > private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
>> > > >         throws Exception {
>> > > >     resultHandlerDelegator.currentAttempts++;
>> > > >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
>> > > >     userFunction.asyncInvoke(
>> > > >             resultHandlerDelegator.resultHandler.inputRecord.getValue(),
>> > > >             resultHandlerDelegator);
>> > > > }
>> > > >
>> > > > *My Questions:*
>> > > >
>> > > > - Is this a known issue?
>> > > > - Is it valid to call registerTimeout() inside tryOnce()?
>> > > > - Could this introduce any side effects, especially when called
>> > > >   via finishInFlightDelayedRetry()?
>> > > >
>> > > > If this is confirmed to be a bug, I am happy to create a JIRA
>> > > > ticket and submit a fix.
>> > > >
>> > > > Thank you for your time and support.
>> > > >
>> > > > Best regards,
>> > > > Honggeun Ji