AndrewJSchofield commented on code in PR #19026:
URL: https://github.com/apache/kafka/pull/19026#discussion_r1973313677


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -463,6 +472,19 @@ GroupMetadataManager build() {
      */
     private final ShareGroupPartitionAssignor shareGroupAssignor;
 
+    /**
+     * A record class to hold the value representing ShareGroupStatePartitionMetadata for the TimelineHashmap
+     * keyed on share group id.
+     *
+     * @param initializedTopics Map of set of partition ids keyed on the topic id.
+     * @param deletingTopics    Map of topic names keyed on topic id.
+     */
+    private record ShareGroupStatePartitionMetadataInfo(

Review Comment:
   I don't understand why we need topic name in one case, but not the other.
   
   Option 1:
   ```
   Map<Uuid, Set<Integer>> initializedTopics
   Set<Uuid> deletingTopics
   ```
   
   Option 2:
   ```
   Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics
   Map<Uuid, String> deletingTopics
   ```
   
   I suspect that option 1 would be sufficient. What am I missing?
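
   For illustration, a minimal sketch of what option 1 might look like as a
   record. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid` so the
   snippet compiles on its own, and the `isDeleting` helper is hypothetical,
   not something in the PR.

   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;

   // Sketch of option 1: no topic names are stored in either collection.
   // java.util.UUID stands in for org.apache.kafka.common.Uuid (assumption).
   record ShareGroupStatePartitionMetadataInfo(
       Map<UUID, Set<Integer>> initializedTopics, // partition ids keyed on topic id
       Set<UUID> deletingTopics                   // ids of topics being deleted
   ) {
       // Hypothetical helper: true if the topic is currently in the deleting state.
       boolean isDeleting(UUID topicId) {
           return deletingTopics.contains(topicId);
       }
   }
   ```

   If the topic name turns out to be needed, it could presumably be resolved
   from the topic id at the point of use rather than stored here.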



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2190,10 +2213,10 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
      * @param clientHost            The client host.
      * @param subscribedTopicNames  The list of subscribed topic names from the request or null.
      *
-     * @return A Result containing the ShareGroupHeartbeat response and
-     *         a list of records to update the state machine.
+     * @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters
+     *         and a list of records to update the state machine.
      */
-    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
+    private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(

Review Comment:
   I am tempted to create a `Pair<K,V>` in the common module. There are other 
places I've seen where we need a pair. Absolutely no need for you to address 
that in this PR. I understand what you're doing here.
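
   To make the idea concrete, a minimal sketch of such a pair as a Java
   record. The name `Pair` and the accessor names `left`/`right` are
   assumptions for illustration only; nothing like this exists in the common
   module as far as this comment is concerned.

   ```java
   // Hypothetical generic pair; records supply equals, hashCode and toString.
   record Pair<K, V>(K left, V right) {
       // Convenience factory so call sites can write Pair.of(a, b).
       static <K, V> Pair<K, V> of(K left, V right) {
           return new Pair<>(left, right);
       }
   }
   ```

   Compared with `Map.Entry`, a dedicated type would avoid the key/value
   naming, which reads oddly when the two elements are unrelated.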



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4741,6 +4889,50 @@ public void replay(
         }
     }
 
+    /**
+     * Replays ShareGroupStatePartitionMetadataKey/Value to update the hard state of
+     * the share group.
+     *
+     * @param key   A ShareGroupStatePartitionMetadataKey key.
+     * @param value A ShareGroupStatePartitionMetadataValue record.
+     */
+    public void replay(
+        ShareGroupStatePartitionMetadataKey key,
+        ShareGroupStatePartitionMetadataValue value
+    ) {
+        String groupId = key.groupId();
+
+        getOrMaybeCreatePersistedShareGroup(groupId, false);
+
+        // Update timeline structures with info about initialized/deleted topics.
+        if (value == null) {
+            // Tombstone!
+            shareGroupPartitionMetadata.remove(groupId);
+        } else {
+            // Init java record.
+            ShareGroupStatePartitionMetadataInfo record = shareGroupPartitionMetadata.computeIfAbsent(
+                groupId, k -> new ShareGroupStatePartitionMetadataInfo(new HashMap<>(), new HashMap<>())
+            );
+
+            // Remove all topicIds in deleting state from java record.
+            Map<Uuid, String> deleting = new HashMap<>();
+            for (ShareGroupStatePartitionMetadataValue.TopicInfo deletingTopic : value.deletingTopics()) {
+                deleting.put(deletingTopic.topicId(), deletingTopic.topicName());
+            }
+            deleting.forEach((tId, tName) -> record.initializedTopics.remove(tId));

Review Comment:
   Looks to me like the initialized topics list is not initialized yet.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -1066,6 +1086,13 @@ public void replay(
                 );
                 break;
 
+            case SHARE_GROUP_STATE_PARTITION_METADATA:

Review Comment:
   Please move this case to follow `SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3981,6 +4105,30 @@ public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sha
             request.subscribedTopicNames());
     }
 
+    /**
+     * Handles an initialize share group state request. This is usually part of
+     * shareGroupHeartbeat code flow.
+     * @param groupId The group id corresponding to the share group whose share partitions have been initialized.
+     * @param topicPartitionMap Map representing topic partition data to be added to the share state partition metadata.
+     *
+     * @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(
+        String groupId,
+        Map<Uuid, Map.Entry<String, List<Integer>>> topicPartitionMap
+    ) {
+        // Should be present
+        ShareGroup group = (ShareGroup) groups.get(groupId);
+        if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        return new CoordinatorResult<>(
+            List.of(newShareGroupPartitionMetadataRecord(group.groupId(), topicPartitionMap, Map.of())),

Review Comment:
   This is incorrect when the number of partitions has increased. Suppose
topic T1 has grown from 1 to 4 partitions. The `topicPartitionMap` will
contain `{topicId, {"T1", [1,2,3]}}`. However, the
`ShareGroupPartitionMetadataRecord` should contain the complete list of
partitions, not just the new ones. Because the `__consumer_offsets` topic is
compacted and these records are keyed by group ID, only the most recent record
can be expected to survive. As a result, that record needs to contain the
up-to-date list of all initialized topic-partitions.
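
   A minimal sketch of the merge described above, under stated assumptions:
   the class and method names are hypothetical, `java.util.UUID` stands in
   for Kafka's `Uuid`, and topic names are omitted for brevity. The point is
   only that the record should be built from the union of the partitions
   already initialized and the newly initialized ones.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;
   import java.util.UUID;

   // Hypothetical helper, not part of GroupMetadataManager.
   final class InitializedPartitionsMerge {
       // Returns, per topic id, the union of already-initialized and
       // newly-initialized partition ids. Neither input map is modified.
       static Map<UUID, Set<Integer>> merge(
           Map<UUID, Set<Integer>> alreadyInitialized,
           Map<UUID, Set<Integer>> newlyInitialized
       ) {
           Map<UUID, Set<Integer>> merged = new HashMap<>();
           // Start from a copy of what the latest record already holds.
           alreadyInitialized.forEach((topicId, partitions) ->
               merged.put(topicId, new TreeSet<>(partitions)));
           // Add the new partitions, creating the topic entry if needed.
           newlyInitialized.forEach((topicId, partitions) ->
               merged.computeIfAbsent(topicId, id -> new TreeSet<>()).addAll(partitions));
           return merged;
       }
   }
   ```

   With T1 already holding partition 0 and `topicPartitionMap` carrying
   partitions 1, 2 and 3, the merged entry for T1 holds all four partitions,
   which is what the single surviving record after compaction would need.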


