Hi Leon,

> Besides that, do you know if the async timeout is actually a global
> timeout? Meaning it accounts for the time of each attempt call plus any
> interval time in between.
Yes, the timeout is the total timeout; see [1][2] for more detail.

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[2] https://github.com/apache/flink/blob/7d8f9821d2b3ed9876eae4ffe2e3c8b86af2d88a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java#L209

Best,
Ron

Leon Xu <l...@attentivemobile.com> wrote on Wednesday, September 6, 2023 at 12:07:

> Hi Ken,
>
> Thanks for the suggestion. Definitely a good call to just wrap the retry
> inside the client code. I'll give it a try.
> Besides that, do you know if the async timeout is actually a global
> timeout? Meaning it accounts for the time of each attempt call plus any
> interval time in between.
> I increased the async timeout and reduced the client timeout, and that
> seems to help. But I will continue to monitor.
>
> Leon
>
> On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi Leon,
>>
>> Normally I try to handle retrying in the client being used to call the
>> server, as you have more control/context there.
>>
>> If that's not an option for you, then normally (un)orderedWaitWithRetry()
>> should work - when you say "it doesn't seem to help much", are you saying
>> that even with retry you get transient failures that you want to handle
>> better?
>>
>> If so, then you could implement the timeout() method in your
>> AsyncFunction, and complete with a special result that indicates you
>> exceeded the retry count. This would then avoid having the job restart.
>>
>> -- Ken
>>
>> PS - note that you can also do something similar inside the asyncInvoke()
>> method of your AsyncFunction, e.g.:
>>
>>     @Override
>>     public void asyncInvoke(String request,
>>             ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>>
>>         final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");
>>
>>         // Use your own executor, so that you're not relying on the size
>>         // of the common fork-join pool.
>>         CompletableFuture.<ServerResult>supplyAsync(new Supplier<ServerResult>() {
>>
>>             @Override
>>             public ServerResult get() {
>>                 try {
>>                     return client.request(request);
>>                 } catch (Exception e) {
>>                     LOGGER.debug("Exception requesting " + request, e);
>>                     return makeErrorResult(blah, e.getMessage());
>>                 }
>>             }
>>         }, executor)
>>         .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
>>         .thenAccept((ServerResult result) -> {
>>             ServerRequestResult requestResult = new ServerRequestResult();
>>             requestResult.setBlah();
>>             resultFuture.complete(Collections.singleton(requestResult));
>>         });
>>     }
>>
>> On Sep 5, 2023, at 12:16 PM, Leon Xu <l...@attentivemobile.com> wrote:
>>
>> Hi Flink users,
>>
>> We are using Flink AsyncIO to call a grpc-based service in our Flink job,
>> and from time to time we are experiencing async function timeout issues.
>> Here's the exception:
>>
>> ```
>> java.lang.Exception: Could not complete the stream element: Record @ 1693939169999 : [B@cadc5b3.
>> Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
>> ```
>>
>> Every timeout causes the job to restart, which is very expensive.
>>
>> On the server side these timeouts look transient, and we were expecting a
>> retry to fix the issue. We tried the AsyncIO retry strategy
>> (AsyncDataStream.orderedWaitWithRetry) but it doesn't seem to help much.
>>
>> Do you have any suggestions on how to better reduce these timeout errors?
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
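For reference, the completeOnTimeout pattern in Ken's example can be demonstrated with plain JDK classes and no Flink dependency. This is a minimal sketch, assuming a slow call that simply sleeps; the timeout values and fallback string are illustrative, not from the thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TimeoutFallbackDemo {
    public static void main(String[] args) throws Exception {
        // Dedicated executor, as Ken suggests, instead of the common fork-join pool.
        ExecutorService executor = Executors.newFixedThreadPool(2);

        CompletableFuture<String> call = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(500); // simulated slow server call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "real-result";
                }, executor)
                // Key idea: complete with a sentinel value instead of letting the
                // future time out exceptionally, so downstream logic always runs.
                .completeOnTimeout("timeout-fallback", 100, TimeUnit.MILLISECONDS);

        System.out.println(call.join()); // prints "timeout-fallback"
        executor.shutdown();
    }
}
```

Because the future completes normally with the sentinel, the equivalent Flink AsyncFunction would call resultFuture.complete(...) with an error record rather than failing the element, which is what avoids the job restart.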
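Ken's first suggestion, wrapping the retry inside the client code, can be sketched with a small JDK-only helper. The method name, attempt count, and linear backoff policy here are illustrative assumptions, not part of any Flink or grpc API:

```java
import java.util.concurrent.Callable;

public class RetryingCall {
    // Hypothetical retry wrapper: invokes the call up to maxAttempts times,
    // sleeping backoffMs * attempt between failures (linear backoff).
    static <T> T callWithRetry(Callable<T> call, int maxAttempts, long backoffMs)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs * attempt);
                }
            }
        }
        throw last; // all attempts failed; surface the last error
    }

    public static void main(String[] args) throws Exception {
        // Simulated flaky client: fails twice with transient errors, then succeeds.
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("transient failure " + calls[0]);
            }
            return "ok";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Because the retries happen inside the body handed to supplyAsync, the async-operator timeout still bounds the total time across all attempts, which is consistent with Ron's point that the AsyncDataStream timeout is a total timeout.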