Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150229482 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); + pending.complete(response); } } @Override public void onRequestFailure(long requestId, Throwable cause) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { + if (pending != null && !pending.isDone()) { stats.reportFailedRequest(); + pending.completeExceptionally(cause); } } @Override public void onFailure(Throwable cause) { - if (close(cause)) { - // Remove from established channels, otherwise future - // requests will be handled by this failed channel. - establishedConnections.remove(serverAddress, this); - } + close(cause).thenAccept(cancelled -> { + if (cancelled) { --- End diff -- Why do we only remove `this` from `establischedConnections` if `cancelled` is true?
---