smjn commented on code in PR #19781:
URL: https://github.com/apache/kafka/pull/19781#discussion_r2102752894


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2771,33 +2766,50 @@ void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> record
         records.add(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(finalInitializingMap),
-                attachTopicName(currentMap.initializedTopics()),
+                finalInitializingMap,
+                currentMap.initializedTopics(),
                 attachTopicName(currentDeleting)
             )
         );
     }
 
-    // Visibility for tests
-    static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps(
-        Map<Uuid, Set<Integer>> existingShareGroupInitMap,
-        Map<Uuid, Set<Integer>> newShareGroupInitMap
+    // Visibility for testing
+    static Map<Uuid, InitMapValue> combineInitMaps(
+        Map<Uuid, InitMapValue> initialized,
+        Map<Uuid, InitMapValue> initializing
     ) {
-        Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>();
-        Set<Uuid> combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet());
-        combinedTopicIdSet.addAll(newShareGroupInitMap.keySet());
+        Map<Uuid, InitMapValue> finalInitMap = new HashMap<>();
+        Set<Uuid> combinedTopicIdSet = new HashSet<>(initialized.keySet());
+
+        Set<Uuid> initializingSet = initializing.keySet();
+
+        combinedTopicIdSet.addAll(initializingSet);
 
         for (Uuid topicId : combinedTopicIdSet) {
-            Set<Integer> partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>()));
-            if (newShareGroupInitMap.containsKey(topicId)) {
-                partitions.addAll(newShareGroupInitMap.get(topicId));
+            Set<Integer> initializedPartitions = initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new HashSet<>();
+            long timestamp = initialized.containsKey(topicId) ? initialized.get(topicId).timestamp() : -1;
+            String name = initialized.containsKey(topicId) ? initialized.get(topicId).name() : "UNKNOWN";
+
+            Set<Integer> finalPartitions = new HashSet<>(initializedPartitions);
+            if (initializingSet.contains(topicId)) {
+                finalPartitions.addAll(initializing.get(topicId).partitions());
+                timestamp = initializing.get(topicId).timestamp();

Review Comment:
   @AndrewJSchofield 
   The rationale behind this was the cases where `combineInitMaps` is used:
   
   1. All initialized and initializing (only fresh ones) - when computing the change map to build the request, the result will have a fresh timestamp, but the code which uses the result does not care about the timestamp. It creates a new map containing all topics which are old initializing (not part of the combined map) plus first-time-seen subscribed topics, and uses the current timestamp in the new map. This is then used to build the init request.
   2. When building the request with some topic partitions, we write the metadata record with updated info containing the current initializing topics (per the last replay) and the fresh ones output by step 1. All the partitions will again be updated to the latest timestamp, BUT this is the initializing map, which means that none of the partitions have been assigned; hence an older partition initialize might get delayed but will not cause corruption.
   3. When we mark partitions as initialized, the inputs are the currently initialized topics and the fresh ones being initialized; the timestamps do not matter here.
   4. When building the delete request, we are combining the currently deleting topics with any new topics from initializing or initialized which need to be deleted; timestamps do not matter here either.
   
   Hence, one timestamp per topic was added.
   
   If we want finer-grained control and case 2 above seems disagreeable, we will need to add a timestamp per partition.
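   To make the topic-level timestamp semantics above concrete, here is a minimal, self-contained sketch. It is not the actual Kafka code: it uses `String` keys in place of `Uuid` and a hypothetical `InitMapValue` record standing in for the real one. It only illustrates the merge rule visible in the diff: partition sets are unioned, and when a topic appears in both maps, the initializing entry's (fresher) timestamp wins.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CombineInitMapsSketch {
    // Hypothetical stand-in for the real InitMapValue: topic name,
    // partition set, and a single timestamp per topic.
    record InitMapValue(String name, Set<Integer> partitions, long timestamp) {}

    // Sketch of the merge in the diff: union the partition sets of both maps;
    // when a topic is present in both, the initializing entry's timestamp
    // overrides the initialized one (one timestamp per topic, fresher wins).
    static Map<String, InitMapValue> combineInitMaps(
        Map<String, InitMapValue> initialized,
        Map<String, InitMapValue> initializing
    ) {
        Map<String, InitMapValue> result = new HashMap<>();
        Set<String> topics = new HashSet<>(initialized.keySet());
        topics.addAll(initializing.keySet());

        for (String topicId : topics) {
            InitMapValue init = initialized.get(topicId);
            InitMapValue fresh = initializing.get(topicId);

            // Baseline from the initialized map, mirroring the diff's fallbacks.
            Set<Integer> partitions =
                init != null ? new HashSet<>(init.partitions()) : new HashSet<>();
            long timestamp = init != null ? init.timestamp() : -1L;
            String name = init != null ? init.name()
                : (fresh != null ? fresh.name() : "UNKNOWN");

            if (fresh != null) {
                partitions.addAll(fresh.partitions());
                // The fresher initializing entry's timestamp wins.
                timestamp = fresh.timestamp();
            }
            result.put(topicId, new InitMapValue(name, partitions, timestamp));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, InitMapValue> initialized =
            Map.of("t1", new InitMapValue("topic-1", Set.of(0, 1), 100L));
        Map<String, InitMapValue> initializing =
            Map.of("t1", new InitMapValue("topic-1", Set.of(2), 200L));

        InitMapValue merged = combineInitMaps(initialized, initializing).get("t1");
        System.out.println(merged.partitions() + " @ " + merged.timestamp());
    }
}
```

   Under these assumptions, merging `{0, 1}@100` (initialized) with `{2}@200` (initializing) for the same topic yields the union `{0, 1, 2}` stamped `200`, which is exactly why case 2 only risks delaying an older partition initialize rather than corrupting state.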
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
