lianetm commented on code in PR #17165:
URL: https://github.com/apache/kafka/pull/17165#discussion_r1773915749

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
     }

     @Test
-    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {

Review Comment:
   there is no "shouldTransitionToJoining" var/concept anymore, so should we update this? maybe simply `...DoesNotTransitionToJoiningIfInGroup`

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl

     private final Time time;

+    /**
+     * AtomicBoolean to track whether the subscription is updated.
+     * If it's true and subscription state is UNSUBSCRIBED, the next {@link #maybeJoinGroup()} will change member state to JOINING.
+     */
+    private final AtomicBoolean hasSubscriptionUpdated = new AtomicBoolean(false);

Review Comment:
   nit: what about simply `subscriptionUpdated` ...to track whether the subscription has been updated

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java:
##########
@@ -1617,10 +1617,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
     }

     @Test
-    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {
         ConsumerMembershipManager membershipManager = createMemberInStableState();
         membershipManager.onSubscriptionUpdated();
+        assertTrue(membershipManager.hasSubscriptionUpdated());
+        membershipManager.maybeJoinGroup();
         verify(membershipManager, never()).transitionToJoining();
+        assertFalse(membershipManager.hasSubscriptionUpdated());
+    }
+
+    @Test
+    public void testOnSubscriptionUpdatedSetShouldTransitionToJoiningIfNotInGroup() {

Review Comment:
   `testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup`?
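
   For context, a minimal standalone sketch of the behaviour the javadoc and the two tests above seem to describe: `onSubscriptionUpdated()` only records the change, and the next `maybeJoinGroup()` (called on poll) consumes the flag and moves to JOINING only when the member is not in the group. `DeferredJoinSketch`, its `MemberState` enum and the method bodies are hypothetical stand-ins for illustration, not the code in `AbstractMembershipManager`; the use of `compareAndSet` is likewise an assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a membership manager; not Kafka's implementation.
final class DeferredJoinSketch {
    // Simplified member states, assumed for this sketch only.
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

    // Tracks whether the subscription has been updated since the last poll.
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
    private MemberState state;

    DeferredJoinSketch(MemberState initialState) {
        this.state = initialState;
    }

    // Records the subscription change; does not transition by itself.
    void onSubscriptionUpdated() {
        subscriptionUpdated.set(true);
    }

    // Intended to run on poll: always clears the flag, and transitions to
    // JOINING only if the flag was set and the member is not in the group.
    void maybeJoinGroup() {
        boolean wasUpdated = subscriptionUpdated.compareAndSet(true, false);
        if (wasUpdated && state == MemberState.UNSUBSCRIBED) {
            state = MemberState.JOINING;
        }
    }

    boolean subscriptionUpdated() {
        return subscriptionUpdated.get();
    }

    MemberState state() {
        return state;
    }
}
```

   With this shape, a member already in the group keeps its state and still ends up with the flag cleared after the poll, which is what the first test asserts.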

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java:
##########
@@ -1137,10 +1137,25 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
     }

     @Test
-    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotSetShouldTransitionToJoiningIfInGroup() {

Review Comment:
   let's make sure to align these with the names in the consumer tests if they change with the comments above

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##########
@@ -208,6 +207,35 @@ public void testSeekUnvalidatedEventWithException() {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }

+    @Test
+    public void testPollEvent() {
+        PollEvent event = new PollEvent(12345);
+
+        setupProcessor(true);
+        when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        processor.process(event);
+        verify(commitRequestManager).updateAutoCommitTimer(12345);
+        verify(membershipManager).maybeJoinGroup();
+        verify(heartbeatRequestManager).resetPollTimer(12345);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testSubscriptionChangeEvent(boolean withGroupId) {

Review Comment:
   I would say this test only makes sense with `withGroupId=true`, right? If there is no groupId, the processor will have null HBMgr and MembershipMgr (there cannot be a `SubscriptionChange` event without a groupId; the API call to `subscribe` requires a groupId).