lianetm commented on code in PR #17165:
URL: https://github.com/apache/kafka/pull/17165#discussion_r1773915749


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
     }
 
     @Test
-    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {

Review Comment:
   There is no `shouldTransitionToJoining` var/concept anymore, so should we 
update this? Maybe simply `...DoesNotTransitionToJoiningIfInGroup`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
 
     private final Time time;
 
+    /**
+     * AtomicBoolean to track whether the subscription is updated.
+     * If it's true and subscription state is UNSUBSCRIBED, the next {@link #maybeJoinGroup()} will change member state to JOINING.
+     */
+    private final AtomicBoolean hasSubscriptionUpdated = new AtomicBoolean(false);

Review Comment:
   nit: what about simply `subscriptionUpdated`, to track whether the 
subscription has been updated?
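
For illustration only, a minimal sketch of the flag-then-poll behavior that the Javadoc and the tests in this PR describe, using the suggested `subscriptionUpdated` name. This is not the PR's code: the class `SubscriptionFlagSketch`, its `State` enum, and the accessors are hypothetical stand-ins, and the assumption that the flag is cleared on every `maybeJoinGroup()` call is inferred from the test assertions quoted in this review.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a membership manager; not the Kafka class.
class SubscriptionFlagSketch {
    enum State { UNSUBSCRIBED, JOINING, STABLE }

    // Tracks whether the subscription has been updated since the last poll.
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
    private State state;

    SubscriptionFlagSketch(State initial) {
        this.state = initial;
    }

    // Called when the subscription changes: only records the update.
    void onSubscriptionUpdated() {
        subscriptionUpdated.set(true);
    }

    // Called on poll: consumes the flag, and joins only if not in the group.
    void maybeJoinGroup() {
        boolean wasUpdated = subscriptionUpdated.compareAndSet(true, false);
        if (wasUpdated && state == State.UNSUBSCRIBED) {
            state = State.JOINING;
        }
    }

    boolean subscriptionUpdated() {
        return subscriptionUpdated.get();
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        SubscriptionFlagSketch notInGroup = new SubscriptionFlagSketch(State.UNSUBSCRIBED);
        notInGroup.onSubscriptionUpdated();
        notInGroup.maybeJoinGroup();
        System.out.println(notInGroup.state());

        SubscriptionFlagSketch inGroup = new SubscriptionFlagSketch(State.STABLE);
        inGroup.onSubscriptionUpdated();
        inGroup.maybeJoinGroup();
        System.out.println(inGroup.state());
    }
}
```

With this shape, a member that is already in the group keeps its state after a subscription update, and the flag is reset either way, which is what the renamed tests would be checking.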



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
     }
 
     @Test
-    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
         ConsumerMembershipManager membershipManager = createMemberInStableState();
         membershipManager.onSubscriptionUpdated();
+        assertTrue(membershipManager.hasSubscriptionUpdated());
+        membershipManager.maybeJoinGroup();
         verify(membershipManager, never()).transitionToJoining();
+        assertFalse(membershipManager.hasSubscriptionUpdated());
+    }
+
+    @Test
+    public void testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup() {

Review Comment:
   `testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java:
##########
@@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
     }
 
     @Test
-    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {

Review Comment:
   Let's make sure to align these with the names in the consumer tests if 
they change with the comments above.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -208,6 +207,35 @@ public void testSeekUnvalidatedEventWithException() {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }
 
+    @Test
+    public void testPollEvent() {
+        PollEvent event = new PollEvent(12345);
+
+        setupProcessor(true);
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        processor.process(event);
+        verify(commitRequestManager).updateAutoCommitTimer(12345);
+        verify(membershipManager).maybeJoinGroup();
+        verify(heartbeatRequestManager).resetPollTimer(12345);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSubscriptionChangeEvent(boolean withGroupId) {

Review Comment:
   I would say this test only makes sense with `withGroupId=true`, right? If 
there is no groupId, the processor will have a null HBMgr and MembershipMgr 
(there cannot be a `SubscriptionChange` event without a groupId, since the API 
call to `subscribe` requires a groupId).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org