bbejeck commented on code in PR #19407:
URL: https://github.com/apache/kafka/pull/19407#discussion_r2031311815


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -513,6 +513,8 @@ GroupMetadataManager build() {
      */
     private final int streamsGroupMetadataRefreshIntervalMs;
 
+    private final Set<String> streamsGroupMembersSendMetadata = new HashSet<>();
+

Review Comment:
   This introduces soft state to track the members that have already received 
`endpointToPartitions` information in a heartbeat response.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2341,15 +2343,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
         );
 
         scheduleStreamsGroupSessionTimeout(groupId, memberId);
-        List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitions = maybeBuildEndpointToPartitions(group);
         // Prepare the response.
         StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs)
-            .setPartitionsByUserEndpoint(endpointToPartitions);
-
-        // The assignment is only provided in the following cases:
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs);
+        

Review Comment:
   Removed this line, which caused the endpoint information to be sent in every 
heartbeat response.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2358,9 +2358,18 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             || hasAssignedStandbyTasksChanged(member, updatedMember)
             || hasAssignedWarmupTasksChanged(member, updatedMember)
         ) {
+            streamsGroupMembersSendMetadata.clear();

Review Comment:
   There's been an assignment change, so all members will eventually need 
an update to the endpoint metadata.
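
A minimal sketch of the invalidation idea described here, kept separate from the coordinator code. The class `SentMetadataTracker` and its method names are hypothetical and do not appear in the PR; the sketch only models a set of member IDs that is cleared whenever an assignment changes, so that every member becomes eligible for a fresh update.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of the PR): models the soft-state set in isolation.
final class SentMetadataTracker {
    // IDs of members that have already been sent the current endpoint metadata.
    private final Set<String> sent = new HashSet<>();

    // True if the member has not yet been sent the current endpoint metadata.
    boolean needsUpdate(String memberId) {
        return !sent.contains(memberId);
    }

    // Record that the member has been sent the endpoint metadata.
    void markSent(String memberId) {
        sent.add(memberId);
    }

    // Called on an assignment change: every member needs the metadata again.
    void invalidateAll() {
        sent.clear();
    }
}
```

Because the set is soft state, losing it (for example, on a coordinator restart) would only cause the metadata to be sent again, assuming the state is not persisted anywhere.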



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2358,9 +2358,18 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
             || hasAssignedStandbyTasksChanged(member, updatedMember)
             || hasAssignedWarmupTasksChanged(member, updatedMember)
         ) {
+            streamsGroupMembersSendMetadata.clear();
             response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks()));
             response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks()));
             response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks()));
+        } else {
+            long memberGroupEpochCount = group.members().values().stream().filter(m -> m.memberEpoch() == group.groupEpoch()).count();
+            if (memberGroupEpochCount == group.members().size()) {
+                if (!streamsGroupMembersSendMetadata.contains(updatedMember.memberId())) {
+                    response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+                    streamsGroupMembersSendMetadata.add(updatedMember.memberId());
+                }

Review Comment:
   If all members have a `memberEpoch` equal to the `groupEpoch` and the 
`memberId` is not yet in the sent-metadata set, add the endpoint information to 
the heartbeat response and add the ID to the set to prevent redundant updates.
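
The condition above can be sketched in isolation as follows. This is an illustration under stated assumptions, not the PR's code: `EndpointGate` and `shouldSend` are hypothetical names, member epochs are passed in as plain integers, and `allMatch` is used in place of the count comparison in the diff (the two agree, including for an empty group).

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not part of the PR) of the "send once per member" gate.
final class EndpointGate {
    // IDs of members that have already received the endpoint information.
    private final Set<String> sent = new HashSet<>();

    // Returns true when the endpoint information should be attached to this
    // member's heartbeat response: all members have reached the group epoch
    // and this member has not been sent the information yet.
    boolean shouldSend(String memberId, Collection<Integer> memberEpochs, int groupEpoch) {
        boolean converged = memberEpochs.stream().allMatch(e -> e == groupEpoch);
        // Set.add returns false if the ID was already present, so the check
        // and the bookkeeping happen in one step. The short-circuit && means
        // the ID is only recorded once the group has converged.
        return converged && sent.add(memberId);
    }
}
```

With epochs `[3, 2]` and a group epoch of 3, `shouldSend` returns false and records nothing; once every epoch is 3, the first call for a given member returns true and later calls for the same member return false.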



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
