Hello,

 

I have a recently ran into an issue with RichAsyncFunction and wanted to get some guidance from the community

 

Please see snippet

 

class AsyncFetchFromHttp extends RichAsyncFunction<String, Tuple2<String, String>> {

2

3    private transient AysncHttpClient client;

4

5    @Override

6    public void open(Configuration parameters) throws Exception {

7        client = new AysncHttpClient();

8    }

9

10    @Override

11    public void close() throws Exception {

12        client.close();

13    }

14

15    @Override

16    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

17

18        // issue the asynchronous request, receive a future for result

19        CompleteableFuture<HttpResponse<String>> future = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())

20       

21        future.whenCompleteAsync((response, throwable) -> {

22          if (throwable != null ) {

23

24              resultFuture.completeExceptionally(throwable);

25          }

26          else {

27            if (resp.statusCode() == HttpStatus.SC_OK) {

28              resultFuture.complete(Collections.singleton(new Tuple2<>(key, response.body())

29            } 

30            else if (resp.statusCode() == HttpStatus.SC_NOT_FOUND) {

32              resultFuture.complete(Collections.emptyList())

33            }

34            else {

35               resultFuture.completeExceptionally(new RuntimeException("Server processing error"));

36            }

37          }

38       

39        })

40

41

42    }

43}

 

1 . If the future completes exceptionally, ie resultFuture.completeExceptionally(throwable);

does the input message get discarded?

2. Should the request be made on a dedicated ExecutorService or is the forkpoolcommon sufficient?

3. If the rest api service for example returns 404, should you complete with an empty collection or can you omit line 32 entirely?

 

Thanks!

 

 

Reply via email to