AndrewJSchofield commented on code in PR #19781: URL: https://github.com/apache/kafka/pull/19781#discussion_r2101919968
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##########
@@ -2674,27 +2662,34 @@ private boolean initializedAssignmentPending(ShareGroup group) {
  * @return A map of topic partitions which are subscribed by the share group but not initialized yet.
  */
  // Visibility for testing
- Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
+ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
  if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
  return Map.of();
  }
- Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
+ Map<Uuid, InitMapValue> topicPartitionChangeMap = new HashMap<>();
  ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
- // We are only considering initialized TPs here. This is because it could happen
- // that some topics have been moved to initializing but the corresponding persister request
- // could not be made/failed (invoked by the group coordinator). Then there would be no way to try
- // the persister call. This way we get the opportunity to retry.
- Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics();
+ // Fresh initializing only
+ long curTimestamp = time.milliseconds();
+ long delta = config.offsetCommitTimeoutMs() * 2L;
+ Map<Uuid, InitMapValue> alreadyInitialized = info == null ? new HashMap<>() :

Review Comment:
I would add a comment here. The resulting map is a combination of initialized topics and initializing topics which have a timestamp within the delta.
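A minimal sketch of the kind of comment being asked for, placed next to the filtering it describes. It assumes simplified types: String topic IDs in place of Kafka's `Uuid`, a hypothetical `InitMapValue` record carrying only the `partitions()` and `timestamp()` accessors visible in the diff, and a hypothetical `combineInitMaps` that unions partition sets and keeps the newer timestamp. The real helper's merge rules are not shown in the hunk.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for InitMapValue: only the partitions() and timestamp()
// accessors visible in the diff are modelled here.
record InitMapValue(Set<Integer> partitions, long timestamp) { }

final class AlreadyInitializedSketch {

    // Hypothetical merge rule: union the partition sets of a topic present in both
    // maps and keep the newer timestamp. The real combineInitMaps may differ.
    static Map<String, InitMapValue> combineInitMaps(
            Map<String, InitMapValue> first, Map<String, InitMapValue> second) {
        Map<String, InitMapValue> combined = new HashMap<>(first);
        second.forEach((topicId, value) -> combined.merge(topicId, value, (a, b) -> {
            Set<Integer> union = new HashSet<>(a.partitions());
            union.addAll(b.partitions());
            return new InitMapValue(union, Math.max(a.timestamp(), b.timestamp()));
        }));
        return combined;
    }

    static Map<String, InitMapValue> alreadyInitialized(
            Map<String, InitMapValue> initializedTopics,
            Map<String, InitMapValue> initializingTopics,
            long curTimestamp,
            long delta) {
        // The result combines all initialized topics with those initializing topics
        // whose timestamp is within the delta. Older initializing entries are left
        // out, so their partitions become eligible for a new request.
        return combineInitMaps(
            initializedTopics,
            initializingTopics.entrySet().stream()
                .filter(entry -> curTimestamp - entry.getValue().timestamp() < delta)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }
}
```

The strict `<` comparison mirrors the filter in the diff: an initializing entry exactly `delta` old already counts as stale, so its partitions can be requested again.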
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##########
@@ -2674,27 +2662,34 @@ private boolean initializedAssignmentPending(ShareGroup group) {
  * @return A map of topic partitions which are subscribed by the share group but not initialized yet.
  */
  // Visibility for testing
- Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
+ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
  if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
  return Map.of();
  }
- Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
+ Map<Uuid, InitMapValue> topicPartitionChangeMap = new HashMap<>();
  ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
- // We are only considering initialized TPs here. This is because it could happen
- // that some topics have been moved to initializing but the corresponding persister request
- // could not be made/failed (invoked by the group coordinator). Then there would be no way to try
- // the persister call. This way we get the opportunity to retry.
- Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics();
+ // Fresh initializing only
+ long curTimestamp = time.milliseconds();
+ long delta = config.offsetCommitTimeoutMs() * 2L;
+ Map<Uuid, InitMapValue> alreadyInitialized = info == null ? new HashMap<>() :
+ combineInitMaps(
+ info.initializedTopics(),
+ info.initializingTopics().entrySet().stream()
+ .filter(entry -> curTimestamp - entry.getValue().timestamp() < delta)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+ );
  subscriptionMetadata.forEach((topicName, topicMetadata) -> {
- Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of());
+ Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of();
  if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) {
  Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
  partitionSet.removeAll(alreadyInitializedPartSet);
-
- topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> partitionSet);
+ // alreadyInitialized contains all initialized topics and less than delta seconds old initializing topics

Review Comment:
This is a bit misleading. The delta is actually in milliseconds, I think. I suggest not even referring to the units at all. The point is to avoid duplicating requests within the delta, giving the opportunity for retries but minimising the risk of collisions which cause state epoch fencing.
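To illustrate the reviewer's point, here is a simplified sketch of the loop with a unit-free comment. It uses hypothetical plain maps (topic ID to partition count, topic ID to partition set) in place of `TopicMetadata` and `InitMapValue`, and `changes.put(...)` stands in for whatever the PR writes after the comment, which the quoted hunk does not show. The comment wording is only a suggestion, not the PR's final text.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ChangeMapSketch {

    // Hypothetical simplification: numPartitionsByTopic stands in for subscriptionMetadata,
    // and alreadyInitialized maps a topic ID straight to its partition set.
    static Map<String, Set<Integer>> partitionsToInitialize(
            Map<String, Integer> numPartitionsByTopic,
            Map<String, Set<Integer>> alreadyInitialized) {
        Map<String, Set<Integer>> changes = new HashMap<>();
        numPartitionsByTopic.forEach((topicId, numPartitions) -> {
            Set<Integer> done = alreadyInitialized.getOrDefault(topicId, Set.of());
            if (done.isEmpty() || done.size() < numPartitions) {
                // toCollection(HashSet::new) guarantees a mutable set for removeAll.
                Set<Integer> pending = IntStream.range(0, numPartitions).boxed()
                    .collect(Collectors.toCollection(HashSet::new));
                pending.removeAll(done);
                // alreadyInitialized holds every initialized topic plus the initializing
                // topics still within the delta. Skipping those avoids duplicate requests
                // within the delta; initializing entries older than that can be retried.
                changes.put(topicId, pending);
            }
        });
        return changes;
    }
}
```

`Collectors.toCollection(HashSet::new)` is used instead of `Collectors.toSet()` because the latter makes no guarantee about the mutability of the returned set, and `removeAll` needs a mutable one.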