Hi Leon,

Normally I try to handle retrying in the client that's used to call the server, 
since you have more control and context there.

If that’s not an option for you, then normally (un)orderedWaitWithRetry() 
should work - when you say “it doesn’t seem to help much”, are you saying that 
even with retry you get transient failures that you want to handle better?
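If it helps, here's a rough sketch of how I'd wire up the retry strategy 
(assuming Flink 1.16+; the stream/function names and timing values are just 
placeholders):

    // Needs org.apache.flink.streaming.util.retryable.AsyncRetryStrategies
    // and .RetryPredicates (available as of Flink 1.16).
    AsyncRetryStrategy<ServerRequestResult> retryStrategy =
        new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<ServerRequestResult>(
                3,      // max attempts
                1000L)  // delay between attempts, in ms
            .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
            .build();

    DataStream<ServerRequestResult> results = AsyncDataStream.orderedWaitWithRetry(
        requests,                 // DataStream<String> of incoming requests
        new MyAsyncFunction(),    // your AsyncFunction
        30, TimeUnit.SECONDS,     // total timeout
        retryStrategy);

One thing to watch: the timeout you pass there is the total time allowed for an 
element, including all retry attempts, so you may need to bump it up when you 
enable retries.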

If so, then you could override the timeout() method in your AsyncFunction, and 
complete with a special result that indicates you exceeded the retry count. 
That avoids having the whole job restart.
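E.g. something like this (a sketch, with makeTimeoutResult() standing in for 
however you build that special result):

    @Override
    public void timeout(String request, ResultFuture<ServerRequestResult> resultFuture) throws Exception {
        // Complete with a marker result instead of an exception, so the timeout
        // doesn't fail the operator (and thus restart the job). makeTimeoutResult()
        // is a placeholder for however you construct that special result.
        resultFuture.complete(Collections.singleton(makeTimeoutResult(request)));
    }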

— Ken

PS - note that you can also do something similar inside of the asyncInvoke() 
method of your AsyncFunction, e.g.:

    @Override
    public void asyncInvoke(String request, ResultFuture<ServerRequestResult> resultFuture) throws Exception {

        final ServerResult timeoutResult = makeErrorResult(blah, "Timeout");

        // Use your own executor, so that you're not relying on the size of the
        // common fork-join pool.
        CompletableFuture.<ServerResult>supplyAsync(() -> {
            try {
                return client.request(request);
            } catch (Exception e) {
                LOGGER.debug("Exception requesting " + request, e);
                return makeErrorResult(blah, e.getMessage());
            }
        }, executor)
        // Note: completeOnTimeout() requires Java 9+.
        .completeOnTimeout(timeoutResult, REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS)
        .thenAccept((ServerResult result) -> {
            ServerRequestResult requestResult = new ServerRequestResult();
            requestResult.setBlah();
            resultFuture.complete(Collections.singleton(requestResult));
        });
    }
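In case it's useful - if your function extends RichAsyncFunction, one way to 
manage that executor is in open()/close() (the pool size here is just an 
illustrative value):

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Size this to roughly match the capacity you give the async operator.
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void close() throws Exception {
        if (executor != null) {
            executor.shutdown();
        }
        super.close();
    }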


> On Sep 5, 2023, at 12:16 PM, Leon Xu <l...@attentivemobile.com> wrote:
> 
> Hi Flink users,
> 
> We are using Flink AsyncIO to call a gRPC-based service in our Flink job,
> and from time to time we experience async function timeouts. Here's the
> exception:
> ```
> java.lang.Exception: Could not complete the stream element: Record @ 1693939169999 : [B@cadc5b3.
> Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
> ```
> Every timeout will cause the job to restart, which seems to be very expensive.
> 
> On the server side these timeouts look transient, and we were expecting
> that a retry would fix the issue.
> We tried using the AsyncIO retry strategy but it doesn't seem to help much:
> `AsyncDataStream.orderedWaitWithRetry`
> 
> Do you have any suggestions on how to better reduce these timeout errors?
> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
