Hi Ken,

Thanks for the suggestion. Definitely a good call to just wrap the retry inside the client code. I'll give it a try.

Besides that, do you know if the async timeout is actually a global timeout - i.e., does it account for the time of each attempt plus any interval time in between? I increased the async timeout and reduced the client timeout, and that seems to help, but I will continue to monitor.
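For reference, here's roughly what I'm planning for the client-side retry - just a sketch, where `client.request(...)` is the same call as in your snippet, and the attempt count and backoff are placeholder values I still need to tune:

```
// Sketch: retry transient failures inside the client call itself, so a
// single asyncInvoke() attempt can ride out brief server-side hiccups.
// maxAttempts and backoffMs are placeholders, not tuned yet.
private ServerResult requestWithRetry(String request) throws Exception {
    final int maxAttempts = 3;
    final long backoffMs = 100;
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            return client.request(request);
        } catch (Exception e) {
            lastFailure = e;
            Thread.sleep(backoffMs * attempt); // linear backoff between attempts
        }
    }
    throw lastFailure;
}
```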
Leon

On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Leon,
>
> Normally I try to handle retrying in the client being used to call the
> server, as you have more control/context.
>
> If that’s not an option for you, then normally (un)orderedWaitWithRetry()
> should work - when you say “it doesn’t seem to help much”, are you saying
> that even with retry you get transient failures that you want to handle
> better?
>
> If so, then you could implement the timeout() method in your
> AsyncFunction, and complete with a special result that indicates you
> exceeded the retry count. This would then avoid having the job restart.
>
> — Ken
>
> PS - note that you can also do something similar inside of the
> asyncInvoke() method of your AsyncFunction, e.g.:
>
> @Override
> public void asyncInvoke(String request,
>         ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>
>     final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");
>
>     // Use your own executor, so that you're not relying on the size
>     // of the common fork-join pool.
>     CompletableFuture.<ServerResult>supplyAsync(new Supplier<ServerResult>() {
>
>         @Override
>         public ServerResult get() {
>             try {
>                 return client.request(request);
>             } catch (Exception e) {
>                 LOGGER.debug("Exception requesting " + request, e);
>                 return makeErrorResult(blah, e.getMessage());
>             }
>         }
>     }, executor)
>     .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
>     .thenAccept((ServerResult result) -> {
>         ServerRequestResult requestResult = new ServerRequestResult();
>         requestResult.setBlah();
>         resultFuture.complete(Collections.singleton(requestResult));
>     });
> }
>
>
> On Sep 5, 2023, at 12:16 PM, Leon Xu <l...@attentivemobile.com> wrote:
>
> Hi Flink users,
>
> We are using Flink AsyncIO to call a gRPC-based service in our Flink job,
> and from time to time we are experiencing async function timeout issues.
> Here's the exception:
>
> ```
> java.lang.Exception: Could not complete the stream element: Record @
> 1693939169999 : [B@cadc5b3.
> Caused by: java.util.concurrent.TimeoutException: Async function call has
> timed out.
> ```
>
> Every timeout causes the job to restart, which is very expensive.
>
> On the server side these timeouts look transient, and we were expecting a
> retry to fix the issue. We tried the AsyncIO retry strategy
> (`AsyncDataStream.orderedWaitWithRetry`) but it doesn't seem to help much.
>
> Do you have any suggestions on how to better reduce these timeout errors?
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
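PS - Ken, to make sure I'm reading your timeout() suggestion correctly, I'm planning to override it along these lines (again just a sketch; `ServerRequestResult` is from your example, and setBlah() stands in for however we'd actually mark the result as an error):

```
@Override
public void timeout(String request, ResultFuture<ServerRequestResult> resultFuture) throws Exception {
    // Complete with a sentinel error result instead of letting the default
    // implementation fail with a TimeoutException, so the job doesn't restart.
    ServerRequestResult exceededRetries = new ServerRequestResult();
    exceededRetries.setBlah(); // stand-in for marking this as "exceeded retry count"
    resultFuture.complete(Collections.singleton(exceededRetries));
}
```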