dajac commented on code in PR #17549: URL: https://github.com/apache/kafka/pull/17549#discussion_r1822143292
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -198,14 +209,14 @@ public void testConsumerHeartbeatRequestValidation() { // SubscribedTopicNames must be present and empty in the first request (epoch == 0). ex = assertThrows(InvalidRequestException.class, () -> context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() + .setMemberId(memberId) .setGroupId("foo") .setMemberEpoch(0) .setRebalanceTimeoutMs(5000) .setTopicPartitions(Collections.emptyList()))); assertEquals("SubscribedTopicNames must be set in first request.", ex.getMessage()); - // MemberId must be non-empty in all requests except for the first one where it - // could be empty (epoch != 0). + // MemberId must be non-empty in all requests Review Comment: This test case seems redundant now. Should we remove it? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10204,8 +10225,16 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() { mkTopicAssignment(fooTopicId, 0))) .build(); - String newMemberId = result.response().memberId(); - ConsumerGroupMember expectedReplacingConsumerMember = new ConsumerGroupMember.Builder(newMemberId) + // The memberId is generated by the consumer and should be retained + // for the entire lifetime of the process until termination. + String serverReturnedMemberId = result.response().memberId(); + assertEquals( + newMemberId, + serverReturnedMemberId, + "Server should not generate a new memberId since the consumer has already generated its own." + ); Review Comment: nit: How about `assertEquals(newMemberId, result.response().memberId(), ...)`? We don't really need the `serverReturnedMemberId` local variable as we can directly use newMemberId. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10177,15 +10193,20 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() { context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments, metadataImage.features().metadataVersion())); context.commit(); - // The static member rejoins with new protocol, triggering the upgrade. + // The static member rejoins with new protocol after a restart, triggering the upgrade. + String newMemberId = Uuid.randomUuid().toString(); + assertNotEquals(newMemberId, memberId, "The consumer should generate a new memberId since the process has been terminated and restarted."); Review Comment: nit: I would remove this one. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -14086,23 +14133,28 @@ public void testShareGroupMemberIdGeneration() { Collections.emptyMap() )); + String memberId = Uuid.randomUuid().toString(); CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat( new ShareGroupHeartbeatRequestData() .setGroupId("group-foo") + .setMemberId(memberId) .setMemberEpoch(0) .setSubscribedTopicNames(Arrays.asList("foo", "bar"))); - // Verify that a member id was generated for the new member. - String memberId = result.response().memberId(); - assertNotNull(memberId); - assertNotEquals("", memberId); + String serverReturnedMemberId = result.response().memberId(); Review Comment: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -10367,14 +10392,26 @@ public void testConsumerGroupHeartbeatFromExistingClassicStaticMember() { CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result = context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() .setGroupId(groupId) + .setMemberId(memberId1) .setInstanceId(instanceId1) .setRebalanceTimeoutMs(5000) .setServerAssignor(NoOpPartitionAssignor.NAME) .setSubscribedTopicNames(new ArrayList<>(member1.subscribedTopicNames())) - .setTopicPartitions(Collections.emptyList())); + .setTopicPartitions(Collections.emptyList()), + ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion() + ); + + + // The memberId is generated by the consumer itself, the consumer should retain this memberId + // for its entire lifetime until the process terminates. + String serverReturnedMemberId = result.response().memberId(); Review Comment: ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org