showuon commented on code in PR #12611: URL: https://github.com/apache/kafka/pull/12611#discussion_r967930755
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -272,6 +272,39 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testNoWakeupWhenNonBlockingDiscoverCoordinator() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // a follow-up poll should still throw
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(0));
+            fail("Should have woken up from joinGroupIfNeeded()");
+        } catch (WakeupException ignored) {
+        }
+    }
+
+    @Test
+    public void testWakeupWhenBlockingDiscoverCoordinator() throws Exception {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        try {
+            coordinator.ensureCoordinatorReady(mockTime.timer(1));
+            fail("Should have woken up from ensureCoordinatorReady()");
+        } catch (WakeupException ignored) {
+        }

Review Comment:
   nit: this try/fail/catch block can be replaced with `assertThrows`.

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -272,6 +272,39 @@ public void testCoordinatorDiscoveryBackoff() {
         assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
     }
 
+    @Test
+    public void testNoWakeupWhenNonBlockingDiscoverCoordinator() {
+        setupCoordinator();
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+
+        consumerClient.wakeup();
+
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // a follow-up poll should still throw
+        try {
+            coordinator.joinGroupIfNeeded(mockTime.timer(0));
+            fail("Should have woken up from joinGroupIfNeeded()");
+        } catch (WakeupException ignored) {

Review Comment:
   nit:
   ```java
   assertThrows(WakeupException.class,
       () -> coordinator.joinGroupIfNeeded(mockTime.timer(0)),
       "Should have woken up from joinGroupIfNeeded()");
   ```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -249,7 +249,14 @@ protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
                 throw fatalException;
             }
             final RequestFuture<Void> future = lookupCoordinator();
-            client.poll(future, timer);
+
+            // if we do not want to block on discovering coordinator at all,
+            // then we should not try to poll in a loop, and should not throw wake-up exception either
+            if (timer.timeoutMs() == 0L) {

Review Comment:
   > would we expect the next blocking call to trigger a wakeup even if it were called with a timeout of zero?

   Yes. I think that as long as it is a blocking call, the `WakeupException` should be thrown, even with a zero timeout. I've checked the existing javadoc in `KafkaConsumer`: every method that declares that it throws `WakeupException` will still throw it after this change. So I think this PR (1) won't break the API and (2) fixes the issue for `commitAsync`, which makes sense to me.

   > I wonder if it would be better to overload ensureCoordinatorReady with an additional flag?

   Yes, as @guozhangwang mentioned in the PR description:

   > In this PR I'm trying to fix it in a least intrusive way (a more general fix should be, potentially, to have two versions of ensureCoordinatorReady)

   I think this fix is safer. I don't have a strong opinion about it, though; I just want to raise the release timing issue here.
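To make the semantics under discussion concrete, here is a minimal, self-contained sketch of the behavior the two new tests expect: a non-blocking (zero-timeout) coordinator discovery leaves a pending wakeup untouched, the next blocking call still throws, and a discovery with a non-zero timeout throws right away. Everything in it is an assumption for illustration only: `ToyCoordinator`, the local `WakeupException`, the `disableWakeup` flag, and the `throwsWakeup` helper are hypothetical stand-ins, not Kafka's `AbstractCoordinator` or the code in this PR. The two-argument overload only illustrates what "two versions of ensureCoordinatorReady" might look like.

```java
// Toy model only: ToyCoordinator and this WakeupException are hypothetical
// stand-ins, not Kafka's AbstractCoordinator or its real exception class.
final class WakeupSketch {

    static final class WakeupException extends RuntimeException { }

    static final class ToyCoordinator {
        private boolean wakeupPending = false;

        // Records a wakeup request; it is consumed by the next call that honors wakeups.
        void wakeup() {
            wakeupPending = true;
        }

        // Hypothetical overload with an explicit flag: when disableWakeup is true,
        // a pending wakeup is left in place for a later blocking call.
        boolean ensureCoordinatorReady(long timeoutMs, boolean disableWakeup) {
            if (!disableWakeup) {
                maybeTriggerWakeup();
            }
            return true; // pretend the coordinator lookup response was already queued
        }

        // Mirrors the zero-timeout check in the diff: a timeout of zero is
        // treated as "do not block, and do not throw a wakeup either".
        boolean ensureCoordinatorReady(long timeoutMs) {
            return ensureCoordinatorReady(timeoutMs, timeoutMs == 0L);
        }

        // Modeled as a blocking call: it honors a pending wakeup even with a zero timeout.
        void joinGroupIfNeeded(long timeoutMs) {
            maybeTriggerWakeup();
        }

        private void maybeTriggerWakeup() {
            if (wakeupPending) {
                wakeupPending = false;
                throw new WakeupException();
            }
        }
    }

    // Small helper so the sketch needs no test framework.
    static boolean throwsWakeup(Runnable action) {
        try {
            action.run();
            return false;
        } catch (WakeupException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator nonBlocking = new ToyCoordinator();
        nonBlocking.wakeup();
        // Zero timeout: discovery succeeds and the wakeup stays pending.
        System.out.println(nonBlocking.ensureCoordinatorReady(0L));
        // The follow-up blocking call consumes the pending wakeup and throws.
        System.out.println(throwsWakeup(() -> nonBlocking.joinGroupIfNeeded(0L)));

        ToyCoordinator blocking = new ToyCoordinator();
        blocking.wakeup();
        // Non-zero timeout: discovery itself throws the wakeup.
        System.out.println(throwsWakeup(() -> blocking.ensureCoordinatorReady(1L)));
    }
}
```

In this toy model the explicit flag makes the caller's intent visible at the call site, while the single-argument version infers it from the timeout, which is closer to the less intrusive change in the diff above.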