bbejeck commented on code in PR #19407: URL: https://github.com/apache/kafka/pull/19407#discussion_r2031311815
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -513,6 +513,8 @@ GroupMetadataManager build() { */ private final int streamsGroupMetadataRefreshIntervalMs; + private final Set<String> streamsGroupMembersSendMetadata = new HashSet<>(); + Review Comment: Introducing soft-state to track members that have received endpointToPartitions information in a heartbeat response ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2341,15 +2343,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream ); scheduleStreamsGroupSessionTimeout(groupId, memberId); - List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitions = maybeBuildEndpointToPartitions(group); // Prepare the response. StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData() .setMemberId(updatedMember.memberId()) .setMemberEpoch(updatedMember.memberEpoch()) - .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs) - .setPartitionsByUserEndpoint(endpointToPartitions); - - // The assignment is only provided in the following cases: + .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs); + Review Comment: Removed this line that resulted in sending the endpoint information in every heartbeat response. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2358,9 +2358,18 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream || hasAssignedStandbyTasksChanged(member, updatedMember) || hasAssignedWarmupTasksChanged(member, updatedMember) ) { + streamsGroupMembersSendMetadata.clear(); Review Comment: There's been an assignment change, so all members will need to eventually get an update to the endpoint metadata. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -2358,9 +2358,18 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream || hasAssignedStandbyTasksChanged(member, updatedMember) || hasAssignedWarmupTasksChanged(member, updatedMember) ) { + streamsGroupMembersSendMetadata.clear(); response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks())); response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks())); response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks())); + } else { + long memberGroupEpochCount = group.members().values().stream().filter(m -> m.memberEpoch() == group.groupEpoch()).count(); + if (memberGroupEpochCount == group.members().size()) { + if (!streamsGroupMembersSendMetadata.contains(updatedMember.memberId())) { + response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group)); + streamsGroupMembersSendMetadata.add(updatedMember.memberId()); + } Review Comment: If all members have a `memberEpoch` equaling the `groupEpoch` and the `memberId` is not in the sent metadata set, add the endpoint information to the heartbeat response and add the id to the set to prevent redundant updates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org