lucasbru commented on code in PR #19114: URL: https://github.com/apache/kafka/pull/19114#discussion_r1998207274
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged( } /** - * Creates the member subscription record if the updatedMember is different from - * the old member. Returns true if the topologyEpoch of the member has changed, - * which is always true when a member is first created. + * Creates the member metadatarecord record if the updatedMember is different from + * the old member. Returns true if the metadata has changed, which is always true Review Comment: Done ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged( } /** - * Creates the member subscription record if the updatedMember is different from - * the old member. Returns true if the topologyEpoch of the member has changed, - * which is always true when a member is first created. + * Creates the member metadatarecord record if the updatedMember is different from Review Comment: As mentioned below, we somewhat changed the behavior of this method in a corner case. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3358,12 +3362,10 @@ private boolean hasStreamsMemberMetadataChanged( String memberId = updatedMember.memberId(); if (!updatedMember.equals(member)) { records.add(newStreamsGroupMemberRecord(groupId, updatedMember)); + log.info("[GroupId {}] Member {} updated its member metdata to {}.", + groupId, memberId, updatedMember); - if (!Objects.equals(updatedMember.topologyEpoch(), member.topologyEpoch())) { Review Comment: I moved the "return true" outside the if. There was a discussion with Bruno about this. The thing is, that there is a bunch of information - rackId, clientTags, ... that in theory, may influence the assignment, so we need to bump the group epoch / rerun the assignor - in practice, a member will not change these during its lifetime, so they only change when a member joins, which anyway, will cause a group epoch bump. However, since we do not force the member to never change those configs, we need to be conservative here and bump the group epoch any time any of those metadata fields changes (not just when teh topology epoch changes, as it was implemented before). ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -15555,6 +15575,100 @@ public void testStreamsUpdatingMetadataTriggersNewTargetAssignment() { List<CoordinatorRecord> expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), + StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, + TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), + TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2) + )), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 11), + StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testStreamsUpdatingPartitonMetadataTriggersNewTargetAssignment() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org