Hi Lincoln,
Thank you for looking into this issue.

I appreciate your effort, and I’d be happy to review your PR.




On Thu, Jul 10, 2025 at 10:38 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Good question! The current implementation has the unexpected extra retries
> in your case.
>
> After some thought — we can actually avoid unnecessary retries here without
> changing the current behavior where the timeout covers the total duration.
>
> To add some context, this was indeed a missing piece in the original
> design.
> Back when we were first discussing the FLIP[1], we had considered tracking
> execution
> time as part of the state. However, for various reasons, we ultimately went
> with
> the simpler stateless approach (which may lead to additional retries from
> the
> user's perspective after a job restart). You’ve hit on the two key issues
> we
> debated back then, this clearly shows there's still room for improvement.
>
> I've created a PR https://github.com/apache/flink/pull/26779
> Could you help review it? We can continue the discussion there.
>
> 1. https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
>
>
> Best,
> Lincoln Lee
>
>
> HONGGEUN JI <gihon...@gmail.com> 于2025年7月10日周四 09:29写道:
>
> > Thank you for the clarification. I now understand that the timeout()
> method
> > is intended to cover all async I/O attempts, including retries. Apologies
> > for the confusion in my previous message.
> >
> > If you don’t mind, I’d like to ask one more question to make sure I fully
> > understand the behavior.
> >
> > Let’s say a timeout occurs during any async operation (either the initial
> > call or during the retry). Inside my overridden timeout() method, I call
> > resultFuture.completeExceptionally(...)
> >
> > From what I can see in the code, after that completeExceptionally(), the
> > RetryableResultHandlerDelegator#processRetry method is eventually
> invoked.
> >
> > Inside processRetry, the retry can happens if the following condition is
> > true:
> >
> >      if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) &&
> > !retryDisabledOnFinish.get())
> >
> > So my question is:
> > If completeExceptionally(...) is called inside the timeout() method, and
> > the retry conditions are met, will the retry still be executed even after
> > the timeout? Or is the timeout supposed to be the final failure point?
> >
> > I want to make sure I’m not misunderstanding how timeout() interacts with
> > the retry mechanism.
> >
> > Thank you again for your help!
> >
> > Best regards,
> > Honggeun Ji
> >
> >
> >
> >
> > On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
> > wrote:
> >
> > > Hi Honggeun,
> > >
> > > It looks like the confusion stems from a documentation issue.
> > >
> > > As the code comment explains:
> > >
> > > * @param timeout from first invoke to final completion of asynchronous
> > > operation, may include
> > > *     multiple retries, and will be reset in case of restart
> > >
> > > This means the timeout covers the entire duration of the async
> operation,
> > > including any retries. You could try increasing the timeout value to
> > ensure
> > > it accommodates the maximum expected duration, including retries.
> > >
> > > This detail should definitely be made clearer in the documentation:
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
> > >
> > > Will address it.
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > HONGGEUN JI <gihon...@gmail.com> 于2025年7月9日周三 19:25写道:
> > >
> > > > Dear Flink Community,
> > > >
> > > > I am using AsyncDataStream.unorderedWaitWithRetry together with a
> > > > RichAsyncFunction to perform asynchronous processing in Flink 1.19. I
> > > have
> > > > configured a retry strategy using FixedDelayRetryStrategy with a
> > maximum
> > > of
> > > > 3 retries and a delay of 1000 milliseconds.
> > > >
> > > > *Issue:*
> > > > When the initial call to asyncInvoke() times out, the overridden
> > > timeout()
> > > > method is invoked as expected. However, when a retry is triggered
> > (either
> > > > due to a timeout or another exception), and that retry also
> > experiences a
> > > > timeout, the timeout() method is not called again. It seems that the
> > > > timeout is not re-registered for retries, which causes those retry
> > > attempts
> > > > to hang indefinitely.
> > > >
> > > > *Observed Output:*
> > > >
> > > > CustomSend AsyncIO start...!Trying document at 1752058980697 -->
> > > > customDoc://korea.test.test/12345Timeout document at 1752058982687
> -->
> > > > customDoc://korea.test.test/12345Trying document at 1752058983701 -->
> > > > customDoc://korea.test.test/12345// No further timeout messages
> appear
> > > >
> > > > *Expected Output:*
> > > > Each retry should also be subject to timeout handling, and the
> > timeout()
> > > > method should be called if the retry does not complete within the
> > timeout
> > > > period. For example:
> > > >
> > > > Trying document...Timeout...Trying document again...Timeout...
> > > > (repeated up to max retries)
> > > >
> > > > *Test Configuration Summary:*
> > > >
> > > >    -
> > > >
> > > >    In asyncInvoke(), I simulate long processing using
> > > Thread.sleep(150000)
> > > >    inside a CompletableFuture to force timeouts.
> > > >    -
> > > >
> > > >    The timeout() method calls
> resultFuture.completeExceptionally(...).
> > > >    -
> > > >
> > > >    unorderedWaitWithRetry(...) configuration:
> > > >    -
> > > >
> > > >       Timeout: 1000 ms
> > > >       -
> > > >
> > > >       Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
> > > >       -
> > > >
> > > >       Parallelism: 1
> > > >
> > > > *Workaround / Potential Fix:*
> > > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to
> manually
> > > > call resultHandlerDelegator.registerTimeout(timeout) before each
> retry
> > > > attempt. This seems to resolve the issue and ensures that all retries
> > > > respect the timeout.
> > > >
> > > > private void tryOnce(RetryableResultHandlerDelegator
> > > > resultHandlerDelegator) throws Exception {
> > > >     resultHandlerDelegator.currentAttempts++;
> > > >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
> > > >     userFunction.asyncInvoke(
> > > >         resultHandlerDelegator.resultHandler.inputRecord.getValue(),
> > > >         resultHandlerDelegator);
> > > > }
> > > >
> > > > *My Questions:*
> > > >
> > > >    -
> > > >
> > > >    Is this a known issue?
> > > >    -
> > > >
> > > >    Is it valid to call registerTimeout() inside tryOnce()?
> > > >    -
> > > >
> > > >    Could this introduce any side effects, especially when called via
> > > >    finishInFlightDelayedRetry()?
> > > >
> > > > If this is confirmed to be a bug, I am happy to create a JIRA ticket
> > and
> > > > submit a fix.
> > > >
> > > > Thank you for your time and support.
> > > >
> > > > Best regards,
> > > > Honggeun Ji
> > > >
> > >
> >
>

Reply via email to