dajac commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1822143292


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -198,14 +209,14 @@ public void testConsumerHeartbeatRequestValidation() {
         // SubscribedTopicNames must be present and empty in the first request 
(epoch == 0).
         ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(memberId)
                 .setGroupId("foo")
                 .setMemberEpoch(0)
                 .setRebalanceTimeoutMs(5000)
                 .setTopicPartitions(Collections.emptyList())));
         assertEquals("SubscribedTopicNames must be set in first request.", 
ex.getMessage());
 
-        // MemberId must be non-empty in all requests except for the first one 
where it
-        // could be empty (epoch != 0).
+        // MemberId must be non-empty in all requests

Review Comment:
   This test case seems redundant now. Should we remove it?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10204,8 +10225,16 @@ public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
                 mkTopicAssignment(fooTopicId, 0)))
             .build();
 
-        String newMemberId = result.response().memberId();
-        ConsumerGroupMember expectedReplacingConsumerMember = new 
ConsumerGroupMember.Builder(newMemberId)
+        // The memberId is generated by the consumer and should be retained
+        // for the entire lifetime of the process until termination.
+        String serverReturnedMemberId = result.response().memberId();
+        assertEquals(
+            newMemberId,
+            serverReturnedMemberId,
+            "Server should not generate a new memberId since the consumer has 
already generated its own."
+        );

Review Comment:
   nit: How about `assertEquals(newMemberId, result.response().memberId(), 
...)`? We don't really need the `serverReturnedMemberId` local variable as we 
can directly use newMemberId.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10177,15 +10193,20 @@ public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
         
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, 
assignments, metadataImage.features().metadataVersion()));
         context.commit();
 
-        // The static member rejoins with new protocol, triggering the upgrade.
+        // The static member rejoins with new protocol after a restart, 
triggering the upgrade.
+        String newMemberId = Uuid.randomUuid().toString();
+        assertNotEquals(newMemberId, memberId, "The consumer should generate a 
new memberId since the process has been terminated and restarted.");

Review Comment:
   nit: I would remove this one.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14086,23 +14133,28 @@ public void testShareGroupMemberIdGeneration() {
             Collections.emptyMap()
         ));
 
+        String memberId = Uuid.randomUuid().toString();
         CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> 
result = context.shareGroupHeartbeat(
             new ShareGroupHeartbeatRequestData()
                 .setGroupId("group-foo")
+                .setMemberId(memberId)
                 .setMemberEpoch(0)
                 .setSubscribedTopicNames(Arrays.asList("foo", "bar")));
 
-        // Verify that a member id was generated for the new member.
-        String memberId = result.response().memberId();
-        assertNotNull(memberId);
-        assertNotEquals("", memberId);
+        String serverReturnedMemberId = result.response().memberId();

Review Comment:
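   nit: Ditto. The `serverReturnedMemberId` local variable is not really 
needed here; `result.response().memberId()` can be used directly in the 
assertion.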



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10367,14 +10392,26 @@ public void 
testConsumerGroupHeartbeatFromExistingClassicStaticMember() {
         CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
+                .setMemberId(memberId1)
                 .setInstanceId(instanceId1)
                 .setRebalanceTimeoutMs(5000)
                 .setServerAssignor(NoOpPartitionAssignor.NAME)
                 .setSubscribedTopicNames(new 
ArrayList<>(member1.subscribedTopicNames()))
-                .setTopicPartitions(Collections.emptyList()));
+                .setTopicPartitions(Collections.emptyList()),
+            ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion()
+        );
+
+
+        // The memberId is generated by the consumer itself, the consumer 
should retain this memberId
+        // for its entire lifetime until the process terminates.
+        String serverReturnedMemberId = result.response().memberId();

Review Comment:
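   nit: Ditto. The `serverReturnedMemberId` local variable is not really 
needed here; `result.response().memberId()` can be used directly in the 
assertion.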



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to