philipnee commented on code in PR #12590:
URL: https://github.com/apache/kafka/pull/12590#discussion_r1054670620


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // Set the session handler to notify close. This will set the next fetch metadata to send a close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // The fetch target node may not be available as it may have disconnected. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                continue;
+            }
+
+            final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);

Review Comment:
   do we really need to log every successful close? I think logging only the failures could be sufficient.
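
   For illustration, a sketch of the suggestion (not the PR's code) that keeps only the failure path:
   ```
   responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
       @Override
       public void onSuccess(ClientResponse value) {
           // nothing to log on a successful close
       }

       @Override
       public void onFailure(RuntimeException e) {
           log.debug("Unable to send a close message for fetch session: {} to node: {}. " +
               "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
       }
   });
   ```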



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,40 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = (time == null) ? new SystemTime().timer(Math.min(timeout.toMillis(), requestTimeoutMs)) : time.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+        // Close objects with a timeout. The timeout is required because the fetcher makes requests to the server in the
+        // process of closing, which may not respect the overall timeout defined for closing the consumer.
+        if (coordinator != null) {
+            try {
+                coordinator.close(closeTimer);

Review Comment:
   note: I think it also tries a blocking offset commit during the close.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -446,6 +429,33 @@ private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builde
             return client.send(node, request);
     }
 
+    /**
+     * Sends a fetch request to the Kafka cluster asynchronously.
+     *
+     * This method is visible for testing.
+     *
+     * @return A future that indicates the result of the sent fetch request
+     */
+    RequestFuture<ClientResponse> sendFetchRequestToNode(final FetchSessionHandler.FetchRequestData requestData,

Review Comment:
   can this be private?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // Set the session handler to notify close. This will set the next fetch metadata to send a close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // The fetch target node may not be available as it may have disconnected. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);

Review Comment:
   could really just be `cluster.nodeById(entry.getKey())`.  



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -150,6 +150,10 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+/**
+ * Note to future authors in this class. If you close the consumer, close with Duration.ZERO to reduce the duration of the
+ * test.

Review Comment:
   can be combined into the previous line as we don't have a hard limit on the number of characters per line :)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,46 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        final Time localTime = (time == null) ? new SystemTime() : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        // Close objects with a timeout. The timeout is required because the coordinator & fetcher send requests to the
+        // server in the process of closing, which may not respect the overall timeout defined for closing the consumer.
+        if (coordinator != null) {
+            try {
+                // This is a blocking call bound by the time remaining in closeTimer
+                coordinator.close(closeTimer);
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close consumer coordinator", t);
+            }
         }
-        Utils.closeQuietly(fetcher, "fetcher", firstException);
+
+        if (fetcher != null) {

Review Comment:
   would it make sense to add a
   ```
   public static void closeQuietlySync(AutoCloseable closeable, String name, AtomicReference<Throwable> firstException, Timer timer) {
   ```
   
   to handle both coordinator and fetcher blocking close?
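
   A minimal sketch of such a helper, assuming a hypothetical `TimedCloseable` interface (plain `AutoCloseable#close()` takes no `Timer` argument, so the signature above would need adjusting for timer-bound closes):
   ```
   // Hypothetical interface for objects whose blocking close is bounded by a Timer,
   // e.g. the coordinator's close(Timer).
   interface TimedCloseable {
       void close(Timer timer);
   }

   // Record only the first Throwable and swallow the rest, mirroring Utils.closeQuietly.
   public static void closeQuietlySync(TimedCloseable closeable, String name,
                                       AtomicReference<Throwable> firstException, Timer timer) {
       if (closeable == null)
           return;
       try {
           closeable.close(timer); // blocking, bounded by the remaining time on the timer
       } catch (Throwable t) {
           firstException.compareAndSet(null, t);
           // the name parameter could feed an error log here, as Utils.closeQuietly does
       }
   }
   ```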



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1943,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
+            final FetchSessionHandler sessionHandler = entry.getValue();
+            // Set the session handler to notify close. This will set the next fetch metadata to send a close message.
+            sessionHandler.notifyClose();
+
+            final int sessionId = sessionHandler.sessionId();
+            final Integer fetchTargetNodeId = entry.getKey();
+            // The fetch target node may not be available as it may have disconnected. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            if (fetchTarget == null || client.isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                continue;
+            }
+
+            final RequestFuture<ClientResponse> responseFuture = sendFetchRequestToNode(sessionHandler.newBuilder().build(), fetchTarget);
+            responseFuture.addListener(new RequestFutureListener<ClientResponse>() {
+                @Override
+                public void onSuccess(ClientResponse value) {
+                    log.debug("Successfully sent a close message for fetch session: {} to node: {}", sessionId, fetchTarget);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    log.debug("Unable to send a close message for fetch session: {} to node: {}. " +
+                        "This may result in unnecessary fetch sessions at the broker.", sessionId, fetchTarget, e);
+                }
+            });
+
+            requestFutures.add(responseFuture);
+        }
+
+        // Poll to ensure that the requests have been written to the socket. Wait until either the timer has expired or
+        // all requests have received a response.
+        do {
+            client.poll(timer, null, true);
+        } while (timer.notExpired() && !requestFutures.stream().allMatch(RequestFuture::isDone));

Review Comment:
   is it possible to implement these conditions in the pollCondition? maybe like
   `client.poll(timer, () -> requestFutures.stream().allMatch(RequestFuture::isDone), true)`
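
   Assuming `ConsumerNetworkClient`'s `PollCondition` semantics, where `shouldBlock()` returns `true` while the caller still needs to wait, the predicate would be inverted; a minimal sketch:
   ```
   // Block until the timer expires or every close request has completed;
   // shouldBlock() returns true while at least one future is still pending.
   client.poll(timer, () -> !requestFutures.stream().allMatch(RequestFuture::isDone), true);
   ```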



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,46 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        final Time localTime = (time == null) ? new SystemTime() : time;
+        return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
+    }
+
+    private void close(Duration timeout, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        try {
-            if (coordinator != null)
-                coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
-        } catch (Throwable t) {
-            firstException.compareAndSet(null, t);
-            log.error("Failed to close coordinator", t);
+
+        final Timer closeTimer = createTimerForRequest(timeout);
+        // Close objects with a timeout. The timeout is required because the coordinator & fetcher send requests to the
+        // server in the process of closing, which may not respect the overall timeout defined for closing the consumer.
+        if (coordinator != null) {
+            try {
+                // This is a blocking call bound by the time remaining in closeTimer
+                coordinator.close(closeTimer);
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close consumer coordinator", t);
+            }
         }
-        Utils.closeQuietly(fetcher, "fetcher", firstException);
+
+        if (fetcher != null) {
+            // the timeout for the session close is at most the requestTimeoutMs
+            long remainingDurationInTimeout = Math.max(0, timeout.toMillis() - closeTimer.elapsedMs());
+            if (remainingDurationInTimeout > 0) {

Review Comment:
   can it just be this? 
   ```
   long remainingDurationInTimeout = Math.min(requestTimeoutMs, Math.max(0, timeout.toMillis() - closeTimer.elapsedMs()));
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -2403,17 +2404,46 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
         return clusterResourceListeners;
     }
 
-    private void close(long timeoutMs, boolean swallowException) {
+    private Timer createTimerForRequest(final Duration timeout) {
+        final Time localTime = (time == null) ? new SystemTime() : time;

Review Comment:
   mind elaborating on when the time object can be null? maybe when we inject the time object?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -1933,11 +1941,79 @@ private Map<String, String> topicPartitionTags(TopicPartition tp) {
         }
     }
 
+    // Visible for testing
+    void maybeCloseFetchSessions(final Timer timer) {
+        final Cluster cluster = metadata.fetch();
+        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();
+        for (final Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {

Review Comment:
   maybe 
   ```
   requestFutures = sessionHandlers.entrySet().stream()
       .filter(e -> closeable(e))
       .map(e -> toRespFuture(e))
       .collect(Collectors.toList())
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -150,6 +150,10 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+/**
+ * Note to future authors in this class. If you close the consumer, close with Duration.ZERO to reduce the duration of the
+ * test.

Review Comment:
   we are doing a lot of close(Duration.ZERO) here; maybe it makes sense to implement a wrapper class around the consumer with a close() method in this test, so that we could continue using the try-with-resources pattern. I was wondering if we could leverage AutoCloseable and avoid explicitly calling close(Duration.ZERO) in every test.
   
   or alternatively, we could try to close in `@AfterEach`
   ```
   if (autoClosing && consumer != null) {
     consumer.close(Duration.ZERO);
   }
   ```
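
   A minimal sketch of the wrapper idea, assuming a hypothetical `ZeroTimeoutConsumer` name; it keeps the try-with-resources pattern while always closing with Duration.ZERO:
   ```
   // Hypothetical test-only wrapper: implementing AutoCloseable means
   // try-with-resources always closes with Duration.ZERO instead of the default timeout.
   final class ZeroTimeoutConsumer<K, V> implements AutoCloseable {
       private final KafkaConsumer<K, V> consumer;

       ZeroTimeoutConsumer(KafkaConsumer<K, V> consumer) {
           this.consumer = consumer;
       }

       KafkaConsumer<K, V> get() {
           return consumer;
       }

       @Override
       public void close() {
           consumer.close(Duration.ZERO);
       }
   }
   ```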


