Hi Tom,

On Mon, Oct 4, 2021 at 10:42 PM tom yang <ensc...@gmail.com> wrote:

> Hello,
>
>
>
> I have a recently ran into an issue with RichAsyncFunction and wanted to
> get some guidance from the community
>
>
>
> Please see snippet
>
>
>
> *class* AsyncFetchFromHttp *extends* RichAsyncFunction<String, Tuple2<
> String, String>> {
>
> 2
>
> 3    *private* *transient* AysncHttpClient client;
>
> 4
>
> 5    @Override
>
> 6    *public* *void* *open*(Configuration parameters) *throws* Exception {
>
> 7        client = *new* AysncHttpClient();
>
> 8    }
>
> 9
>
> 10    @Override
>
> 11    *public* *void* close() *throws* Exception {
>
> 12        client.close();
>
> 13    }
>
> 14
>
> 15    @Override
>
> 16    *public* *void* asyncInvoke(String key, *final* ResultFuture<Tuple2<
> String, String>> resultFuture) *throws* Exception {
>
> 17
>
> 18        *// issue the asynchronous request, receive a future for result*
>
> 19        CompleteableFuture<HttpResponse<String>> future = httpClient
> .sendAsync(request, HttpResponse.BodyHandlers.ofString())
>
> 20
>
> 21        future.whenCompleteAsync((response, throwable) -> {
>
> 22          *if* (throwable != *null* ) {
>
> 23
>
> 24              resultFuture.completeExceptionally(throwable);
>
> 25          }
>
> 26          *else* {
>
> 27            *if* (resp.statusCode() == HttpStatus.SC_OK) {
>
> 28              resultFuture.complete(Collections.singleton(*new* Tuple2
> <>(key, response.body())
>
> 29            }
>
> 30            *else* *if* (resp.statusCode() == HttpStatus.SC_NOT_FOUND) {
>
> 32              resultFuture.complete(Collections.emptyList())
>
> 33            }
>
> 34            *else* {
>
> 35               resultFuture.completeExceptionally(*new* RuntimeException
> ("Server processing error"));
>
> 36            }
>
> 37          }
>
> 38
>
> 39        })
>
> 40
>
> 41
>
> 42    }
>
> 43}
>
>
>
> 1 . If the future completes exceptionally, ie resultFuture
> .completeExceptionally(throwable);
>
> does the input message get discarded?
>

Which input do you mean here, "request"? It is not defined in your snippet,
did it get lost when trimming unimportant parts?
By default, you will get only the contained throwable, you would have to
enrich it with the input if you want to retain it.

2. Should the request be made on a dedicated ExecutorService or is the
> forkpoolcommon sufficient?
>

I don't see a good reason in general here to use a separate thread pool for
the requests. They are async (not blocking), are part of your Flink job and
run on your Taskmanagers. Unless there is something special in your setup
that makes you suspect they block other tasks...


> 3. If the rest api service for example returns 404, should you complete
> with an empty collection or can you omit line 32 entirely?
>

This depends on your desired behavior: Do you want it to complete normally,
but without any results (this is your current state), or do you want it to
complete exceptionally?

Best regards,
Nico


>
>
> Thanks!
>
>
>
>
>

Reply via email to