I have one last question.

As I understand, the current timeout covers the total duration, including
all retries.
However, I’m wondering if there have been any discussions or plans to
support a timeout that only applies to a *single* try (not the total time
across retries).

This might just be a special case for my use, but I was curious—
is there a way to set a timeout for just one attempt, so that
AsyncWaitOperator can retry without needing to restart the entire job?

Thanks again for your support.




On Fri, Jul 11, 2025 at 10:22 AM HONGGEUN JI <gihon...@gmail.com> wrote:

> Hi Lincoln,
> Thank you for looking into this issue.
>
> I appreciate your effort, and I’d be happy to review your PR.
>
>
>
>
> On Thu, Jul 10, 2025 at 10:38 PM Lincoln Lee <lincoln.8...@gmail.com>
> wrote:
>
>> Good question! The current implementation has the unexpected extra retries
>> in your case.
>>
>> After some thought — we can actually avoid unnecessary retries here
>> without
>> changing the current behavior where the timeout covers the total duration.
>>
>> To add some context, this was indeed a missing piece in the original
>> design.
>> Back when we were first discussing the FLIP[1], we had considered tracking
>> execution
>> time as part of the state. However, for various reasons, we ultimately
>> went
>> with
>> the simpler stateless approach (which may lead to additional retries from
>> the
>> user's perspective after a job restart). You’ve hit on the two key issues
>> we
>> debated back then, this clearly shows there's still room for improvement.
>>
>> I've created a PR https://github.com/apache/flink/pull/26779
>> Could you help review it? We can continue the discussion there.
>>
>> 1. https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g
>>
>>
>> Best,
>> Lincoln Lee
>>
>>
>> HONGGEUN JI <gihon...@gmail.com> 于2025年7月10日周四 09:29写道:
>>
>> > Thank you for the clarification. I now understand that the timeout()
>> method
>> > is intended to cover all async I/O attempts, including retries.
>> Apologies
>> > for the confusion in my previous message.
>> >
>> > If you don’t mind, I’d like to ask one more question to make sure I
>> fully
>> > understand the behavior.
>> >
>> > Let’s say a timeout occurs during any async operation (either the
>> initial
>> > call or during the retry). Inside my overridden timeout() method, I call
>> > resultFuture.completeExceptionally(...)
>> >
>> > From what I can see in the code, after that completeExceptionally(), the
>> > RetryableResultHandlerDelegator#processRetry method is eventually
>> invoked.
>> >
>> > Inside processRetry, the retry can happens if the following condition is
>> > true:
>> >
>> >      if (satisfy && asyncRetryStrategy.canRetry(currentAttempts) &&
>> > !retryDisabledOnFinish.get())
>> >
>> > So my question is:
>> > If completeExceptionally(...) is called inside the timeout() method, and
>> > the retry conditions are met, will the retry still be executed even
>> after
>> > the timeout? Or is the timeout supposed to be the final failure point?
>> >
>> > I want to make sure I’m not misunderstanding how timeout() interacts
>> with
>> > the retry mechanism.
>> >
>> > Thank you again for your help!
>> >
>> > Best regards,
>> > Honggeun Ji
>> >
>> >
>> >
>> >
>> > On Wed, Jul 9, 2025 at 10:50 PM Lincoln Lee <lincoln.8...@gmail.com>
>> > wrote:
>> >
>> > > Hi Honggeun,
>> > >
>> > > It looks like the confusion stems from a documentation issue.
>> > >
>> > > As the code comment explains:
>> > >
>> > > * @param timeout from first invoke to final completion of asynchronous
>> > > operation, may include
>> > > *     multiple retries, and will be reset in case of restart
>> > >
>> > > This means the timeout covers the entire duration of the async
>> operation,
>> > > including any retries. You could try increasing the timeout value to
>> > ensure
>> > > it accommodates the maximum expected duration, including retries.
>> > >
>> > > This detail should definitely be made clearer in the documentation:
>> > >
>> > >
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>> > >
>> > > Will address it.
>> > >
>> > > Best,
>> > > Lincoln Lee
>> > >
>> > >
>> > > HONGGEUN JI <gihon...@gmail.com> 于2025年7月9日周三 19:25写道:
>> > >
>> > > > Dear Flink Community,
>> > > >
>> > > > I am using AsyncDataStream.unorderedWaitWithRetry together with a
>> > > > RichAsyncFunction to perform asynchronous processing in Flink 1.19.
>> I
>> > > have
>> > > > configured a retry strategy using FixedDelayRetryStrategy with a
>> > maximum
>> > > of
>> > > > 3 retries and a delay of 1000 milliseconds.
>> > > >
>> > > > *Issue:*
>> > > > When the initial call to asyncInvoke() times out, the overridden
>> > > timeout()
>> > > > method is invoked as expected. However, when a retry is triggered
>> > (either
>> > > > due to a timeout or another exception), and that retry also
>> > experiences a
>> > > > timeout, the timeout() method is not called again. It seems that the
>> > > > timeout is not re-registered for retries, which causes those retry
>> > > attempts
>> > > > to hang indefinitely.
>> > > >
>> > > > *Observed Output:*
>> > > >
>> > > > CustomSend AsyncIO start...!Trying document at 1752058980697 -->
>> > > > customDoc://korea.test.test/12345Timeout document at 1752058982687
>> -->
>> > > > customDoc://korea.test.test/12345Trying document at 1752058983701
>> -->
>> > > > customDoc://korea.test.test/12345// No further timeout messages
>> appear
>> > > >
>> > > > *Expected Output:*
>> > > > Each retry should also be subject to timeout handling, and the
>> > timeout()
>> > > > method should be called if the retry does not complete within the
>> > timeout
>> > > > period. For example:
>> > > >
>> > > > Trying document...Timeout...Trying document again...Timeout...
>> > > > (repeated up to max retries)
>> > > >
>> > > > *Test Configuration Summary:*
>> > > >
>> > > >    -
>> > > >
>> > > >    In asyncInvoke(), I simulate long processing using
>> > > Thread.sleep(150000)
>> > > >    inside a CompletableFuture to force timeouts.
>> > > >    -
>> > > >
>> > > >    The timeout() method calls
>> resultFuture.completeExceptionally(...).
>> > > >    -
>> > > >
>> > > >    unorderedWaitWithRetry(...) configuration:
>> > > >    -
>> > > >
>> > > >       Timeout: 1000 ms
>> > > >       -
>> > > >
>> > > >       Retry Strategy: Fixed delay, 3 retries, 1000 ms delay
>> > > >       -
>> > > >
>> > > >       Parallelism: 1
>> > > >
>> > > > *Workaround / Potential Fix:*
>> > > > I tried modifying the AsyncWaitOperator#tryOnce(...) method to
>> manually
>> > > > call resultHandlerDelegator.registerTimeout(timeout) before each
>> retry
>> > > > attempt. This seems to resolve the issue and ensures that all
>> retries
>> > > > respect the timeout.
>> > > >
>> > > > private void tryOnce(RetryableResultHandlerDelegator
>> > > > resultHandlerDelegator) throws Exception {
>> > > >     resultHandlerDelegator.currentAttempts++;
>> > > >     resultHandlerDelegator.registerTimeout(timeout); // Fix?
>> > > >     userFunction.asyncInvoke(
>> > > >         resultHandlerDelegator.resultHandler.inputRecord.getValue(),
>> > > >         resultHandlerDelegator);
>> > > > }
>> > > >
>> > > > *My Questions:*
>> > > >
>> > > >    -
>> > > >
>> > > >    Is this a known issue?
>> > > >    -
>> > > >
>> > > >    Is it valid to call registerTimeout() inside tryOnce()?
>> > > >    -
>> > > >
>> > > >    Could this introduce any side effects, especially when called via
>> > > >    finishInFlightDelayedRetry()?
>> > > >
>> > > > If this is confirmed to be a bug, I am happy to create a JIRA ticket
>> > and
>> > > > submit a fix.
>> > > >
>> > > > Thank you for your time and support.
>> > > >
>> > > > Best regards,
>> > > > Honggeun Ji
>> > > >
>> > >
>> >
>>
>

Reply via email to