Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150223999 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture<?> shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture<Boolean> queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture<Boolean> groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture<Boolean> handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) --- End diff -- why not setting the timeout here to `0`?
---