ankitsultana commented on code in PR #15843:
URL: https://github.com/apache/pinot/pull/15843#discussion_r2114836787


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,149 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that 
are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not 
online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(Set<String> segments,
+    SegmentStates segmentStates, InstancePartitions instancePartitions, int 
preferredReplicaId) {
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        getSelectedInstancesForPartition(segmentToSelectedInstanceMap, 
instanceToSegmentsMap, requiredSegments,
+          partition, preferredReplicaId);
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private void getSelectedInstancesForPartition(Map<String, String> 
segmentToSelectedInstanceMap,
+    Map<String, Set<String>> instanceToSegmentsMap, Set<String> 
requiredSegments, int partitionId,
+    int preferredReplicaId) {
+    int numReplicaGroups = _instancePartitions.getNumReplicaGroups();
+
+    for (int i = 0; i < numReplicaGroups; i += 1) {
+      int selectedReplicaGroup = (i + preferredReplicaId) % numReplicaGroups;
+      List<String> selectedInstances = 
_instancePartitions.getInstances(partitionId, selectedReplicaGroup);
+
+      Set<String> segmentsFromSelectedInstances = new HashSet<>();
+      for (String instance : selectedInstances) {
+        Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+        if (servedSegments != null) {
+          segmentsFromSelectedInstances.addAll(servedSegments);
+        }
+      }
+
+      if (segmentsFromSelectedInstances.containsAll(requiredSegments)) {
+        for (String instance: selectedInstances) {
+          Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+          if (servedSegments == null || servedSegments.isEmpty()) {
+            continue;
+          }
+          for (String segment : servedSegments) {
+            if (requiredSegments.contains(segment)) {
+              segmentToSelectedInstanceMap.put(segment, instance);
+            }
+          }
+        }
+        return; // Successfully selected instances for the partition.
+      }
     }
-    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+
+    throw new RuntimeException(
+      String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+  }
+
+  /**
+   * Based on whether the selected instance for the segment is online to serve 
that segment or not,
+   * this method computes the segments that are optional and the segments that 
are not.
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
computeOptionalSegments(
+    Map<String, String> segmentToSelectedInstanceMap,
+    SegmentStates segmentStates) {

Review Comment:
   nit: Pinot style. The second arg should be on the same line as the previous arg (we only wrap lines when we run out of space).
   
   (I see this issue in a few other places too, e.g. L137.)



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,149 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that 
are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not 
online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(Set<String> segments,
+    SegmentStates segmentStates, InstancePartitions instancePartitions, int 
preferredReplicaId) {
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        getSelectedInstancesForPartition(segmentToSelectedInstanceMap, 
instanceToSegmentsMap, requiredSegments,
+          partition, preferredReplicaId);
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private void getSelectedInstancesForPartition(Map<String, String> 
segmentToSelectedInstanceMap,
+    Map<String, Set<String>> instanceToSegmentsMap, Set<String> 
requiredSegments, int partitionId,
+    int preferredReplicaId) {
+    int numReplicaGroups = _instancePartitions.getNumReplicaGroups();
+
+    for (int i = 0; i < numReplicaGroups; i += 1) {
+      int selectedReplicaGroup = (i + preferredReplicaId) % numReplicaGroups;
+      List<String> selectedInstances = 
_instancePartitions.getInstances(partitionId, selectedReplicaGroup);
+
+      Set<String> segmentsFromSelectedInstances = new HashSet<>();
+      for (String instance : selectedInstances) {
+        Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+        if (servedSegments != null) {
+          segmentsFromSelectedInstances.addAll(servedSegments);
+        }
+      }
+
+      if (segmentsFromSelectedInstances.containsAll(requiredSegments)) {
+        for (String instance: selectedInstances) {
+          Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+          if (servedSegments == null || servedSegments.isEmpty()) {
+            continue;
+          }
+          for (String segment : servedSegments) {
+            if (requiredSegments.contains(segment)) {
+              segmentToSelectedInstanceMap.put(segment, instance);
+            }
+          }
+        }
+        return; // Successfully selected instances for the partition.
+      }
     }
-    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+
+    throw new RuntimeException(
+      String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+  }
+
+  /**
+   * Based on whether the selected instance for the segment is online to serve 
that segment or not,
+   * this method computes the segments that are optional and the segments that 
are not.
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
computeOptionalSegments(
+    Map<String, String> segmentToSelectedInstanceMap,
+    SegmentStates segmentStates) {
+
+    Map<String, String> segmentsToInstanceMap = new HashMap<>();
     Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
-    for (String segment : segments) {
+
+    for (Map.Entry<String, String> entry : 
segmentToSelectedInstanceMap.entrySet()) {
+      String segment = entry.getKey();
+      String selectedInstance = entry.getValue();
+
       List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
       // If candidates are null, we will throw an exception and log a warning.
-      Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
-      boolean found = false;
-      // candidates array is always sorted
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+
       for (SegmentInstanceCandidate candidate : candidates) {
-        String instance = candidate.getInstance();
-        if (instanceLookUpSet.contains(instance)) {
-          found = true;
-          // This can only be offline when it is a new segment. And such 
segment is marked as optional segment so that
-          // broker or server can skip it upon any issue to process it.
+        if (selectedInstance.equals(candidate.getInstance())) {
           if (candidate.isOnline()) {

Review Comment:
   nit: can you copy over this comment to the new code too?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,149 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that 
are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not 
online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(Set<String> segments,

Review Comment:
   nit: the `try` prefix no longer makes sense, since this is a 1-shot attempt.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,149 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that 
are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not 
online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(Set<String> segments,
+    SegmentStates segmentStates, InstancePartitions instancePartitions, int 
preferredReplicaId) {
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        getSelectedInstancesForPartition(segmentToSelectedInstanceMap, 
instanceToSegmentsMap, requiredSegments,
+          partition, preferredReplicaId);
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private void getSelectedInstancesForPartition(Map<String, String> 
segmentToSelectedInstanceMap,
+    Map<String, Set<String>> instanceToSegmentsMap, Set<String> 
requiredSegments, int partitionId,
+    int preferredReplicaId) {
+    int numReplicaGroups = _instancePartitions.getNumReplicaGroups();
+
+    for (int i = 0; i < numReplicaGroups; i += 1) {

Review Comment:
   nit: rename `i` to `replicaGroupOffset`. Also, change `i += 1` to `i++`.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,149 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that 
are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not 
online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(Set<String> segments,
+    SegmentStates segmentStates, InstancePartitions instancePartitions, int 
preferredReplicaId) {
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        getSelectedInstancesForPartition(segmentToSelectedInstanceMap, 
instanceToSegmentsMap, requiredSegments,
+          partition, preferredReplicaId);
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private void getSelectedInstancesForPartition(Map<String, String> 
segmentToSelectedInstanceMap,

Review Comment:
   nit (opinionated): a common pattern is that when we pass a mutable object to a method, it is placed at the end of the arg list and its parameter name is suffixed with `Sink`.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java:
##########
@@ -99,58 +100,149 @@ Pair<Map<String, String>, Map<String, String>> 
select(List<String> segments, int
     } else {
       replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
     }
-    for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
-      int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
-      try {
-        return tryAssigning(segments, segmentStates, instancePartitions, 
replicaGroup);
-      } catch (Exception e) {
-        LOGGER.warn("Unable to select replica-group {} for table: {}", 
replicaGroup, _tableNameWithType, e);
+
+    return tryAssigning(Sets.newHashSet(segments), segmentStates, 
instancePartitions, replicaGroupSelected);
+  }
+
+  /**
+   * Returns a map from the segmentName to the corresponding server. It tries 
to select all servers from the
+   * preferredReplicaGroup, but if it fails, it will try to select the 
relevant server from other instance partitions.
+   *
+   * @return A pair of maps, where the first map contains the segments that 
are assigned to a server and the second
+   * map contains the segments that are optional (i.e., the server is not 
online to serve that segment).
+   * Example:
+   * {
+   *   "required_segments": {
+   *     "segment1": "server1",
+   *     "segment2": "server2"
+   *   },
+   *   "optional_segments": {
+   *     "segment3": "server3",
+   *     "segment4": "server4"
+   *   }
+   * }
+   */
+  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(Set<String> segments,
+    SegmentStates segmentStates, InstancePartitions instancePartitions, int 
preferredReplicaId) {
+    Map<String, Integer> instanceToPartitionMap = 
instancePartitions.getInstanceToPartitionIdMap();
+    Map<String, Set<String>> instanceToSegmentsMap = new HashMap<>();
+
+    // instanceToSegmentsMap stores the mapping from instance to the active 
segments it can serve.
+    for (String segment : segments) {
+      List<SegmentInstanceCandidate> candidates = 
segmentStates.getCandidates(segment);
+      Preconditions.checkState(candidates != null && !candidates.isEmpty(),
+        "Failed to find servers for segment: %s", segment);
+      for (SegmentInstanceCandidate candidate : candidates) {
+        instanceToSegmentsMap
+          .computeIfAbsent(candidate.getInstance(), k -> new HashSet<>())
+          .add(segment);
       }
     }
-    throw new RuntimeException(
-        String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));
+
+    // partitionToRequiredSegmentsMap stores the mapping from partition to the 
segments that need to be served. This
+    // is necessary to select appropriate replica group at the instance 
partition level.
+    Map<Integer, Set<String>> partitionToRequiredSegmentsMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : 
instanceToSegmentsMap.entrySet()) {
+      Integer partitionId = instanceToPartitionMap.get(entry.getKey());
+      partitionToRequiredSegmentsMap
+        .computeIfAbsent(partitionId, k -> new HashSet<>())
+        .addAll(entry.getValue());
+    }
+
+    // Assign segments to instances based on the 
partitionToRequiredSegmentsMap. This ensures that we select the
+    // appropriate replica group for each set of segments belonging to the 
same instance partition.
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Set<String> requiredSegments = 
partitionToRequiredSegmentsMap.get(partition);
+      if (requiredSegments != null) {
+        getSelectedInstancesForPartition(segmentToSelectedInstanceMap, 
instanceToSegmentsMap, requiredSegments,
+          partition, preferredReplicaId);
+      }
+    }
+
+    return computeOptionalSegments(segmentToSelectedInstanceMap, 
segmentStates);
   }
 
   /**
-   * Returns a map from the segmentName to the corresponding server in the 
given replica-group. If the is not enabled,
-   * we throw an exception.
+   * This method selects the instances for the given partition and segments. 
It iterates over the replica groups
+   * in a round-robin fashion, starting from the preferred replica group. It 
only selects the replica group if all
+   * the segments are available in the selected instances.
+   * If no replica group is found, it throws an exception.
    */
