Hi Ken,

Thanks for the suggestion. Definitely a good call to just wrap the retry
inside the client code. I'll give it a try.

Besides that, do you know if the async timeout is actually a global
timeout? That is, does it account for the time of each attempt plus any
interval time in between?

I increased the async timeout and reduced the client timeout, and that
seems to help. But I will continue to monitor.
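
For reference, here's roughly what I changed, as a sketch (the stub
variable, the timeout values, and retryStrategy are placeholders for our
actual job):

```
// Per-attempt gRPC deadline, now kept well below the async timeout so a
// slow attempt fails fast and leaves room for a retry within the budget.
MyServiceGrpc.MyServiceBlockingStub boundedStub =
        stub.withDeadlineAfter(2, TimeUnit.SECONDS);

// The async timeout I increased. My reading (the thing I'm asking about)
// is that with orderedWaitWithRetry this is a global budget covering
// every attempt plus the retry delays in between.
DataStream<ServerRequestResult> results =
        AsyncDataStream.orderedWaitWithRetry(
                requests, new ServerAsyncFunction(),
                30, TimeUnit.SECONDS,   // the overall async timeout
                retryStrategy);
```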

Leon

On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Leon,
>
> Normally I try to handle retrying in the client being used to call the
> server, as you have more control/context.
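>
> E.g. a minimal sketch of a bounded retry wrapper (the attempt count and
> backoff are illustrative; client.request() is your underlying call):
>
>     private ServerResult requestWithRetry(String request) throws Exception {
>         Exception lastFailure = null;
>         for (int attempt = 1; attempt <= 3; attempt++) {
>             try {
>                 return client.request(request);
>             } catch (Exception e) {
>                 lastFailure = e;
>                 Thread.sleep(100L * attempt);   // simple linear backoff
>             }
>         }
>         throw lastFailure;
>     }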
>
> If that’s not an option for you, then normally (un)orderedWaitWithRetry()
> should work - when you say “it doesn’t seem to help much”, are you saying
> that even with retry you get transient failures that you want to handle
> better?
>
> If so, then you could implement the timeout() method in your
> AsyncFunction, and complete with a special result that indicates you
> exceeded the retry count. This would then avoid having the job restart.
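>
> E.g. (a sketch - makeErrorRequestResult() is a hypothetical helper that
> builds a ServerRequestResult marking the failure):
>
>     @Override
>     public void timeout(String request,
>             ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>         // Complete with a marker result instead of letting the default
>         // implementation throw, which would fail the record and restart the job.
>         ServerRequestResult exhausted = makeErrorRequestResult("Timeout after retries");
>         resultFuture.complete(Collections.singleton(exhausted));
>     }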
>
> — Ken
>
> PS - note that you can also do something similar inside of the
> asyncInvoke() method of your AsyncFunction, e.g.:
>
>     @Override
>     public void asyncInvoke(String request,
>             ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>         final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");
>
>         // Use your own executor, so that you're not relying on the size
>         // of the common fork-join pool.
>         CompletableFuture.<ServerResult>supplyAsync(new Supplier<ServerResult>() {
>             @Override
>             public ServerResult get() {
>                 try {
>                     return client.request(request);
>                 } catch (Exception e) {
>                     LOGGER.debug("Exception requesting " + request, e);
>                     return makeErrorResult(blah, e.getMessage());
>                 }
>             }
>         }, executor)
>         // Complete with the fallback result if the request runs too long,
>         // rather than letting Flink's async timeout fail the record.
>         .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
>         .thenAccept((ServerResult result) -> {
>             ServerRequestResult requestResult = new ServerRequestResult();
>             requestResult.setBlah();
>             resultFuture.complete(Collections.singleton(requestResult));
>         });
>     }
>
>
> On Sep 5, 2023, at 12:16 PM, Leon Xu <l...@attentivemobile.com> wrote:
>
> Hi Flink users,
>
> We are using Flink AsyncIO to call a gRPC-based service in our Flink job,
> and from time to time we experience async function timeouts. Here's the
> exception:
> ```
> java.lang.Exception: Could not complete the stream element: Record @
> 1693939169999 : [B@cadc5b3.
> Caused by: java.util.concurrent.TimeoutException: Async function call has
> timed out.
> ```
> Every timeout causes the job to restart, which is very expensive.
>
> On the server side these timeouts look transient, so we expected a retry
> to fix the issue. We tried the AsyncIO retry strategy
> (`AsyncDataStream.orderedWaitWithRetry`), wired up roughly as below, but
> it doesn't seem to help much.
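>
> A sketch of our setup (the attempt count, delay, and stream names are
> placeholders):
>
> ```
> // Fixed-delay retry: up to 3 attempts, 100 ms apart, retrying whenever
> // an attempt completes exceptionally.
> AsyncRetryStrategy<ServerRequestResult> retryStrategy =
>         new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<ServerRequestResult>(3, 100L)
>                 .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>                 .build();
>
> DataStream<ServerRequestResult> results =
>         AsyncDataStream.orderedWaitWithRetry(
>                 requests, new ServerAsyncFunction(),
>                 10, TimeUnit.SECONDS,   // async timeout
>                 retryStrategy);
> ```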
>
> Do you have any suggestions on how to better reduce these timeout errors?
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
