lucasbru commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1987319040


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
         return true;
     }
 
+    /**
+     * Verifies that the tasks currently owned by the member (the ones set in the
+     * request) matches the ones that the member should own. It matches if the streams
+     * only owns tasks which are in the assigned tasks. It does not match if
+     * it owns any other tasks.
+     *
+     * @param ownedTasks  The tasks provided by the streams in the request.
+     * @param target      The tasks that the member should have.
+     *
+     * @return A boolean indicating whether the owned partitions are a subset or not.
+     */
+    private boolean isTaskSubset(

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1480,6 +1600,34 @@ private boolean isSubset(
         return true;
     }
 
+    /**
+     * Verifies that the tasks currently owned by the member (the ones set in the
+     * request) matches the ones that the member should own. It matches if the streams
+     * only owns tasks which are in the assigned tasks. It does not match if
+     * it owns any other tasks.
+     *
+     * @param ownedTasks  The tasks provided by the streams in the request.
+     * @param target      The tasks that the member should have.
+     *
+     * @return A boolean indicating whether the owned partitions are a subset or not.
+     */
+    private boolean isTaskSubset(
+        List<StreamsGroupHeartbeatRequestData.TaskIds> ownedTasks,
+        Map<String, Set<Integer>> target
+    ) {
+        if (ownedTasks == null) return false;
+
+        for (StreamsGroupHeartbeatRequestData.TaskIds topicPartitions : ownedTasks) {

Review Comment:
   Done
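
For readers without the full diff at hand, the subset check described in the Javadoc above can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: the `TaskIds` record is a hypothetical stand-in for `StreamsGroupHeartbeatRequestData.TaskIds`, its `subtopologyId` and `partitions` fields are assumptions, and everything after the null check is a guess at the loop body that the quoted hunk cuts off.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TaskSubsetSketch {

    // Hypothetical stand-in for StreamsGroupHeartbeatRequestData.TaskIds;
    // the field names are assumptions made for this sketch.
    public record TaskIds(String subtopologyId, List<Integer> partitions) {}

    // Returns true only if every owned task also appears in the target assignment.
    public static boolean isTaskSubset(
        List<TaskIds> ownedTasks,
        Map<String, Set<Integer>> target
    ) {
        // Mirrors the quoted hunk: a missing owned-task list never matches.
        if (ownedTasks == null) return false;

        for (TaskIds owned : ownedTasks) {
            Set<Integer> assigned = target.get(owned.subtopologyId());
            // Owning a task of a subtopology that is not assigned at all is a mismatch.
            if (assigned == null) return false;
            for (Integer partition : owned.partitions()) {
                // Owning a partition outside the assigned set is a mismatch.
                if (!assigned.contains(partition)) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> target = Map.of("subtopology-0", Set.of(0, 1, 2));

        // Owns a strict subset of the assigned tasks.
        System.out.println(isTaskSubset(
            List.of(new TaskIds("subtopology-0", List.of(0, 1))), target));

        // Owns partition 3, which is not assigned.
        System.out.println(isTaskSubset(
            List.of(new TaskIds("subtopology-0", List.of(0, 3))), target));
    }
}
```

Under these assumptions, an empty owned-task list counts as a subset while a `null` list does not, which matches the early return visible in the hunk.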



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1805,6 +1990,280 @@ private List<ShareGroupHeartbeatResponseData.TopicPartitions> fromShareGroupAssi
             .toList();
     }
 
+    /**
+     * Handles a regular heartbeat from a Streams group member.
+     * It mainly consists of five parts:
+     * 1) Created or update the member.
+     *    The group epoch is bumped if the member has been created or updated.
+     * 2) Initialized or update the topology. The group epoch is bumped if the topology

Review Comment:
   Done


