lucasbru commented on code in PR #15369: URL: https://github.com/apache/kafka/pull/15369#discussion_r1495811911
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ########## @@ -98,6 +99,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; Review Comment: not used ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -727,6 +732,17 @@ public ConsumerRecords<K, V> poll(final Duration timeout) { } } + private boolean isGenerationKnown() { Review Comment: How about this one-liner ``` return groupMetadata.filter(g -> g.generationId() != JoinGroupRequest.UNKNOWN_GENERATION_ID).isPresent(); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -708,14 +709,18 @@ public ConsumerRecords<K, V> poll(final Duration timeout) { wakeupTrigger.maybeTriggerWakeup(); updateAssignmentMetadataIfNeeded(timer); - final Fetch<K, V> fetch = pollForFetches(timer); - if (!fetch.isEmpty()) { - if (fetch.records().isEmpty()) { - log.trace("Returning empty records from `poll()` " + if (isGenerationKnown()) { Review Comment: If this check is false... where do we block? It seems like we busy loop with 100% CPU, but maybe I'm wrong ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1467,6 +1483,7 @@ public void unsubscribe() { } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); } + groupMetadata = initializeGroupMetadata(groupMetadata.get().groupId(), Optional.empty()); Review Comment: This looks like an independent fix? Maybe mention in the PR description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org