smjn commented on code in PR #19339:
URL: https://github.com/apache/kafka/pull/19339#discussion_r2024714018


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(
         }
         ShareGroup group = (ShareGroup) groups.get(groupId);
 
-        // We must combine the existing information in the record with the
-        // topicPartitionMap argument.
-        Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
         ShareGroupStatePartitionMetadataInfo currentMap = shareGroupPartitionMetadata.get(groupId);
         if (currentMap == null) {
-            topicPartitionMap.forEach((k, v) -> finalMap.put(k, Map.entry(metadataImage.topics().getTopic(k).name(), v)));
             return new CoordinatorResult<>(
-                List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, Map.of())),
+                List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), attachTopicName(topicPartitionMap), Map.of())),
                 null
             );
         }
 
-        Set<Uuid> combinedTopicIdSet = new HashSet<>(topicPartitionMap.keySet());
-        combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+        // We must combine the existing information in the record with the topicPartitionMap argument so that the final
+        // record has up-to-date information.
+        Map<Uuid, Set<Integer>> finalInitializedMap = mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
 
-        for (Uuid topicId : combinedTopicIdSet) {
-            String topicName = metadataImage.topics().getTopic(topicId).name();
-            Set<Integer> partitions = new HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
-            if (topicPartitionMap.containsKey(topicId)) {
-                partitions.addAll(topicPartitionMap.get(topicId));
+        // Fetch initializing info from state metadata.
+        Map<Uuid, Set<Integer>> finalInitializingMap = new HashMap<>(currentMap.initializingTopics());
+
+        // Remove any entries which are already initialized.
+        for (Map.Entry<Uuid, Set<Integer>> entry : topicPartitionMap.entrySet()) {
+            Uuid topicId = entry.getKey();
+            if (finalInitializingMap.containsKey(topicId)) {
+                Set<Integer> partitions = finalInitializingMap.get(topicId);
+                partitions.removeAll(entry.getValue());
+                if (partitions.isEmpty()) {
+                    finalInitializingMap.remove(topicId);
+                }
+            }
+        }
+
+        return new CoordinatorResult<>(List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                group.groupId(),
+                attachTopicName(finalInitializingMap),
+                attachTopicName(finalInitializedMap),
+                Map.of()

Review Comment:
   This is a placeholder for now, since the topic deletion functionality depends on
this PR. I will update it in subsequent PRs.
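
   For context, the pruning step in the hunk above (dropping partitions from the
initializing map once they are initialized) can be sketched in isolation. This is
a minimal, standalone illustration and not the PR's code: the class
`InitializingMapPruner` and its `prune` method are hypothetical names, and plain
`String` keys stand in for Kafka's `Uuid` topic IDs so the snippet has no Kafka
dependency. Unlike the hunk, the sketch copies each inner set before calling
`removeAll`, so the input map is never mutated; whether that matters in the PR
depends on how `initializingTopics()` builds its sets, which this hunk does not
show.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of GroupMetadataManager.
final class InitializingMapPruner {

    // Returns a new map holding the partitions that are still initializing,
    // i.e. those in `initializing` that do not appear in `nowInitialized`.
    // String keys are an assumption standing in for topic Uuids.
    static Map<String, Set<Integer>> prune(
        Map<String, Set<Integer>> initializing,
        Map<String, Set<Integer>> nowInitialized
    ) {
        Map<String, Set<Integer>> result = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> entry : initializing.entrySet()) {
            // Copy the inner set so the caller's map is left untouched.
            Set<Integer> remaining = new HashSet<>(entry.getValue());
            Set<Integer> done = nowInitialized.get(entry.getKey());
            if (done != null) {
                remaining.removeAll(done);
            }
            // Drop topics with no partitions left in the initializing state.
            if (!remaining.isEmpty()) {
                result.put(entry.getKey(), remaining);
            }
        }
        return result;
    }
}
```

   With topic `t1` initializing partitions 0, 1 and 2, and partitions 0 and 1 of
`t1` reported as initialized, `prune` keeps only partition 2 for `t1`; a topic
whose partitions are all initialized is removed from the result entirely.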



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
