smjn commented on code in PR #19339:
URL: https://github.com/apache/kafka/pull/19339#discussion_r2024712166


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4870,37 +4921,127 @@ public CoordinatorResult<Void, CoordinatorRecord> 
initializeShareGroupState(
         }
         ShareGroup group = (ShareGroup) groups.get(groupId);
 
-        // We must combine the existing information in the record with the
-        // topicPartitionMap argument.
-        Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
-
         ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
         if (currentMap == null) {
-            topicPartitionMap.forEach((k, v) -> finalMap.put(k, 
Map.entry(metadataImage.topics().getTopic(k).name(), v)));
             return new CoordinatorResult<>(
-                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), finalMap, 
Map.of())),
+                
List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), 
attachTopicName(topicPartitionMap), Map.of())),
                 null
             );
         }
 
-        Set<Uuid> combinedTopicIdSet = new 
HashSet<>(topicPartitionMap.keySet());
-        combinedTopicIdSet.addAll(currentMap.initializedTopics.keySet());
+        // We must combine the existing information in the record with the 
topicPartitionMap argument so that the final
+        // record has up-to-date information.
+        Map<Uuid, Set<Integer>> finalInitializedMap = 
mergeShareGroupInitMaps(currentMap.initializedTopics(), topicPartitionMap);
 
-        for (Uuid topicId : combinedTopicIdSet) {
-            String topicName = metadataImage.topics().getTopic(topicId).name();
-            Set<Integer> partitions = new 
HashSet<>(currentMap.initializedTopics.getOrDefault(topicId, new HashSet<>()));
-            if (topicPartitionMap.containsKey(topicId)) {
-                partitions.addAll(topicPartitionMap.get(topicId));
+        // Fetch initializing info from state metadata.
+        Map<Uuid, Set<Integer>> finalInitializingMap = new 
HashMap<>(currentMap.initializingTopics());
+
+        // Remove any entries which are already initialized.
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
topicPartitionMap.entrySet()) {
+            Uuid topicId = entry.getKey();
+            if (finalInitializingMap.containsKey(topicId)) {
+                Set<Integer> partitions = finalInitializingMap.get(topicId);
+                partitions.removeAll(entry.getValue());
+                if (partitions.isEmpty()) {
+                    finalInitializingMap.remove(topicId);
+                }
+            }
+        }
+
+        return new CoordinatorResult<>(List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                group.groupId(),
+                attachTopicName(finalInitializingMap),
+                attachTopicName(finalInitializedMap),
+                Map.of()
+            )),
+            null
+        );
+    }
+
+    /**
+     * Removes specific topic partitions from the initializing state for a 
share group. This is usually part of
+     * shareGroupHeartbeat code flow, specifically, if there is a persister 
exception.
+     * @param groupId The group id corresponding to the share group whose 
share partitions have been initialized.
+     * @param topicPartitionMap Map representing topic partition data to be 
cleaned from the share state partition metadata.
+     *
+     * @return A Result containing ShareGroupStatePartitionMetadata records 
and Void response.
+     */
+    public CoordinatorResult<Void, CoordinatorRecord> 
uninitializeShareGroupState(
+        String groupId,
+        Map<Uuid, Set<Integer>> topicPartitionMap
+    ) {
+        ShareGroupStatePartitionMetadataInfo info = 
shareGroupPartitionMetadata.get(groupId);
+        if (info == null || info.initializingTopics().isEmpty() || 
topicPartitionMap.isEmpty()) {
+            return new CoordinatorResult<>(List.of(), null);
+        }
+
+        Map<Uuid, Set<Integer>> initializingTopics = info.initializingTopics();
+        Map<Uuid, Set<Integer>> finalInitializingTopics = new HashMap<>();
+
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
initializingTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            // If topicId to clean is not present in topicPartitionMap map, 
retain it.
+            if (!topicPartitionMap.containsKey(topicId)) {
+                finalInitializingTopics.put(entry.getKey(), entry.getValue());
+            } else {
+                Set<Integer> partitions = new HashSet<>(entry.getValue());
+                partitions.removeAll(topicPartitionMap.get(topicId));
+                if (!partitions.isEmpty()) {
+                    finalInitializingTopics.put(entry.getKey(), partitions);
+                }
             }
-            finalMap.computeIfAbsent(topicId, k -> Map.entry(topicName, 
partitions));
         }
 
         return new CoordinatorResult<>(
-            List.of(newShareGroupStatePartitionMetadataRecord(group.groupId(), 
finalMap, Map.of())),
+            List.of(
+                newShareGroupStatePartitionMetadataRecord(
+                    groupId,
+                    attachTopicName(finalInitializingTopics),
+                    attachTopicName(info.initializedTopics()),
+                    Map.of()
+                )
+            ),
             null
         );
     }
 
+    /**
+     * Iterates over all share groups and returns persister initialize 
requests corresponding to any initializing
+     * topic partitions found in the group associated {@link 
ShareGroupStatePartitionMetadataInfo}.
+     * @param offset The last committed offset for the {@link 
ShareGroupStatePartitionMetadataInfo} timeline hashmap.
+     *
+     * @return A list containing {@link InitializeShareGroupStateParameters} 
requests, could be empty.
+     */
+    public List<InitializeShareGroupStateParameters> 
reconcileShareGroupStateInitializingState(long offset) {
+        List<InitializeShareGroupStateParameters> requests = new 
LinkedList<>();
+        for (Group group : groups.values()) {
+            if (!(group instanceof ShareGroup shareGroup)) {
+                continue;
+            }
+            if 
(!(shareGroupPartitionMetadata.containsKey(shareGroup.groupId()))) {
+                continue;
+            }
+            Map<Uuid, Set<Integer>> initializing = 
shareGroupPartitionMetadata.get(shareGroup.groupId(), 
offset).initializingTopics();
+            if (initializing == null || initializing.isEmpty()) {
+                continue;
+            }
+            
requests.add(buildInitializeShareGroupStateRequest(shareGroup.groupId(), 
shareGroup.groupEpoch(), initializing));

Review Comment:
   So, this is called in onElected flow which will complete before serving any 
requests. The last committed group epoch should work here. But yes in case of 
network partitions it can happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to