smjn commented on code in PR #19781: URL: https://github.com/apache/kafka/pull/19781#discussion_r2102752894
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2771,33 +2766,50 @@ void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records
         records.add(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(finalInitializingMap),
-                attachTopicName(currentMap.initializedTopics()),
+                finalInitializingMap,
+                currentMap.initializedTopics(),
                 attachTopicName(currentDeleting)
             )
         );
     }

-    // Visibility for tests
-    static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps(
-        Map<Uuid, Set<Integer>> existingShareGroupInitMap,
-        Map<Uuid, Set<Integer>> newShareGroupInitMap
+    // Visibility for testing
+    static Map<Uuid, InitMapValue> combineInitMaps(
+        Map<Uuid, InitMapValue> initialized,
+        Map<Uuid, InitMapValue> initializing
     ) {
-        Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>();
-        Set<Uuid> combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet());
-        combinedTopicIdSet.addAll(newShareGroupInitMap.keySet());
+        Map<Uuid, InitMapValue> finalInitMap = new HashMap<>();
+        Set<Uuid> combinedTopicIdSet = new HashSet<>(initialized.keySet());
+
+        Set<Uuid> initializingSet = initializing.keySet();
+
+        combinedTopicIdSet.addAll(initializingSet);

         for (Uuid topicId : combinedTopicIdSet) {
-            Set<Integer> partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>()));
-            if (newShareGroupInitMap.containsKey(topicId)) {
-                partitions.addAll(newShareGroupInitMap.get(topicId));
+            Set<Integer> initializedPartitions = initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new HashSet<>();
+            long timestamp = initialized.containsKey(topicId) ? initialized.get(topicId).timestamp() : -1;
+            String name = initialized.containsKey(topicId) ? initialized.get(topicId).name() : "UNKNOWN";
+
+            Set<Integer> finalPartitions = new HashSet<>(initializedPartitions);
+            if (initializingSet.contains(topicId)) {
+                finalPartitions.addAll(initializing.get(topicId).partitions());
+                timestamp = initializing.get(topicId).timestamp();

Review Comment:
   @AndrewJSchofield The rationale behind this is the set of cases where combineInitMaps is used:

   1. Combining all initialized and initializing topics (only fresh ones) when computing the change map used to build the initialize request. The result carries a fresh timestamp, but the code consuming it does not care about that timestamp: it creates a new map containing the topics that are either old initializing (not part of the combined map) or first-time-seen subscribed topics, stamps that new map with the current timestamp, and uses it to build the init request.
   2. When building the request with some topic partitions, we write the metadata record with updated info containing the current initializing topics (per the last replay) plus the output of step 1.
      a. If any older initializing partitions existed, they are already part of the output of step 1. All the partitions will again be updated to the latest timestamp, BUT this is the initializing map, which means that none of the partitions have been assigned.
      b. If any partitions less than delta old existed prior to adding new partitions, these will be updated to the latest timestamp. Then, when delta elapses, they can be retried. They could be slightly delayed, but since delta is relatively small, it should work.
   3. When we mark partitions as initialized, the inputs are the current initialized topics and the fresh ones being initialized; the timestamps do not matter here.
   4. When building the delete request, we combine the currently deleting topics with any new topics from initializing or initialized which need to be deleted; timestamps do not matter here either.

   Hence, one timestamp per topic was added.
   If we want finer-grained control, and case 2 above seems disagreeable, we will need to add a timestamp per partition.
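   The merge semantics under discussion can be sketched roughly as below. This is a minimal, self-contained approximation for illustration only: the `InitMapValue` record here is a stand-in for the PR's actual class, and details outside the visible hunk (e.g. what happens to the topic name when a topic appears only in the initializing map) are assumed, not taken from the PR.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class CombineInitMapsSketch {
    // Hypothetical stand-in for the PR's InitMapValue: topic name,
    // partition set, and a single per-topic timestamp.
    record InitMapValue(String name, Set<Integer> partitions, long timestamp) { }

    // Sketch of the combine step discussed in the thread: union the partition
    // sets per topic, and let the initializing side's (newer) timestamp win
    // when a topic appears in both maps.
    static Map<UUID, InitMapValue> combineInitMaps(
        Map<UUID, InitMapValue> initialized,
        Map<UUID, InitMapValue> initializing
    ) {
        Map<UUID, InitMapValue> result = new HashMap<>();
        Set<UUID> allTopics = new HashSet<>(initialized.keySet());
        allTopics.addAll(initializing.keySet());

        for (UUID topicId : allTopics) {
            InitMapValue done = initialized.get(topicId);
            Set<Integer> partitions = new HashSet<>(done != null ? done.partitions() : Set.of());
            long timestamp = done != null ? done.timestamp() : -1;
            String name = done != null ? done.name() : "UNKNOWN";

            InitMapValue pending = initializing.get(topicId);
            if (pending != null) {
                partitions.addAll(pending.partitions());
                timestamp = pending.timestamp(); // newer timestamp wins for the whole topic
            }
            result.put(topicId, new InitMapValue(name, partitions, timestamp));
        }
        return result;
    }

    public static void main(String[] args) {
        UUID t1 = UUID.randomUUID();
        Map<UUID, InitMapValue> initialized =
            Map.of(t1, new InitMapValue("topic-1", Set.of(0, 1), 100L));
        Map<UUID, InitMapValue> initializing =
            Map.of(t1, new InitMapValue("topic-1", Set.of(2), 200L));

        InitMapValue merged = combineInitMaps(initialized, initializing).get(t1);
        System.out.println(merged.partitions().size()); // 3
        System.out.println(merged.timestamp());         // 200
    }
}
```

   Note how this illustrates the trade-off in case 2b: because the timestamp is per topic rather than per partition, any already-initializing partitions of a topic get re-stamped when new partitions join it, which is exactly why per-partition timestamps would be needed for finer control.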