GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228244

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -419,20 +440,27 @@ void close() {
         * @param cause The cause to close the channel with.
         * @return Channel close future
         */
    -    private boolean close(Throwable cause) {
    +    private CompletableFuture<Boolean> close(Throwable cause) {
    +        final CompletableFuture<Boolean> shutdownFuture = new CompletableFuture<>();
             if (failureCause.compareAndSet(null, cause)) {
    -            channel.close();
    -            stats.reportInactiveConnection();
    +            final CompletableFuture<?> tmp = new CompletableFuture<>();
    +            channel.close().addListener(finished -> tmp.complete(null));
    -            for (long requestId : pendingRequests.keySet()) {
    -                TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -                if (pending != null && pending.completeExceptionally(cause)) {
    -                    stats.reportFailedRequest();
    +            tmp.thenRun(() -> {
    --- End diff --

    here as well.
---