lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1952918176


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -818,6 +826,8 @@ void maybeReconcile() {
             return;
         }
 
+        if (autoCommitEnabled && !canCommit) return;

Review Comment:
   I think we can improve this a bit more: if there are no partitions to 
revoke, we could carry on with this reconciliation really, meaning no delay 
reconciling newly added partitions (reconciled from the background poll as 
before, no need to wait for the app poll).  
   
   So, I expect we just need to move this check (along with the 
markReconciliationInProg) to right before the log.info("Reconciling assignment 
with local epoch...")? and then we could check:
   
   ```
   if (autoCommitEnabled && !revokedPartitions.isEmpty() && !canCommit) return;
   ```
   
   would that work? 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1301,7 +1299,7 @@ public void 
testUnresolvedTargetAssignmentIsReconciledWhenMetadataReceived() {
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -881,7 +879,7 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
         );
         when(metadata.topicNames()).thenReturn(fullTopicMetadata);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -819,8 +818,7 @@ public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithNewAssign
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         clearInvocations(membershipManager, commitRequestManager);
 
-        // Next poll should trigger final reconciliation
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   same here, if this test has no revocations (seems so?)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1249,7 +1247,7 @@ public void 
testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
 
         verifyReconciliationNotTriggered(membershipManager);
 
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -2226,7 +2223,7 @@ public void 
testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -2655,7 +2652,7 @@ private ConsumerMembershipManager 
createMemberInStableState(String groupInstance
         
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1403,7 +1401,7 @@ public void 
testReconciliationSkippedWhenSameAssignmentReceived() {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto? (not sure)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -663,7 +663,7 @@ public void 
testSameAssignmentReconciledAgainWithMissingTopic() {
         // stay in RECONCILING state, since an unresolved topic is assigned
         
membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(assignment1,
 membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   helpful to see this passing with `maybeReconcile(false)` here, since this 
test doesn't seem to have any revocations right? (would be good test coverage 
to ensure we still reconcile in the same way, called from background poll, when 
it's only about adding partitions)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1594,7 +1591,7 @@ public void 
testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   I expect this one should be reconciled asap (param false), and the next one 
below, ln 1618, would need to wait to commit (param true)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1363,7 +1361,7 @@ public void 
testReconcileNewPartitionsAssignedWhenNoPartitionOwned() {
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1207,7 +1205,7 @@ public void 
testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
         receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1546,8 +1544,7 @@ public void 
testMetadataUpdatesReconcilesUnresolvedAssignments() {
         String topicName = "topic1";
         mockTopicNameInMetadataCache(Collections.singletonMap(topicId, 
topicName), true);
 
-        // When the next poll is run, the member should re-trigger 
reconciliation
-        membershipManager.poll(time.milliseconds());
+        membershipManager.maybeReconcile(true);

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to