yandrey321 commented on code in PR #9718:
URL: https://github.com/apache/ozone/pull/9718#discussion_r2788356596


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java:
##########
@@ -235,24 +260,41 @@ private boolean isConnected(ManagedChannel channel) {
    * Closes all the communication channels of the client one-by-one.
    * When a channel is closed, no further requests can be sent via the channel,
    * and the method waits to finish all ongoing communication.
-   *
-   * Note: the method wait 1 hour per channel tops and if that is not enough
-   * to finish ongoing communication, then interrupts the connection anyway.
    */
   @Override
-  public synchronized void close() {
-    closed = true;
+  public void close() {
+    if (!isClosed.compareAndSet(false, true)) {
+      // we should allow only one thread to perform the close operation to make it idempotent
+      return;
+    }
+
     for (ManagedChannel channel : channels.values()) {
-      channel.shutdownNow();
+      channel.shutdown();
+    }
+
+    final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
+    long deadline = System.nanoTime() + maxWaitNanos;
+    List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
+
+    while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) {
+      nonTerminatedChannels.removeIf(ManagedChannel::isTerminated);
       try {
-        channel.awaitTermination(60, TimeUnit.MINUTES);
+        Thread.sleep(SHUTDOWN_WAIT_INTERVAL_MILLIS);
       } catch (InterruptedException e) {
-        LOG.error("InterruptedException while waiting for channel termination",
-            e);
-        // Re-interrupt the thread while catching InterruptedException
+        LOG.error("Interrupted while waiting for channels to terminate", e);
         Thread.currentThread().interrupt();
+        break;
       }
     }
+
+    Set<DatanodeID> failedChannels = channels.entrySet().stream()
+        .filter(e -> !e.getValue().isTerminated())
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toSet());
+    LOG.warn("Channels {} did not terminate within timeout.", failedChannels);
+
+    channels.keySet().removeIf(e -> !failedChannels.contains(e));

Review Comment:
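   We need to remove the channels and stubs from the maps even if they timed
   out on shutdown. As written, this line removes only the channels that
   terminated and keeps the ones that failed. We don't have a retry mechanism
   to close them afterwards, and channels cannot be reused once shutdown has
   been called on them.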
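
   A minimal sketch of the cleanup suggested above, not the actual
   XceiverClientGrpc code. To stay self-contained it does not depend on gRPC:
   `Channel` and `FakeChannel` are hypothetical stand-ins for `ManagedChannel`,
   the `stubs` map, the `String` keys, and the `close(maxWaitMillis,
   pollMillis)` signature are assumptions made for illustration. The point is
   only that both maps are cleared unconditionally once the wait is over,
   while the timed-out keys are still collected for logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseSketch {
  /** Hypothetical stand-in for the channel type; not the gRPC interface. */
  interface Channel {
    void shutdown();
    boolean isTerminated();
  }

  /** Test double: terminates on shutdown() only if told to at construction. */
  static final class FakeChannel implements Channel {
    private final boolean terminatesGracefully;
    private volatile boolean terminated;

    FakeChannel(boolean terminatesGracefully) {
      this.terminatesGracefully = terminatesGracefully;
    }

    @Override public void shutdown() {
      if (terminatesGracefully) {
        terminated = true;
      }
    }

    @Override public boolean isTerminated() {
      return terminated;
    }
  }

  static final class Client {
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    final Map<String, Channel> channels = new ConcurrentHashMap<>();
    final Map<String, Object> stubs = new ConcurrentHashMap<>();

    /** Returns the keys of channels that did not terminate in time. */
    List<String> close(long maxWaitMillis, long pollMillis) {
      List<String> timedOut = new ArrayList<>();
      if (!isClosed.compareAndSet(false, true)) {
        return timedOut; // only the first caller performs the close
      }
      channels.values().forEach(Channel::shutdown);

      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
      while (System.nanoTime() < deadline
          && channels.values().stream().anyMatch(c -> !c.isTerminated())) {
        try {
          Thread.sleep(pollMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }

      // Record the stragglers for logging before dropping every entry.
      channels.forEach((id, ch) -> {
        if (!ch.isTerminated()) {
          timedOut.add(id);
        }
      });
      // Wipe both maps regardless of the outcome: a channel that has been
      // shut down cannot be reused, and nothing retries the close later.
      channels.clear();
      stubs.clear();
      return timedOut;
    }
  }

  public static void main(String[] args) {
    Client client = new Client();
    client.channels.put("dn1", new FakeChannel(true));
    client.channels.put("dn2", new FakeChannel(false));
    client.stubs.put("dn1", new Object());
    client.stubs.put("dn2", new Object());
    System.out.println("timed out: " + client.close(50, 5));
    System.out.println("channels left: " + client.channels.size());
  }
}
```

   In the real class the warning would presumably be logged only when the
   returned collection is non-empty, so a clean shutdown stays quiet.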