-  private Pair<Map<String, String>, Map<String, String>> 
tryAssigning(List<String> segments,
-      SegmentStates segmentStates, InstancePartitions instancePartitions, int 
replicaId) {
-    Set<String> instanceLookUpSet = new HashSet<>();
-    for (int partition = 0; partition < instancePartitions.getNumPartitions(); 
partition++) {
-      List<String> instances = instancePartitions.getInstances(partition, 
replicaId);
-      instanceLookUpSet.addAll(instances);
+  private void getSelectedInstancesForPartition(Map<String, String> 
segmentToSelectedInstanceMap,
+    Map<String, Set<String>> instanceToSegmentsMap, Set<String> 
requiredSegments, int partitionId,
+    int preferredReplicaId) {
+    int numReplicaGroups = _instancePartitions.getNumReplicaGroups();
+
+    for (int i = 0; i < numReplicaGroups; i += 1) {
+      int selectedReplicaGroup = (i + preferredReplicaId) % numReplicaGroups;
+      List<String> selectedInstances = 
_instancePartitions.getInstances(partitionId, selectedReplicaGroup);
+
+      Set<String> segmentsFromSelectedInstances = new HashSet<>();
+      for (String instance : selectedInstances) {
+        Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+        if (servedSegments != null) {
+          segmentsFromSelectedInstances.addAll(servedSegments);
+        }
+      }
+
+      if (segmentsFromSelectedInstances.containsAll(requiredSegments)) {
+        for (String instance: selectedInstances) {
+          Set<String> servedSegments = instanceToSegmentsMap.get(instance);
+          if (servedSegments == null || servedSegments.isEmpty()) {
+            continue;
+          }
+          for (String segment : servedSegments) {
+            if (requiredSegments.contains(segment)) {
+              segmentToSelectedInstanceMap.put(segment, instance);
+            }
+          }
+        }
+        return; // Successfully selected instances for the partition.
+      }
     }
-    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+
+    throw new RuntimeException(
+      String.format("Unable to find any replica-group to serve table: %s", 
_tableNameWithType));

Review Comment:
   Let's include the `partitionId` in this error message, e.g.:
   
   ```
   "Unable to find a replica-group to serve segments in the instance-partition 
%s for table %s" 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to