lucasbru commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1998207274


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged(
     }
 
     /**
-     * Creates the member subscription record if the updatedMember is 
different from
-     * the old member. Returns true if the topologyEpoch of the member has 
changed,
-     * which is always true when a member is first created.
+     * Creates the member metadatarecord record if the updatedMember is 
different from
+     * the old member. Returns true if the metadata has changed, which is 
always true

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged(
     }
 
     /**
-     * Creates the member subscription record if the updatedMember is 
different from
-     * the old member. Returns true if the topologyEpoch of the member has 
changed,
-     * which is always true when a member is first created.
+     * Creates the member metadatarecord record if the updatedMember is 
different from

Review Comment:
   As mentioned below, we somewhat changed the behavior of this method in a 
corner case.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3358,12 +3362,10 @@ private boolean hasStreamsMemberMetadataChanged(
         String memberId = updatedMember.memberId();
         if (!updatedMember.equals(member)) {
             records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
+            log.info("[GroupId {}] Member {} updated its member metdata to 
{}.",
+                groupId, memberId, updatedMember);
 
-            if (!Objects.equals(updatedMember.topologyEpoch(), 
member.topologyEpoch())) {

Review Comment:
   I moved the "return true" outside the if. There was a discussion with Bruno 
about this. A bunch of member information (rackId, clientTags, ...) may, in 
theory, influence the assignment, so a change to it requires bumping the group 
epoch and rerunning the assignor. In practice, a member will not change these 
during its lifetime, so they only change when a member joins, which causes a 
group epoch bump anyway. However, since we do not force the member to never 
change those configs, we need to be conservative here and bump the group epoch 
any time any of those metadata fields changes (not just when the topology epoch 
changes, as it was implemented before).
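
To illustrate the difference, here is a minimal, self-contained sketch. It is not the actual GroupMetadataManager code: the `Member` record, the string "records", and the method names `changedBefore` / `changedAfter` are hypothetical simplifications made up for this example. Both variants write a member record on any difference; only the return value (whether the group epoch should be bumped) differs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MetadataChangeSketch {
    // Hypothetical, simplified stand-in for a streams group member (not the real Kafka class).
    public record Member(String memberId, int topologyEpoch, String rackId,
                         Map<String, String> clientTags) {}

    // Previous behavior as described in the comment: a record is written on any
    // difference, but only a topology epoch change requests a group epoch bump.
    public static boolean changedBefore(Member old, Member updated, List<String> records) {
        if (!updated.equals(old)) {
            records.add("member-record:" + updated.memberId());
            return updated.topologyEpoch() != old.topologyEpoch();
        }
        return false;
    }

    // Conservative behavior: any metadata difference requests a group epoch bump.
    public static boolean changedAfter(Member old, Member updated, List<String> records) {
        if (!updated.equals(old)) {
            records.add("member-record:" + updated.memberId());
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Member old = new Member("m1", 1, "rack-a", Map.of());
        Member moved = new Member("m1", 1, "rack-b", Map.of()); // only rackId differs

        List<String> records = new ArrayList<>();
        System.out.println("before: " + changedBefore(old, moved, records));
        System.out.println("after:  " + changedAfter(old, moved, records));
    }
}
```

In this sketch, a member that changes only its rackId produces a member record in both variants, but only the conservative variant reports a change that would lead to a new assignment.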



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15555,6 +15575,100 @@ public void 
testStreamsUpdatingMetadataTriggersNewTargetAssignment() {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
+                TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
+                    TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+                )),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 11),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void 
testStreamsUpdatingPartitonMetadataTriggersNewTargetAssignment() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+

Review Comment:
   Done


