[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260869#comment-16260869 ]
ASF GitHub Bot commented on FLINK-7880: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5038#discussion_r152300724 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,28 +167,57 @@ public String getClientName() { * Shuts down the client and closes all connections. * * <p>After a call to this method, all returned futures will be failed. + * + * @return A {@link CompletableFuture} that will be completed when the shutdown process is done. */ - public void shutdown() { - if (shutDown.compareAndSet(false, true)) { + public CompletableFuture<?> shutdown() { + final CompletableFuture<?> newShutdownFuture = new CompletableFuture<>(); + if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) { + + final List<CompletableFuture<?>> connectionFutures = new ArrayList<>(); + for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) { if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) { if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); + connectionFutures.add(conn.getValue().close()); } } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + CompletableFuture.allOf( + connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()]) + ).whenComplete((result, throwable) -> { + if (throwable != null) { + newShutdownFuture.completeExceptionally(throwable); + } else if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null && !group.isShutdown()) { + group.shutdownGracefully(0L, 0L, 
TimeUnit.MILLISECONDS) + .addListener(finished -> { + if (finished.isSuccess()) { + newShutdownFuture.complete(null); + } else { + newShutdownFuture.completeExceptionally(finished.cause()); + } + }); + } else { + newShutdownFuture.complete(null); + } + } else { + newShutdownFuture.complete(null); } + }); + + // check again if in the meantime another thread completed the future + if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) { --- End diff -- Where in close() do we set the shutdown future to null? I only see that being done in sendRequest (which seems fishy). > flink-queryable-state-java fails with core-dump > ----------------------------------------------- > > Key: FLINK-7880 > URL: https://issues.apache.org/jira/browse/FLINK-7880 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests > Affects Versions: 1.4.0 > Reporter: Till Rohrmann > Assignee: Kostas Kloudas > Priority: Critical > Labels: test-stability > > The {{flink-queryable-state-java}} module fails on Travis with a core dump. > https://travis-ci.org/tillrohrmann/flink/jobs/289949829 -- This message was sent by Atlassian JIRA (v6.4.14#64029)