We did consider using a combination of per-attempt timeouts and an overall timeout. However, we ultimately decided to go with just a single total timeout. The main reasons were:
1. We wanted to keep the API simpler for users. Introducing both a per-attempt timeout and a total timeout would require exposing two parameters. Keeping only the per-attempt one would make the actual total retry duration depend on the retry strategy, which is harder to reason about.
2. When a timeout occurs, it is usually because the external service is experiencing issues. In that situation retrying is unlikely to yield a different result, so a global timeout is often sufficient.

Supporting separate retry-related timeout configurations would require changes to the public API, and such changes would need to go through the FLIP process (see [1]).

In your case, if the async operation is expected to eventually succeed, you can consider increasing both the retry count and the total timeout. If you prefer to return a default value after retries are exhausted, you can override the `timeout` method and call `complete(your_default_value)` instead of `completeExceptionally`.

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Best,
Lincoln Lee

HONGGEUN JI <gihon...@gmail.com> wrote on Fri, Jul 11, 2025 at 13:53:

> I have one last question.
>
> As I understand, the current timeout covers the total duration, including
> all retries.
> However, I’m wondering if there have been any discussions or plans to
> support a timeout that only applies to a *single* try (not the total time
> across retries).
>
> This might just be a special case for my use, but I was curious:
> is there a way to set a timeout for just one attempt, so that
> AsyncWaitOperator can retry without needing to restart the entire job?
>
> Thanks again for your support.
>
> On Fri, Jul 11, 2025 at 10:22 AM HONGGEUN JI <gihon...@gmail.com> wrote:
>
> > Hi Lincoln,
> > Thank you for looking into this issue.
> >
> > I appreciate your effort, and I’d be happy to review your PR.
> >
> > On Thu, Jul 10, 2025 at 10:38 PM Lincoln Lee <lincoln.8...@gmail.com>
> > wrote:
> >
> >> Good question! The current implementation has the unexpected extra
> >> retries in your case.
> >>
> >> After some thought, we can actually avoid unnecessary retries here
> >> without changing the current behavior where the timeout covers the
> >> total duration.
> >>
> >> To add some context, this was indeed a missing piece in the original
> >> design. Back when we were first discussing the FLIP [1], we had
> >> considered tracking execution time as part of the state. However, for
> >> various reasons, we ultimately went with the simpler stateless approach
> >> (which may lead to additional retries from the user's perspective after
> >> a job restart). You’ve hit on the two key issues we debated back then,
> >> which clearly shows there's still room for improvement.
> >>
> >> I've created a PR: https://github.com/apache/flink/pull/26779
> >> Could you help review it? We can continue the discussion there.
> >>
> >> [1] https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >> HONGGEUN JI <gihon...@gmail.com> wrote on Thu, Jul 10, 2025 at 09:29:
> >>
> >> > Thank you for the clarification. I now understand that the timeout()
> >> > method is intended to cover all async I/O attempts, including
> >> > retries. Apologies for the confusion in my previous message.
> >> >
> >> > If you don’t mind, I’d like to ask one more question to make sure I
> >> > fully understand the behavior.
> >> >
> >> > Let’s say a timeout occurs during any async operation (either the
> >> > initial call or during the retry). Inside my overridden timeout()
> >> > method, I call resultFuture.completeExceptionally(...).
> >> >
> >> > From what I can see in the code, after that completeExceptionally(),
> >> > the RetryableResultHandlerDelegator#processRetry method is eventually
> >> > invoked.
> >> >
> >> > Inside processRetry, the retry can happen if the following condition
> >> > is true:
> >> >
> >> > if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) &&
> >> > !retryDisabledOnFinish.get())
> >> >
> >> > So my question is:
> >> > If completeExceptionally(...) is called inside the timeout() method,
> >> > and the retry conditions are met, will the retry still be executed
> >> > even after the timeout? Or is the timeout supposed to be the final
> >> > failure point?
> >> >
> >> > I want to make sure I’m not misunderstanding how timeout() interacts
> >> > with the retry mechanism.
> >> >
> >> > Thank you again for your help!
> >> >
> >> > Best regards,
> >> > Honggeun Ji
> >> >
> >> > On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Honggeun,
> >> > >
> >> > > It looks like the confusion stems from a documentation issue.
> >> > >
> >> > > As the code comment explains:
> >> > >
> >> > > * @param timeout from first invoke to final completion of asynchronous operation, may include
> >> > > *     multiple retries, and will be reset in case of restart
> >> > >
> >> > > This means the timeout covers the entire duration of the async
> >> > > operation, including any retries. You could try increasing the
> >> > > timeout value to ensure it accommodates the maximum expected
> >> > > duration, including retries.
> >> > >
> >> > > This detail should definitely be made clearer in the documentation:
> >> > >
> >> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> >> > >
> >> > > Will address it.
> >> > >
> >> > > Best,
> >> > > Lincoln Lee
> >> > >
> >> > > HONGGEUN JI <gihon...@gmail.com> wrote on Wed, Jul 9, 2025 at 19:25:
> >> > >
> >> > > > Dear Flink Community,
> >> > > >
> >> > > > I am using AsyncDataStream.unorderedWaitWithRetry together with a
> >> > > > RichAsyncFunction to perform asynchronous processing in Flink
> >> > > > 1.19. I have configured a retry strategy using
> >> > > > FixedDelayRetryStrategy with a maximum of 3 retries and a delay of
> >> > > > 1000 milliseconds.
> >> > > >
> >> > > > *Issue:*
> >> > > > When the initial call to asyncInvoke() times out, the overridden
> >> > > > timeout() method is invoked as expected. However, when a retry is
> >> > > > triggered (either due to a timeout or another exception), and that
> >> > > > retry also experiences a timeout, the timeout() method is not
> >> > > > called again. It seems that the timeout is not re-registered for
> >> > > > retries, which causes those retry attempts to hang indefinitely.
> >> > > >
> >> > > > *Observed Output:*
> >> > > >
> >> > > > CustomSend AsyncIO start...!
> >> > > > Trying document at 1752058980697 --> customDoc://korea.test.test/12345
> >> > > > Timeout document at 1752058982687 --> customDoc://korea.test.test/12345
> >> > > > Trying document at 1752058983701 --> customDoc://korea.test.test/12345
> >> > > > // No further timeout messages appear
> >> > > >
> >> > > > *Expected Output:*
> >> > > > Each retry should also be subject to timeout handling, and the
> >> > > > timeout() method should be called if the retry does not complete
> >> > > > within the timeout period. For example:
> >> > > >
> >> > > > Trying document...
> >> > > > Timeout...
> >> > > > Trying document again...
> >> > > > Timeout...
> >> > > > (repeated up to max retries)
> >> > > >
> >> > > > *Test Configuration Summary:*
> >> > > >
> >> > > > - In asyncInvoke(), I simulate long processing using
> >> > > >   Thread.sleep(150000) inside a CompletableFuture to force timeouts.
> >> > > > - The timeout() method calls resultFuture.completeExceptionally(...).
> >> > > > - unorderedWaitWithRetry(...) configuration:
> >> > > >   - Timeout: 1000 ms
> >> > > >   - Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> >> > > > - Parallelism: 1
> >> > > >
> >> > > > *Workaround / Potential Fix:*
> >> > > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to
> >> > > > manually call resultHandlerDelegator.registerTimeout(timeout)
> >> > > > before each retry attempt. This seems to resolve the issue and
> >> > > > ensures that all retries respect the timeout.
> >> > > >
> >> > > > private void tryOnce(RetryableResultHandlerDelegator resultHandlerDelegator)
> >> > > >         throws Exception {
> >> > > >     resultHandlerDelegator.currentAttempts++;
> >> > > >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
> >> > > >     userFunction.asyncInvoke(
> >> > > >             resultHandlerDelegator.resultHandler.inputRecord.getValue(),
> >> > > >             resultHandlerDelegator);
> >> > > > }
> >> > > >
> >> > > > *My Questions:*
> >> > > >
> >> > > > - Is this a known issue?
> >> > > > - Is it valid to call registerTimeout() inside tryOnce()?
> >> > > > - Could this introduce any side effects, especially when called via
> >> > > >   finishInFlightDelayedRetry()?
> >> > > >
> >> > > > If this is confirmed to be a bug, I am happy to create a JIRA
> >> > > > ticket and submit a fix.
> >> > > >
> >> > > > Thank you for your time and support.
> >> > > >
> >> > > > Best regards,
> >> > > > Honggeun Ji
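
A minimal sketch of the default-value suggestion from the reply at the top of this thread: override `timeout` and call `complete(...)` instead of `completeExceptionally(...)`. It deliberately avoids the Flink dependency so it runs on its own. `ResultFuture` below is a local stand-in that mirrors only the two methods discussed here, and `DefaultOnTimeoutLookup`, `DEFAULT_VALUE`, and `simulateTimeout` are hypothetical names used for illustration. In a real job the override would live in your `RichAsyncFunction`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Local stand-in (assumption) mirroring the two ResultFuture methods used in this thread. */
interface ResultFuture<T> {
    void complete(Collection<T> result);
    void completeExceptionally(Throwable error);
}

/** Hypothetical function showing only the timeout() override, not a full async operator. */
class DefaultOnTimeoutLookup {
    static final String DEFAULT_VALUE = "N/A"; // hypothetical fallback value

    // Same shape as the timeout(input, resultFuture) hook discussed above.
    void timeout(String input, ResultFuture<String> resultFuture) {
        // Emit a fallback record instead of failing the operation.
        resultFuture.complete(Collections.singleton(DEFAULT_VALUE));
    }
}

class TimeoutDefaultSketch {
    /** Adapts a CompletableFuture so the outcome of timeout() can be inspected. */
    static <T> ResultFuture<T> capture(CompletableFuture<Collection<T>> sink) {
        return new ResultFuture<T>() {
            @Override
            public void complete(Collection<T> result) {
                sink.complete(result);
            }

            @Override
            public void completeExceptionally(Throwable error) {
                sink.completeExceptionally(error);
            }
        };
    }

    /** Invokes the timeout hook directly, as the operator would once the timeout fires. */
    static Collection<String> simulateTimeout(String input) {
        CompletableFuture<Collection<String>> sink = new CompletableFuture<>();
        new DefaultOnTimeoutLookup().timeout(input, capture(sink));
        return sink.join();
    }

    public static void main(String[] args) {
        System.out.println(simulateTimeout("customDoc://korea.test.test/12345"));
    }
}
```

Because the timeout covers the whole operation including retries, this fallback is emitted once the total timeout elapses, however many attempts were made before that point.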