dajac commented on code in PR #15275:
URL: https://github.com/apache/kafka/pull/15275#discussion_r1484072806


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -1392,4 +1356,16 @@ public void registerStateListener(MemberStateListener listener) {
         }
         this.stateUpdatesListeners.add(listener);
     }
+
+    /**
+     * If either a new target assignment or new metadata is available that we have not yet attempted
+     * to reconcile, and we are currently in state RECONCILING, trigger reconciliation.
+     */
+    @Override
+    public PollResult poll(final long currentTimeMs) {
+        if (state == MemberState.RECONCILING) {
+            maybeReconcile();

Review Comment:
   I have one remaining question for my understanding. If there are unresolved 
topic IDs, will it request a metadata update (in 
`findResolvableAssignmentAndTriggerMetadataUpdate`) every time `poll` is 
called? I believe it won't, because the `Metadata` class will request a full 
update only once. Is my understanding correct?
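
   To make the question concrete, here is a minimal sketch of the behavior I am assuming. `MetadataSketch` and its methods (`maybeSend`, `onResponse`, `requestsSent`) are hypothetical stand-ins, not the real `Metadata` class or its API. The sketch only models the idea that repeated update requests collapse into one outstanding request until a response arrives.

```java
// Hypothetical, simplified model of "request an update only once".
// This is NOT the real org.apache.kafka.clients.Metadata implementation.
class MetadataSketch {
    private boolean needUpdate = false; // set whenever someone asks for fresh metadata
    private boolean inFlight = false;   // true while a request is outstanding
    private int requestsSent = 0;       // counts requests actually sent

    // Assumed to be called on every poll() while topic IDs remain unresolved.
    // Setting the flag again when it is already set has no further effect.
    void requestUpdate() {
        needUpdate = true;
    }

    // Assumed to be called by the network layer; sends at most one request at a time.
    void maybeSend() {
        if (needUpdate && !inFlight) {
            inFlight = true;
            requestsSent++;
        }
    }

    // Assumed to be called when the metadata response arrives.
    void onResponse() {
        needUpdate = false;
        inFlight = false;
    }

    int requestsSent() {
        return requestsSent;
    }

    public static void main(String[] args) {
        MetadataSketch metadata = new MetadataSketch();
        // Five consecutive polls with unresolved topic IDs and no response yet.
        for (int i = 0; i < 5; i++) {
            metadata.requestUpdate();
            metadata.maybeSend();
        }
        System.out.println(metadata.requestsSent()); // prints 1
    }
}
```

   If the real class behaves roughly like this, calling `maybeReconcile()` on every `poll` would be cheap, since only the first call after a response would lead to a new request.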



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -1521,6 +1660,18 @@ private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() {
         );
     }
 
+    private SortedSet<TopicPartition> topicPartitions(Map<Uuid, SortedSet<Integer>> topicIdMap, Map<Uuid, String> topicIdNames) {

Review Comment:
   nit: Could it be static?
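
   For illustration, a static version might look like the sketch below. The method body is not visible in this hunk, so everything inside it is an assumption: I assume the helper reads only its two arguments and skips topic IDs that have no known name. `java.util.UUID` and the local `TopicPartition` record are hypothetical stand-ins for Kafka's `Uuid` and `TopicPartition`, and the comparator is made up for the example.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

class TopicPartitionsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // Assumed ordering for the example: by topic name, then by partition number.
    static final Comparator<TopicPartition> ORDER =
        Comparator.comparing(TopicPartition::topic)
                  .thenComparingInt(TopicPartition::partition);

    // Can be static because it touches no instance state, only its arguments.
    static SortedSet<TopicPartition> topicPartitions(Map<UUID, SortedSet<Integer>> topicIdMap,
                                                     Map<UUID, String> topicIdNames) {
        SortedSet<TopicPartition> result = new TreeSet<>(ORDER);
        topicIdMap.forEach((topicId, partitions) -> {
            String name = topicIdNames.get(topicId);
            if (name == null) {
                return; // assumption: ignore topic IDs without a known name
            }
            for (int partition : partitions) {
                result.add(new TopicPartition(name, partition));
            }
        });
        return result;
    }
}
```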



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.