showuon commented on code in PR #12611:
URL: https://github.com/apache/kafka/pull/12611#discussion_r967930755


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -272,6 +272,39 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testNoWakeupWhenNonBlockingDiscoverCoordinator() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // a follow-up poll should still throw
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(0));
+            fail("Should have woken up from joinGroupIfNeeded()");
+        } catch (WakeupException ignored) {
+        }
+    }
+
+    @Test
+    public void testWakeupWhenBlockingDiscoverCoordinator() throws Exception {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        try {
+            coordinator.ensureCoordinatorReady(mockTime.timer(1));
+            fail("Should have woken up from ensureCoordinatorReady()");
+        } catch (WakeupException ignored) {
+        }

Review Comment:
   nit: this try/fail/catch block can be replaced with `assertThrows`



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -272,6 +272,39 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testNoWakeupWhenNonBlockingDiscoverCoordinator() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // a follow-up poll should still throw
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(0));
+            fail("Should have woken up from joinGroupIfNeeded()");
+        } catch (WakeupException ignored) {

Review Comment:
   nit:
   ```java
   assertThrows(WakeupException.class,
       () -> coordinator.joinGroupIfNeeded(mockTime.timer(0)),
       "Should have woken up from joinGroupIfNeeded()");
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+
+            // if we do not want to block on discovering coordinator at all,
+            // then we should not try to poll in a loop, and should not throw wake-up exception either
+            if (timer.timeoutMs() == 0L) {

Review Comment:
   > would we expect the next blocking call to trigger a wakeup even if it were called with a timeout of zero?
   
   Yes. I think that as long as it's a blocking call, the wakeup exception should be thrown, even with a zero timeout. I've checked the existing javadoc in KafkaConsumer: every method that is documented to throw WakeupException will still throw it after this change. So I think this PR change (1) won't break the API, and (2) fixes the issue for commitAsync, which makes sense to me.
   
   > I wonder if it would be better to overload ensureCoordinatorReady with an additional flag?
   
   Yes, as @guozhangwang mentioned in the PR description:
   > In this PR I'm trying to fix it in a least intrusive way (a more general fix should be, potentially, to have two versions of ensureCoordinatorReady)
   
   I think this fix is safer. But I don't have a strong opinion about it; I just want to raise the release timing issue here.
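   
   The semantics discussed above can be sketched with a small, self-contained toy model. This is only an illustration under stated assumptions: `SketchClient`, `SketchCoordinator`, `SketchWakeupException` and the two-argument overload with `disableWakeup` are hypothetical stand-ins, not the actual Kafka classes or the code in this PR. The model assumes a wakeup is a pending flag that the next wakeup-aware poll consumes.
   
   ```java
   // Simplified stand-ins for illustration; none of these are the real Kafka classes.
   final class SketchWakeupException extends RuntimeException { }
   
   final class SketchClient {
       private boolean wakeupPending = false;
   
       // Records a wakeup request; the next wakeup-aware poll consumes it.
       void wakeup() {
           wakeupPending = true;
       }
   
       // Wakeup-aware poll: throws once if a wakeup is pending.
       void poll() {
           if (wakeupPending) {
               wakeupPending = false;
               throw new SketchWakeupException();
           }
       }
   
       // Poll that ignores wakeups and leaves the pending flag untouched.
       void pollNoWakeup() {
       }
   }
   
   final class SketchCoordinator {
       private final SketchClient client;
   
       SketchCoordinator(SketchClient client) {
           this.client = client;
       }
   
       // Least intrusive variant: a zero timeout means "do not block",
       // so a pending wakeup is not consumed here.
       void ensureCoordinatorReady(long timeoutMs) {
           ensureCoordinatorReady(timeoutMs, timeoutMs == 0L);
       }
   
       // More general variant (assumed shape): the caller states explicitly
       // whether wakeups are disabled. The timeout is ignored in this toy model.
       void ensureCoordinatorReady(long timeoutMs, boolean disableWakeup) {
           if (disableWakeup) {
               client.pollNoWakeup();
           } else {
               client.poll();
           }
       }
   
       // Modelled as a blocking call: throws on a pending wakeup even with a zero timeout.
       void joinGroupIfNeeded(long timeoutMs) {
           client.poll();
       }
   }
   
   final class WakeupSketchDemo {
       public static void main(String[] args) {
           SketchClient client = new SketchClient();
           SketchCoordinator coordinator = new SketchCoordinator(client);
   
           client.wakeup();
           coordinator.ensureCoordinatorReady(0L); // non-blocking: wakeup stays pending
   
           boolean thrown = false;
           try {
               coordinator.joinGroupIfNeeded(0L); // blocking call: wakeup surfaces here
           } catch (SketchWakeupException e) {
               thrown = true;
           }
           System.out.println("wakeup deferred to joinGroupIfNeeded: " + thrown);
       }
   }
   ```
   
   In this sketch the single-argument method derives the flag from the timeout, which mirrors the zero-timeout check in the diff, while the two-argument method corresponds to the idea of having two versions of ensureCoordinatorReady.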



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

