AndrewJSchofield commented on code in PR #19781:
URL: https://github.com/apache/kafka/pull/19781#discussion_r2101919968


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2674,27 +2662,34 @@ private boolean initializedAssignmentPending(ShareGroup group) {
      * @return  A map of topic partitions which are subscribed by the share group but not initialized yet.
      */
     // Visibility for testing
-    Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
+    Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
         if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
             return Map.of();
         }
 
-        Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
+        Map<Uuid, InitMapValue> topicPartitionChangeMap = new HashMap<>();
         ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
 
-        // We are only considering initialized TPs here. This is because it could happen
-        // that some topics have been moved to initializing but the corresponding persister request
-        // could not be made/failed (invoked by the group coordinator). Then there would be no way to try
-        // the persister call. This way we get the opportunity to retry.
-        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics();
+        // Fresh initializing only
+        long curTimestamp = time.milliseconds();
+        long delta = config.offsetCommitTimeoutMs() * 2L;
+        Map<Uuid, InitMapValue> alreadyInitialized = info == null ? new HashMap<>() :

Review Comment:
   I would add a comment here. The resulting map is a combination of 
initialized topics and initializing topics which have a timestamp within the 
delta.
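A minimal sketch of the combined map described above, for illustration only. It assumes a simplified `InitMapValue` carrying just the `partitions()` and `timestamp()` accessors seen in the diff, uses `String` topic IDs in place of Kafka's `Uuid`, and treats `combineInitMaps` as a hypothetical per-topic union of partition sets; the PR's own helper may behave differently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class InitMapSketch {
    // Hypothetical stand-in for the PR's InitMapValue: only the two
    // accessors used in the diff, partitions() and timestamp(), are modelled.
    record InitMapValue(Set<Integer> partitions, long timestamp) { }

    // Hypothetical merge: union the partition sets of topics present in both
    // maps and keep the later timestamp. The real combineInitMaps may differ.
    static Map<String, InitMapValue> combineInitMaps(
            Map<String, InitMapValue> first, Map<String, InitMapValue> second) {
        Map<String, InitMapValue> combined = new HashMap<>(first);
        second.forEach((topicId, value) -> combined.merge(topicId, value, (a, b) -> {
            Set<Integer> partitions = new HashSet<>(a.partitions());
            partitions.addAll(b.partitions());
            return new InitMapValue(partitions, Math.max(a.timestamp(), b.timestamp()));
        }));
        return combined;
    }

    // All initialized topics, plus only those initializing topics whose
    // timestamp is within delta of now.
    static Map<String, InitMapValue> alreadyInitialized(
            Map<String, InitMapValue> initialized,
            Map<String, InitMapValue> initializing,
            long now,
            long delta) {
        Map<String, InitMapValue> freshInitializing = initializing.entrySet().stream()
            .filter(entry -> now - entry.getValue().timestamp() < delta)
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return combineInitMaps(initialized, freshInitializing);
    }
}
```

For example, with `now = 10_000` and `delta = 2_000`, an initializing entry stamped `9_500` stays in the map, while one stamped `1_000` falls out of it, so its partitions become eligible for another initialization request.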



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2674,27 +2662,34 @@ private boolean initializedAssignmentPending(ShareGroup group) {
      * @return  A map of topic partitions which are subscribed by the share group but not initialized yet.
      */
     // Visibility for testing
-    Map<Uuid, Set<Integer>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
+    Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
         if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
             return Map.of();
         }
 
-        Map<Uuid, Set<Integer>> topicPartitionChangeMap = new HashMap<>();
+        Map<Uuid, InitMapValue> topicPartitionChangeMap = new HashMap<>();
         ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
 
-        // We are only considering initialized TPs here. This is because it could happen
-        // that some topics have been moved to initializing but the corresponding persister request
-        // could not be made/failed (invoked by the group coordinator). Then there would be no way to try
-        // the persister call. This way we get the opportunity to retry.
-        Map<Uuid, Set<Integer>> alreadyInitialized = info == null ? new HashMap<>() : info.initializedTopics();
+        // Fresh initializing only
+        long curTimestamp = time.milliseconds();
+        long delta = config.offsetCommitTimeoutMs() * 2L;
+        Map<Uuid, InitMapValue> alreadyInitialized = info == null ? new HashMap<>() :
+            combineInitMaps(
+                info.initializedTopics(),
+                info.initializingTopics().entrySet().stream()
+                    .filter(entry -> curTimestamp - entry.getValue().timestamp() < delta)
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
+            );
 
         subscriptionMetadata.forEach((topicName, topicMetadata) -> {
-            Set<Integer> alreadyInitializedPartSet = alreadyInitialized.getOrDefault(topicMetadata.id(), Set.of());
+            Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of();
             if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) {
                 Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
                 partitionSet.removeAll(alreadyInitializedPartSet);
-
-                topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> partitionSet);
+                // alreadyInitialized contains all initialized topics and less than delta seconds old initializing topics

Review Comment:
   This is a bit misleading. The delta is actually in milliseconds, I think. I 
suggest not even referring to the units at all. The point is to avoid 
duplicating requests within the delta, giving the opportunity for retries while 
minimising the risk of collisions, which cause state epoch fencing.
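The retry window this comment describes can be sketched for a single topic as follows. This is a hypothetical helper, not the PR's code: it again assumes a simplified `InitMapValue` with only `partitions()` and `timestamp()`, and takes `delta` as a plain `long` in the same unit as the timestamps (in the diff, `config.offsetCommitTimeoutMs() * 2L`) without naming that unit.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RetryWindowSketch {
    // Hypothetical stand-in for InitMapValue with only partitions() and timestamp().
    record InitMapValue(Set<Integer> partitions, long timestamp) { }

    // Returns the partitions of one topic that still need an initialization
    // request. Initialized partitions are always skipped; initializing
    // partitions are skipped only while their request is younger than delta,
    // so a request that was never sent or that failed is re-sent once delta
    // has elapsed. Either argument may be null if the topic has no entry.
    static Set<Integer> partitionsToInitialize(
            int numPartitions,
            InitMapValue initialized,
            InitMapValue initializing,
            long now,
            long delta) {
        Set<Integer> skip = new HashSet<>();
        if (initialized != null) {
            skip.addAll(initialized.partitions());
        }
        if (initializing != null && now - initializing.timestamp() < delta) {
            skip.addAll(initializing.partitions());
        }
        // Collect into an explicitly mutable set before removing.
        Set<Integer> toInitialize = IntStream.range(0, numPartitions).boxed()
            .collect(Collectors.toCollection(HashSet::new));
        toInitialize.removeAll(skip);
        return toInitialize;
    }
}
```

For a 4-partition topic with partition 0 initialized and partitions 1 and 2 initializing at time `5_000`, with `delta = 2_000`, the helper returns `{3}` at time `6_000` and `{1, 2, 3}` at time `8_000`: the pending partitions are not re-requested inside the window, but are retried once it has passed.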



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
