GitHub user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150899000 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +472,31 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { - if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + private CompletableFuture<Boolean> close(Throwable cause) { + final CompletableFuture<Boolean> shutdownFuture = new CompletableFuture<>(); - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); + if (connectionShutdownFuture.compareAndSet(null, shutdownFuture) && + failureCause.compareAndSet(null, cause)) { + + channel.close().addListener(finished -> { + stats.reportInactiveConnection(); + for (long requestId : pendingRequests.keySet()) { + TimestampedCompletableFuture pending = pendingRequests.remove(requestId); + if (pending != null && pending.completeExceptionally(cause)) { + stats.reportFailedRequest(); + } } - } - return true; + + if (finished.isSuccess()) { --- End diff -- This seems weird at first sight, but I'm guessing it's correct. I.e., we never complete the returned future with the `cause` that was handed in; we only complete it exceptionally if something went wrong while closing the channel, right?
---