kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1385620217


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -247,12 +247,10 @@ private void closeInternal(final Duration timeout) {
         closeTimeout = timeout;
         wakeup();
 
-        if (timeoutMs > 0) {
-            try {
-                join(timeoutMs);
-            } catch (InterruptedException e) {
-                log.error("Interrupted while waiting for consumer network thread to complete", e);
-            }
+        try {

Review Comment:
   Because one of the unit tests fails without it 😄
   
   In the existing `KafkaConsumer.close()`, the coordinator eventually calls 
this method:
   
   ```java
       private void closeHeartbeatThread() {
           HeartbeatThread thread;
           synchronized (this) {
               if (heartbeatThread == null)
                   return;
               heartbeatThread.close();
               thread = heartbeatThread;
               heartbeatThread = null;
           }
           try {
               thread.join();
           } catch (InterruptedException e) {
               log.warn("Interrupted while waiting for consumer heartbeat thread to close");
               throw new InterruptException(e);
           }
       }
   ```
   
   By blocking via `Thread.join()`, this method lets the coordinator close fully 
before its underlying resources (client, etc.) are closed. In contrast, 
`closeInternal()` waited for our background thread only up to the timeout, and 
closing then proceeded with the other resources even though the background 
thread may not have completed its cleanup.
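
To make the difference concrete, here is a minimal, self-contained sketch. It does not use the real `ConsumerNetworkThread`; the class `JoinSketch`, its methods, and the sleep-based "cleanup" are all hypothetical stand-ins. It only illustrates that a timed `join(timeoutMs)` can return while the worker is still cleaning up, whereas an unbounded `join()` cannot.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class JoinSketch {

    // Hypothetical stand-in for a background thread that needs cleanupMs to clean up.
    static Thread startWorker(AtomicBoolean cleanedUp, long cleanupMs) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(cleanupMs); // simulated cleanup work
                cleanedUp.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    // Waits at most timeoutMs (must be > 0; join(0) means "wait forever").
    // Returns whether cleanup had finished at the moment the timed join returned.
    static boolean closeWithTimedJoin(long cleanupMs, long timeoutMs) {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        Thread worker = startWorker(cleanedUp, cleanupMs);
        try {
            worker.join(timeoutMs);
            boolean finishedInTime = cleanedUp.get();
            worker.join(); // only so the sketch does not leave a thread running
            return finishedInTime;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for worker", e);
        }
    }

    // Waits until the worker terminates, however long cleanup takes.
    static boolean closeWithUnboundedJoin(long cleanupMs) {
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        Thread worker = startWorker(cleanedUp, cleanupMs);
        try {
            worker.join();
            return cleanedUp.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for worker", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timed join saw cleanup finished: "
                + closeWithTimedJoin(500, 50));
        System.out.println("unbounded join saw cleanup finished: "
                + closeWithUnboundedJoin(500));
    }
}
```

The trade-off is that the unbounded `join()` can block the caller for as long as the cleanup takes, so whether that is acceptable here depends on how the close timeout is enforced elsewhere.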
   
   If you'd prefer, I can save this change for a separate PR?


