Hi Leon,

> Besides that, do you know if the async timeout is actually a global
> timeout? Meaning, does it account for the time of each attempt plus any
> interval time in between?

Yes, the timeout is the total (end-to-end) timeout: it covers every retry
attempt plus any delay in between. See [1][2] for more detail, and the
sketch below.


[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963
[2]
https://github.com/apache/flink/blob/7d8f9821d2b3ed9876eae4ffe2e3c8b86af2d88a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java#L209
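
For a concrete picture, here is a minimal sketch against the retry API added
in Flink 1.16 (the stream `requests`, the function `GrpcLookupFunction`, the
result type `ServerResult`, and the numbers are illustrative, not from your
job):

```
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;

// Retry up to 3 attempts, waiting 1s between attempts, whenever the
// call throws.
AsyncRetryStrategy<ServerResult> retryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<ServerResult>(3, 1000L)
                .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                .build();

// The 10s here is the TOTAL budget for one element: it must cover all
// 3 attempts plus the two 1s delays in between, so it should be at
// least maxAttempts * perAttemptTimeout + (maxAttempts - 1) * delay.
DataStream<ServerResult> results = AsyncDataStream.orderedWaitWithRetry(
        requests, new GrpcLookupFunction(), 10, TimeUnit.SECONDS, 100, retryStrategy);
```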

Best,
Ron

On Wed, Sep 6, 2023 at 12:07 Leon Xu <l...@attentivemobile.com> wrote:

> Hi Ken,
>
> Thanks for the suggestion. Definitely a good call to just wrap the retry
> inside the client code. I'll give it a try.
> Besides that, do you know if the async timeout is actually a global
> timeout? Meaning, does it account for the time of each attempt plus any
> interval time in between?
> I increased the async timeout and reduced the client timeout, and that
> seems to help. But I will continue to monitor.
>
> Leon
>
> On Tue, Sep 5, 2023 at 12:42 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
>> Hi Leon,
>>
>> Normally I try to handle retrying in the client being used to call the
>> server, as you have more control/context.
>>
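>> For example, a minimal sketch of a retry wrapper in the client layer
>> (the attempt count and backoff are illustrative, and client.request()
>> stands in for your gRPC call):
>>
>>     ServerResult requestWithRetry(String request) throws Exception {
>>         Exception last = null;
>>         for (int attempt = 1; attempt <= 3; attempt++) {
>>             try {
>>                 return client.request(request);
>>             } catch (Exception e) {
>>                 last = e;
>>                 Thread.sleep(200L * attempt); // simple linear backoff
>>             }
>>         }
>>         throw last; // all attempts failed
>>     }
>>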
>> If that’s not an option for you, then normally (un)orderedWaitWithRetry()
>> should work - when you say “it doesn’t seem to help much”, are you saying
>> that even with retry you get transient failures that you want to handle
>> better?
>>
>> If so, then you could implement the timeout() method in your
>> AsyncFunction, and complete with a special result that indicates you
>> exceeded the retry count. This would then avoid having the job restart.
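>>
>> For illustration, a minimal sketch of that override (makeTimeoutResult
>> is a placeholder for however you build the sentinel result):
>>
>>     @Override
>>     public void timeout(String request,
>>             ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>>         // Complete with a sentinel record instead of an exception, so
>>         // downstream can see the failure and the job keeps running.
>>         resultFuture.complete(Collections.singleton(makeTimeoutResult(request)));
>>     }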
>>
>> — Ken
>>
>> PS - note that you can also do something similar inside the
>> asyncInvoke() method of your AsyncFunction, e.g.:
>>
>>     @Override
>>     public void asyncInvoke(String request,
>>             ResultFuture<ServerRequestResult> resultFuture) throws Exception {
>>
>>         final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");
>>
>>         // Use your own executor, so that you're not relying on the size
>>         // of the common fork-join pool.
>>         CompletableFuture.<ServerResult>supplyAsync(new Supplier<ServerResult>() {
>>
>>             @Override
>>             public ServerResult get() {
>>                 try {
>>                     return client.request(request);
>>                 } catch (Exception e) {
>>                     LOGGER.debug("Exception requesting " + request, e);
>>                     return makeErrorResult(blah, e.getMessage());
>>                 }
>>             }
>>         }, executor)
>>         // completeOnTimeout() (Java 9+) supplies the fallback result if
>>         // the request doesn't finish in time.
>>         .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
>>         .thenAccept((ServerResult result) -> {
>>             ServerRequestResult requestResult = new ServerRequestResult();
>>             requestResult.setBlah();
>>             resultFuture.complete(Collections.singleton(requestResult));
>>         });
>>     }
>>
>>
>> On Sep 5, 2023, at 12:16 PM, Leon Xu <l...@attentivemobile.com> wrote:
>>
>> Hi Flink users,
>>
>> We are using Flink AsyncIO to call a gRPC-based service in our Flink job,
>> and from time to time we see async function timeouts. Here's the
>> exception:
>> ```
>> java.lang.Exception: Could not complete the stream element: Record @
>> 1693939169999 : [B@cadc5b3.
>> Caused by: java.util.concurrent.TimeoutException: Async function call
>> has timed out.
>> ```
>> Every timeout causes the job to restart, which is very expensive.
>>
>> On the server side these timeouts look transient, and we were expecting
>> a retry to fix the issue. We tried the asyncIO retry strategy
>> (`AsyncDataStream.orderedWaitWithRetry`), but it doesn't seem to help
>> much.
>>
>> Do you have any suggestions on how to better reduce these timeout errors?
>>
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
