We did consider using a combination of per-attempt timeouts and an overall
timeout. However, we ultimately decided to go with just a single total
timeout.
The main reasons were:

1. We wanted to keep the API simpler for users — introducing both a
per-attempt
timeout and a total timeout would require exposing two parameters, or
keeping
only the per-attempt one, in which case the actual total retry duration
would
depend on the retry strategy and be harder to reason about.

2. In most cases where a timeout occurs, it’s often because the external
service
 is experiencing issues. In such situations, retrying is unlikely to yield
a
 different result, so having a global timeout is often sufficient.

Supporting separate retry-related timeout configurations would require
changes
to the public API, and such changes would need to go through the FLIP
process
(see [1]).

In your case, if the async operation is expected to eventually succeed, you
can
consider increasing both the retry count and the total timeout.
If you prefer to return a default value after retries are exhausted, you
can
override the `timeout` method and call `complete(your_default_value)`
instead of
`completeExceptionally`.

1.
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals


Best,
Lincoln Lee


HONGGEUN JI <gihon...@gmail.com> 于2025年7月11日周五 13:53写道:

> I have one last question.
>
> As I understand, the current timeout covers the total duration, including
> all retries.
> However, I’m wondering if there have been any discussions or plans to
> support a timeout that only applies to a *single* try (not the total time
> across retries).
>
> This might just be a special case for my use, but I was curious—
> is there a way to set a timeout for just one attempt, so that
> AsyncWaitOperator can retry without needing to restart the entire job?
>
> Thanks again for your support.
>
>
>
>
> On Fri, Jul 11, 2025 at 10:22 AM HONGGEUN JI <gihon...@gmail.com> wrote:
>
> > Hi Lincoln,
> > Thank you for looking into this issue.
> >
> > I appreciate your effort, and I’d be happy to review your PR.
> >
> >
> >
> >
> > On Thu, Jul 10, 2025 at 10:38 PM Lincoln Lee <lincoln.8...@gmail.com>
> > wrote:
> >
> >> Good question! The current implementation has the unexpected extra
> retries
> >> in your case.
> >>
> >> After some thought — we can actually avoid unnecessary retries here
> >> without
> >> changing the current behavior where the timeout covers the total
> duration.
> >>
> >> To add some context, this was indeed a missing piece in the original
> >> design.
> >> Back when we were first discussing the FLIP[1], we had considered
> tracking
> >> execution
> >> time as part of the state. However, for various reasons, we ultimately
> >> went
> >> with
> >> the simpler stateless approach (which may lead to additional retries
> from
> >> the
> >> user's perspective after a job restart). You’ve hit on the two key
> issues
> >> we
> >> debated back then, this clearly shows there's still room for
> improvement.
> >>
> >> I've created a PR https://github.com/apache/flink/pull/26779
> >> Could you help review it? We can continue the discussion there.
> >>
> >> 1. https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
> >>
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >> HONGGEUN JI <gihon...@gmail.com> 于2025年7月10日周四 09:29写道:
> >>
> >> > Thank you for the clarification. I now understand that the timeout()
> >> method
> >> > is intended to cover all async I/O attempts, including retries.
> >> Apologies
> >> > for the confusion in my previous message.
> >> >
> >> > If you don’t mind, I’d like to ask one more question to make sure I
> >> fully
> >> > understand the behavior.
> >> >
> >> > Let’s say a timeout occurs during any async operation (either the
> >> initial
> >> > call or during the retry). Inside my overridden timeout() method, I
> call
> >> > resultFuture.completeExceptionally(...)
> >> >
> >> > From what I can see in the code, after that completeExceptionally(),
> the
> >> > RetryableResultHandlerDelegator#processRetry method is eventually
> >> invoked.
> >> >
> >> > Inside processRetry, the retry can happens if the following condition
> is
> >> > true:
> >> >
> >> >      if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) &&
> >> > !retryDisabledOnFinish.get())
> >> >
> >> > So my question is:
> >> > If completeExceptionally(...) is called inside the timeout() method,
> and
> >> > the retry conditions are met, will the retry still be executed even
> >> after
> >> > the timeout? Or is the timeout supposed to be the final failure point?
> >> >
> >> > I want to make sure I’m not misunderstanding how timeout() interacts
> >> with
> >> > the retry mechanism.
> >> >
> >> > Thank you again for your help!
> >> >
> >> > Best regards,
> >> > Honggeun Ji
> >> >
> >> >
> >> >
> >> >
> >> > On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Honggeun,
> >> > >
> >> > > It looks like the confusion stems from a documentation issue.
> >> > >
> >> > > As the code comment explains:
> >> > >
> >> > > * @param timeout from first invoke to final completion of
> asynchronous
> >> > > operation, may include
> >> > > *     multiple retries, and will be reset in case of restart
> >> > >
> >> > > This means the timeout covers the entire duration of the async
> >> operation,
> >> > > including any retries. You could try increasing the timeout value to
> >> > ensure
> >> > > it accommodates the maximum expected duration, including retries.
> >> > >
> >> > > This detail should definitely be made clearer in the documentation:
> >> > >
> >> > >
> >> >
> >>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> >> > >
> >> > > Will address it.
> >> > >
> >> > > Best,
> >> > > Lincoln Lee
> >> > >
> >> > >
> >> > > HONGGEUN JI <gihon...@gmail.com> 于2025年7月9日周三 19:25写道:
> >> > >
> >> > > > Dear Flink Community,
> >> > > >
> >> > > > I am using AsyncDataStream.unorderedWaitWithRetry together with a
> >> > > > RichAsyncFunction to perform asynchronous processing in Flink
> 1.19.
> >> I
> >> > > have
> >> > > > configured a retry strategy using FixedDelayRetryStrategy with a
> >> > maximum
> >> > > of
> >> > > > 3 retries and a delay of 1000 milliseconds.
> >> > > >
> >> > > > *Issue:*
> >> > > > When the initial call to asyncInvoke() times out, the overridden
> >> > > timeout()
> >> > > > method is invoked as expected. However, when a retry is triggered
> >> > (either
> >> > > > due to a timeout or another exception), and that retry also
> >> > experiences a
> >> > > > timeout, the timeout() method is not called again. It seems that
> the
> >> > > > timeout is not re-registered for retries, which causes those retry
> >> > > attempts
> >> > > > to hang indefinitely.
> >> > > >
> >> > > > *Observed Output:*
> >> > > >
> >> > > > CustomSend AsyncIO start...!Trying document at 1752058980697 -->
> >> > > > customDoc://korea.test.test/12345Timeout document at 1752058982687
> >> -->
> >> > > > customDoc://korea.test.test/12345Trying document at 1752058983701
> >> -->
> >> > > > customDoc://korea.test.test/12345// No further timeout messages
> >> appear
> >> > > >
> >> > > > *Expected Output:*
> >> > > > Each retry should also be subject to timeout handling, and the
> >> > timeout()
> >> > > > method should be called if the retry does not complete within the
> >> > timeout
> >> > > > period. For example:
> >> > > >
> >> > > > Trying document...Timeout...Trying document again...Timeout...
> >> > > > (repeated up to max retries)
> >> > > >
> >> > > > *Test Configuration Summary:*
> >> > > >
> >> > > >    -
> >> > > >
> >> > > >    In asyncInvoke(), I simulate long processing using
> >> > > Thread.sleep(150000)
> >> > > >    inside a CompletableFuture to force timeouts.
> >> > > >    -
> >> > > >
> >> > > >    The timeout() method calls
> >> resultFuture.completeExceptionally(...).
> >> > > >    -
> >> > > >
> >> > > >    unorderedWaitWithRetry(...) configuration:
> >> > > >    -
> >> > > >
> >> > > >       Timeout: 1000 ms
> >> > > >       -
> >> > > >
> >> > > >       Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> >> > > >       -
> >> > > >
> >> > > >       Parallelism: 1
> >> > > >
> >> > > > *Workaround / Potential Fix:*
> >> > > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to
> >> manually
> >> > > > call resultHandlerDelegator.registerTimeout(timeout) before each
> >> retry
> >> > > > attempt. This seems to resolve the issue and ensures that all
> >> retries
> >> > > > respect the timeout.
> >> > > >
> >> > > > private void tryOnce(RetryableResultHandlerDelegator
> >> > > > resultHandlerDelegator) throws Exception {
> >> > > >     resultHandlerDelegator.currentAttempts++;
> >> > > >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
> >> > > >     userFunction.asyncInvoke(
> >> > > >
>  resultHandlerDelegator.resultHandler.inputRecord.getValue(),
> >> > > >         resultHandlerDelegator);
> >> > > > }
> >> > > >
> >> > > > *My Questions:*
> >> > > >
> >> > > >    -
> >> > > >
> >> > > >    Is this a known issue?
> >> > > >    -
> >> > > >
> >> > > >    Is it valid to call registerTimeout() inside tryOnce()?
> >> > > >    -
> >> > > >
> >> > > >    Could this introduce any side effects, especially when called
> via
> >> > > >    finishInFlightDelayedRetry()?
> >> > > >
> >> > > > If this is confirmed to be a bug, I am happy to create a JIRA
> ticket
> >> > and
> >> > > > submit a fix.
> >> > > >
> >> > > > Thank you for your time and support.
> >> > > >
> >> > > > Best regards,
> >> > > > Honggeun Ji
> >> > > >
> >> > >
> >> >
> >>
> >
>

Reply via email to