> 1. If the future completes exceptionally, i.e.
> resultFuture.completeExceptionally(throwable);
> does the input message get discarded?

If you complete with an exception, it will bubble up and fail the task. The failover strategy then determines what happens, but in most cases the job will restart from an earlier checkpoint. If the source rewinds to the corresponding offset, Flink will replay all messages from the checkpoint onward and resubmit them to async I/O.
That means you will not lose any input message, but you will issue duplicate requests (everything between the checkpoint and the failure).

> 2. Should the request be made on a dedicated ExecutorService or is the
> common ForkJoinPool sufficient?

Use a dedicated ExecutorService to exert more control over the thread pool. Usually, you want as many threads as the queue capacity you have configured. Also note that if you run async I/O in parallel, each task should have its own queue, as several tasks may reside in the same Java process. With the common pool, you would only have roughly as many threads as there are cores, so you end up with about 1 thread per task instance in the usual parallel execution of async I/O. It gets even worse if you have several async I/O operators in your pipeline.

> 3. If the rest api service for example returns 404, should you complete
> with an empty collection or can you omit line 32 entirely?

You should always call one of the complete* methods, so that Flink knows the element has been processed and can continue processing.
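To make points 2 and 3 a bit more concrete, here is a minimal sketch that needs only the JDK. It is not Flink code: SketchResultFuture is a hypothetical stand-in for Flink's ResultFuture, the pool size of 4 is an assumed queue capacity, and supplyAsync merely stands in for the HTTP call. It shows the callback running on a dedicated pool and completing the result future on every path.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for Flink's ResultFuture, so the sketch needs only the JDK. */
interface SketchResultFuture<T> {
    void complete(Collection<T> results);
    void completeExceptionally(Throwable error);
}

class AsyncCompletionSketch {

    /** Completes the result future on every path: 200, 404, other status, or failure. */
    static void handleResponse(String key, Integer status, String body, Throwable failure,
                               SketchResultFuture<String> resultFuture) {
        if (failure != null) {
            // Reported as an error; in Flink this would fail the task.
            resultFuture.completeExceptionally(failure);
        } else if (status == 200) {
            resultFuture.complete(Collections.singleton(key + "=" + body));
        } else if (status == 404) {
            // No output for this element, but it is still marked as processed.
            resultFuture.complete(Collections.emptyList());
        } else {
            resultFuture.completeExceptionally(new RuntimeException("Server processing error"));
        }
    }

    /** Runs one simulated request on a dedicated pool and reports how it was completed. */
    static String outcome(int simulatedStatus) {
        // Dedicated pool; the size 4 is an assumed queue capacity, not a recommendation.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<String> observed = new CompletableFuture<>();
            SketchResultFuture<String> resultFuture = new SketchResultFuture<String>() {
                @Override
                public void complete(Collection<String> results) {
                    observed.complete("results" + results);
                }

                @Override
                public void completeExceptionally(Throwable error) {
                    observed.complete("error: " + error.getMessage());
                }
            };
            CompletableFuture
                .supplyAsync(() -> simulatedStatus, pool) // stands in for the HTTP call
                .whenCompleteAsync((status, failure) ->
                    handleResponse("key", status, "body", failure, resultFuture), pool);
            return observed.join(); // wait until one of the complete* methods was called
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(200));
        System.out.println(outcome(404));
        System.out.println(outcome(500));
    }
}
```

In an actual RichAsyncFunction you would presumably create the pool in open() and shut it down in close(), so that each parallel task instance gets its own pool.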
On Tue, Oct 5, 2021 at 10:06 PM Nicolaus Weidner <nicolaus.weid...@ververica.com> wrote:

> Hi Tom,
>
> On Mon, Oct 4, 2021 at 10:42 PM tom yang <ensc...@gmail.com> wrote:
>
>> Hello,
>>
>> I recently ran into an issue with RichAsyncFunction and wanted to
>> get some guidance from the community.
>>
>> Please see snippet
>>
>> 1  class AsyncFetchFromHttp extends RichAsyncFunction<String, Tuple2<String, String>> {
>> 2
>> 3      private transient AsyncHttpClient client;
>> 4
>> 5      @Override
>> 6      public void open(Configuration parameters) throws Exception {
>> 7          client = new AsyncHttpClient();
>> 8      }
>> 9
>> 10     @Override
>> 11     public void close() throws Exception {
>> 12         client.close();
>> 13     }
>> 14
>> 15     @Override
>> 16     public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
>> 17
>> 18         // issue the asynchronous request, receive a future for result
>> 19         CompletableFuture<HttpResponse<String>> future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
>> 20
>> 21         future.whenCompleteAsync((response, throwable) -> {
>> 22             if (throwable != null) {
>> 23
>> 24                 resultFuture.completeExceptionally(throwable);
>> 25             }
>> 26             else {
>> 27                 if (response.statusCode() == HttpStatus.SC_OK) {
>> 28                     resultFuture.complete(Collections.singleton(new Tuple2<>(key, response.body())));
>> 29                 }
>> 30                 else if (response.statusCode() == HttpStatus.SC_NOT_FOUND) {
>> 32                     resultFuture.complete(Collections.emptyList());
>> 33                 }
>> 34                 else {
>> 35                     resultFuture.completeExceptionally(new RuntimeException("Server processing error"));
>> 36                 }
>> 37             }
>> 38
>> 39         });
>> 40
>> 41
>> 42     }
>> 43 }
>>
>> 1. If the future completes exceptionally, i.e.
>> resultFuture.completeExceptionally(throwable);
>> does the input message get discarded?
>
> Which input do you mean here, "request"? It is not defined in your
> snippet; did it get lost when trimming unimportant parts?
> By default, you will get only the contained throwable; you would have to
> enrich it with the input if you want to retain it.
>
>> 2. Should the request be made on a dedicated ExecutorService or is the
>> common ForkJoinPool sufficient?
>
> I don't see a good reason in general here to use a separate thread pool
> for the requests. They are async (not blocking), are part of your Flink job
> and run on your TaskManagers. Unless there is something special in your
> setup that makes you suspect they block other tasks...
>
>> 3. If the rest api service for example returns 404, should you complete
>> with an empty collection or can you omit line 32 entirely?
>
> This depends on your desired behavior: Do you want it to complete
> normally, but without any results (this is your current state), or do you
> want it to complete exceptionally?
>
> Best regards,
> Nico
>
>> Thanks!