This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 640ebe52cf Sticky query routing via query options (#12276)
640ebe52cf is described below

commit 640ebe52cf19922530b67f354126d007e3b650a2
Author: Saurabh Dubey <[email protected]>
AuthorDate: Sat Jan 20 17:07:19 2024 +0530

    Sticky query routing via query options (#12276)
    
    * Support sticky routing via query options
    
    ---------
    
    Co-authored-by: Saurabh Dubey <[email protected]>
---
 .../pinot/broker/routing/BrokerRoutingManager.java |   2 +-
 .../AdaptiveServerSelectorFactory.java             |   2 +
 .../instanceselector/BalancedInstanceSelector.java |  14 ++-
 .../instanceselector/BaseInstanceSelector.java     |  23 ++++-
 .../instanceselector/InstanceSelectorFactory.java  |  30 ++++--
 .../MultiStageReplicaGroupSelector.java            |  18 +++-
 .../ReplicaGroupInstanceSelector.java              |  15 ++-
 .../StrictReplicaGroupInstanceSelector.java        |   5 +-
 .../instanceselector/InstanceSelectorTest.java     |  55 ++++++-----
 .../common/utils/config/QueryOptionsUtils.java     |   6 ++
 .../common/utils/config/TableConfigSerDeTest.java  |   2 +-
 .../tests/BaseClusterIntegrationTest.java          |  20 ++--
 ...PartitionLLCRealtimeClusterIntegrationTest.java |   2 +-
 .../segment/local/utils/TableConfigUtilsTest.java  | 103 +++++++++++----------
 .../pinot/spi/config/table/RoutingConfig.java      |  10 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 16 files changed, 202 insertions(+), 109 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index cc3a5354ef..9031e87580 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -446,7 +446,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
         
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
 _pinotConfig);
     InstanceSelector instanceSelector =
         InstanceSelectorFactory.getInstanceSelector(tableConfig, 
_propertyStore, _brokerMetrics,
-            adaptiveServerSelector);
+            adaptiveServerSelector, _pinotConfig);
     instanceSelector.init(_routableServers, idealState, externalView, 
preSelectedOnlineSegments);
 
     // Add time boundary manager if both offline and real-time part exist for 
a hybrid table
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
index a97c562abf..ae83cfca69 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.broker.routing.adaptiveserverselector;
 
 import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
@@ -35,6 +36,7 @@ public class AdaptiveServerSelectorFactory {
   private AdaptiveServerSelectorFactory() {
   }
 
+  @Nullable
   public static AdaptiveServerSelector 
getAdaptiveServerSelector(ServerRoutingStatsManager serverRoutingStatsManager,
       PinotConfiguration pinotConfig) {
     boolean enableStatsCollection =
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 77b5389fd4..c827369907 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -31,7 +31,6 @@ import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSele
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.HashUtil;
 
-
 /**
  * Instance selector to balance the number of segments served by each selected 
server instance.
  * <p>If AdaptiveServerSelection is enabled, the request is routed to the best 
available server for a segment
@@ -50,8 +49,9 @@ import org.apache.pinot.common.utils.HashUtil;
 public class BalancedInstanceSelector extends BaseInstanceSelector {
 
   public BalancedInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock);
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      boolean useFixedReplica) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
   }
 
   @Override
@@ -89,7 +89,13 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
         if (candidates == null) {
           continue;
         }
-        int selectedIdx = requestId++ % candidates.size();
+        int selectedIdx;
+        if (isUseFixedReplica(queryOptions)) {
+          // candidates array is always sorted
+          selectedIdx = _tableNameHashForFixedReplicaRouting % 
candidates.size();
+        } else {
+          selectedIdx = requestId++ % candidates.size();
+        }
         SegmentInstanceCandidate selectedCandidate = 
candidates.get(selectedIdx);
         // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
         // broker or server can skip it upon any issue to process it.
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index b2961eef94..3cef77fac4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -45,7 +45,9 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,6 +89,8 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   final BrokerMetrics _brokerMetrics;
   final AdaptiveServerSelector _adaptiveServerSelector;
   final Clock _clock;
+  final boolean _useFixedReplica;
+  final int _tableNameHashForFixedReplicaRouting;
 
   // These 3 variables are the cached states to help accelerate the change 
processing
   Set<String> _enabledInstances;
@@ -99,12 +103,24 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   private volatile SegmentStates _segmentStates;
 
   BaseInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock) {
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      boolean useFixedReplica) {
     _tableNameWithType = tableNameWithType;
     _propertyStore = propertyStore;
     _brokerMetrics = brokerMetrics;
     _adaptiveServerSelector = adaptiveServerSelector;
     _clock = clock;
+    _useFixedReplica = useFixedReplica;
+    // Using raw table name to ensure queries spanning across REALTIME and 
OFFLINE tables are routed to the same
+    // instance
+    // Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF 
to get a positive value
+    _tableNameHashForFixedReplicaRouting =
+        TableNameBuilder.extractRawTableName(tableNameWithType).hashCode() & 
0x7FFFFFFF;
+
+    if (_adaptiveServerSelector != null && _useFixedReplica) {
+      throw new IllegalArgumentException(
+          "AdaptiveServerSelector and consistent routing cannot be enabled at 
the same time");
+    }
   }
 
   @Override
@@ -429,6 +445,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
     }
   }
 
+  protected boolean isUseFixedReplica(Map<String, String> queryOptions) {
+    Boolean queryOption = QueryOptionsUtils.isUseFixedReplica(queryOptions);
+    return queryOption != null ? queryOption : _useFixedReplica;
+  }
+
   @Override
   public Set<String> getServingInstances() {
     return _segmentStates.getServingInstances();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 2428df02e5..2ccb1a9ac1 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.broker.routing.instanceselector;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
 import javax.annotation.Nullable;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -27,6 +28,8 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,44 +43,53 @@ public class InstanceSelectorFactory {
   public static final String LEGACY_REPLICA_GROUP_OFFLINE_ROUTING = 
"PartitionAwareOffline";
   public static final String LEGACY_REPLICA_GROUP_REALTIME_ROUTING = 
"PartitionAwareRealtime";
 
+  @VisibleForTesting
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
-      ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics) {
-    return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
null, Clock.systemUTC());
+      ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics, PinotConfiguration brokerConfig) {
+    return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
null, Clock.systemUTC(), brokerConfig);
   }
 
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector) {
-    return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
adaptiveServerSelector, Clock.systemUTC());
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, 
PinotConfiguration brokerConfig) {
+    return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, 
adaptiveServerSelector, Clock.systemUTC(),
+        brokerConfig);
   }
 
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics 
brokerMetrics,
-      @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock) {
+      @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, 
PinotConfiguration brokerConfig) {
     String tableNameWithType = tableConfig.getTableName();
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
+    boolean useFixedReplica = 
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
+        CommonConstants.Broker.DEFAULT_USE_FIXED_REPLICA);
     if (routingConfig != null) {
+      if (routingConfig.getUseFixedReplica() != null) {
+        // table config overrides broker config
+        useFixedReplica = routingConfig.getUseFixedReplica();
+      }
       if 
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
           || (tableConfig.getTableType() == TableType.OFFLINE && 
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
           routingConfig.getRoutingTableBuilderName())) || 
(tableConfig.getTableType() == TableType.REALTIME
           && 
LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))
 {
         LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
         return new ReplicaGroupInstanceSelector(tableNameWithType, 
propertyStore, brokerMetrics, adaptiveServerSelector,
-            clock);
+            clock, useFixedReplica);
       }
       if 
(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
           routingConfig.getInstanceSelectorType())) {
         LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", 
tableNameWithType);
         return new StrictReplicaGroupInstanceSelector(tableNameWithType, 
propertyStore, brokerMetrics,
-            adaptiveServerSelector, clock);
+            adaptiveServerSelector, clock, useFixedReplica);
       }
       if 
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
           routingConfig.getInstanceSelectorType())) {
         LOGGER.info("Using {} for table: {}", 
routingConfig.getInstanceSelectorType(), tableNameWithType);
         return new MultiStageReplicaGroupSelector(tableNameWithType, 
propertyStore, brokerMetrics,
-            adaptiveServerSelector, clock);
+            adaptiveServerSelector, clock, useFixedReplica);
       }
     }
-    return new BalancedInstanceSelector(tableNameWithType, propertyStore, 
brokerMetrics, adaptiveServerSelector, clock);
+    return new BalancedInstanceSelector(tableNameWithType, propertyStore, 
brokerMetrics, adaptiveServerSelector, clock,
+        useFixedReplica);
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 15fb525a8c..b27450426d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -58,8 +58,9 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
   private volatile InstancePartitions _instancePartitions;
 
   public MultiStageReplicaGroupSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock);
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      boolean useFixedReplica) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
   }
 
   @Override
@@ -86,7 +87,17 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
       SegmentStates segmentStates, Map<String, String> queryOptions) {
     // Create a copy of InstancePartitions to avoid race-condition with 
event-listeners above.
     InstancePartitions instancePartitions = _instancePartitions;
-    int replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
+    int replicaGroupSelected;
+    if (isUseFixedReplica(queryOptions)) {
+      // When using sticky routing, we want to iterate over the 
instancePartitions in order to ensure deterministic
+      // selection of replica group across queries i.e. same instance replica 
group id is picked each time.
+      // Since the instances within a selected replica group are iterated in 
order, the assignment within a selected
+      // replica group is guaranteed to be deterministic.
+      // Note: This can cause major hotspots in the cluster.
+      replicaGroupSelected = 0;
+    } else {
+      replicaGroupSelected = requestId % 
instancePartitions.getNumReplicaGroups();
+    }
     for (int iteration = 0; iteration < 
instancePartitions.getNumReplicaGroups(); iteration++) {
       int replicaGroup = (replicaGroupSelected + iteration) % 
instancePartitions.getNumReplicaGroups();
       try {
@@ -117,6 +128,7 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
       // If candidates are null, we will throw an exception and log a warning.
       Preconditions.checkState(candidates != null, "Failed to find servers for 
segment: %s", segment);
       boolean found = false;
+      // candidates array is always sorted
       for (SegmentInstanceCandidate candidate : candidates) {
         String instance = candidate.getInstance();
         if (instanceLookUpSet.contains(instance)) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 3683ca46bb..0e9bd52d42 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -64,8 +64,9 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
 
   public ReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock);
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      boolean useFixedReplica) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
   }
 
   @Override
@@ -107,7 +108,15 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
       }
       // Round robin selection.
       int numCandidates = candidates.size();
-      int instanceIdx = (requestId + replicaOffset) % numCandidates;
+      int instanceIdx;
+
+      if (isUseFixedReplica(queryOptions)) {
+        // candidates array is always sorted
+        instanceIdx = _tableNameHashForFixedReplicaRouting % numCandidates;
+      } else {
+        instanceIdx = (requestId + replicaOffset) % numCandidates;
+      }
+
       SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
       // This can only be offline when it is a new segment. And such segment 
is marked as optional segment so that
       // broker or server can skip it upon any issue to process it.
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 8c352bdbe6..95e83ea31c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -71,8 +71,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);
 
   public StrictReplicaGroupInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<ZNRecord> propertyStore,
-      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock) {
-    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock);
+      BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector 
adaptiveServerSelector, Clock clock,
+      boolean useFixedReplica) {
+    super(tableNameWithType, propertyStore, brokerMetrics, 
adaptiveServerSelector, clock, useFixedReplica);
   }
 
   /**
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index c748be4885..9dc29eba15 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterMethod;
@@ -156,10 +157,10 @@ public class InstanceSelectorTest {
   }
 
   private InstanceSelector createTestInstanceSelector(String selectorType) {
-    RoutingConfig config = new RoutingConfig(null, null, selectorType);
+    RoutingConfig config = new RoutingConfig(null, null, selectorType, false);
     when(_tableConfig.getRoutingConfig()).thenReturn(config);
     return InstanceSelectorFactory.getInstanceSelector(_tableConfig, 
_propertyStore, _brokerMetrics, null,
-        _mutableClock);
+        _mutableClock, new PinotConfiguration());
   }
 
   @DataProvider(name = "selectorType")
@@ -191,39 +192,40 @@ public class InstanceSelectorTest {
     TableConfig tableConfig = mock(TableConfig.class);
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+    when(tableConfig.getTableName()).thenReturn("testTable_OFFLINE");
 
     // Routing config is missing
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore,
-        brokerMetrics) instanceof BalancedInstanceSelector);
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
+        new PinotConfiguration()) instanceof BalancedInstanceSelector);
 
     // Instance selector type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore,
-        brokerMetrics) instanceof BalancedInstanceSelector);
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
+        new PinotConfiguration()) instanceof BalancedInstanceSelector);
 
     // Replica-group instance selector should be returned
     
when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore,
-        brokerMetrics) instanceof ReplicaGroupInstanceSelector);
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
+        new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
 
     // Strict replica-group instance selector should be returned
     
when(routingConfig.getInstanceSelectorType()).thenReturn(STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore,
-        brokerMetrics) instanceof StrictReplicaGroupInstanceSelector);
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
+        new PinotConfiguration()) instanceof 
StrictReplicaGroupInstanceSelector);
 
     // Should be backward-compatible with legacy config
     when(routingConfig.getInstanceSelectorType()).thenReturn(null);
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore,
-        brokerMetrics) instanceof ReplicaGroupInstanceSelector);
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
+        new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
     when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
-    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore,
-        brokerMetrics) instanceof ReplicaGroupInstanceSelector);
+    assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, 
propertyStore, brokerMetrics,
+        new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
   }
 
   @Test
@@ -232,11 +234,13 @@ public class InstanceSelectorTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     BalancedInstanceSelector balancedInstanceSelector =
-        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(), false);
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false);
     StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
-        new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null, Clock.systemUTC());
+        new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null, Clock.systemUTC(),
+            false);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -756,7 +760,8 @@ public class InstanceSelectorTest {
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -838,7 +843,8 @@ public class InstanceSelectorTest {
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -920,7 +926,8 @@ public class InstanceSelectorTest {
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
-        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new ReplicaGroupInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
@@ -993,7 +1000,8 @@ public class InstanceSelectorTest {
     when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
 
     MultiStageReplicaGroupSelector multiStageSelector =
-        new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new MultiStageReplicaGroupSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(),
+            false);
     multiStageSelector = spy(multiStageSelector);
     
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
 
@@ -1088,10 +1096,11 @@ public class InstanceSelectorTest {
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     BalancedInstanceSelector balancedInstanceSelector =
-        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC());
+        new BalancedInstanceSelector(offlineTableName, propertyStore, 
brokerMetrics, null, Clock.systemUTC(), false);
     // ReplicaGroupInstanceSelector has the same behavior as 
BalancedInstanceSelector for the unavailable segments
     StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
-        new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null, Clock.systemUTC());
+        new StrictReplicaGroupInstanceSelector(offlineTableName, 
propertyStore, brokerMetrics, null, Clock.systemUTC(),
+            false);
 
     Set<String> enabledInstances = new HashSet<>();
     IdealState idealState = new IdealState(offlineTableName);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 8d93184c35..f31b41b63f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -145,6 +145,12 @@ public class QueryOptionsUtils {
     return 
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
   }
 
+  @Nullable
+  public static Boolean isUseFixedReplica(Map<String, String> queryOptions) {
+    String useFixedReplica = 
queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.USE_FIXED_REPLICA);
+    return useFixedReplica != null ? Boolean.parseBoolean(useFixedReplica) : 
null;
+  }
+
   @Nullable
   public static Integer getNumReplicaGroupsToQuery(Map<String, String> 
queryOptions) {
     String numReplicaGroupsToQuery = 
queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 5a56d9e884..ed9d605af0 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -176,7 +176,7 @@ public class TableConfigSerDeTest {
     {
       // With routing config
       RoutingConfig routingConfig =
-          new RoutingConfig("builder", Arrays.asList("pruner0", "pruner1", 
"pruner2"), "selector");
+          new RoutingConfig("builder", Arrays.asList("pruner0", "pruner1", 
"pruner2"), "selector", false);
       TableConfig tableConfig = 
tableConfigBuilder.setRoutingConfig(routingConfig).build();
 
       checkRoutingConfig(tableConfig);
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 25ff663951..1a4b9691ac 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -369,8 +369,8 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
         
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
         
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
         .setUpsertConfig(upsertConfig).build();
@@ -416,13 +416,13 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         kafkaTopicName);
     streamConfigsMap.putAll(streamDecoderProperties);
 
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
-        
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
-        
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        .setStreamConfigs(streamConfigsMap)
+    return new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+        
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+        
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setStreamConfigs(streamConfigsMap)
         
.setNullHandlingEnabled(UpsertConfig.Mode.PARTIAL.equals(upsertConfig.getMode())
 || getNullHandlingEnabled())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
         .setUpsertConfig(upsertConfig).build();
@@ -440,8 +440,8 @@ public abstract class BaseClusterIntegrationTest extends 
ClusterTest {
         
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
         
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
         
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setSegmentPartitionConfig(new 
SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new 
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
         .setDedupConfig(new DedupConfig(true, HashFunction.NONE)).build();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index b7e2ac077a..4b0c4e6c14 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -93,7 +93,7 @@ public class 
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
     indexingConfig.setSegmentPartitionConfig(new SegmentPartitionConfig(
         Collections.singletonMap(PARTITION_COLUMN, new 
ColumnPartitionConfig("murmur", 2))));
     tableConfig.setRoutingConfig(
-        new RoutingConfig(null, 
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
+        new RoutingConfig(null, 
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null, 
false));
     addTableConfig(tableConfig);
 
     // Push data into Kafka (only ingest the first Avro file)
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 22aff329d7..0c510add00 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1609,15 +1609,15 @@ public class TableConfigUtilsTest {
         Collections.singletonList(
             new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, 
"myCol").toColumnName()), null, 10);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setDedupConfig(new DedupConfig(true, HashFunction.NONE))
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setDedupConfig(new DedupConfig(true, 
HashFunction.NONE)).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
 
     // Dedup and upsert can't be enabled simultaneously
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
-        .setDedupConfig(new DedupConfig(true, HashFunction.NONE))
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setDedupConfig(new DedupConfig(true, 
HashFunction.NONE)).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1668,7 +1668,8 @@ public class TableConfigUtilsTest {
     // invalid tag override with upsert
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
         .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setTagOverrideConfig(new TagOverrideConfig("T1_REALTIME", 
"T2_REALTIME")).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1680,19 +1681,22 @@ public class TableConfigUtilsTest {
     // valid tag override with upsert
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
         .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setTagOverrideConfig(new TagOverrideConfig("T1_REALTIME", 
"T1_REALTIME")).build();
     TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
 
     // empty tag override with upsert should pass
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
         .setUpsertConfig(new 
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setTagOverrideConfig(new TagOverrideConfig(null, null)).build();
     TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
 
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setStreamConfigs(streamConfigs).build();
     TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
 
@@ -1700,7 +1704,8 @@ public class TableConfigUtilsTest {
         Collections.singletonList(
             new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, 
"myCol").toColumnName()), null, 10);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1711,7 +1716,8 @@ public class TableConfigUtilsTest {
 
     //With Aggregate Metrics
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setStreamConfigs(streamConfigs).setAggregateMetrics(true).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1724,7 +1730,8 @@ public class TableConfigUtilsTest {
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("twiceSum", "SUM(twice)")));
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         
.setStreamConfigs(streamConfigs).setIngestionConfig(ingestionConfig).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1735,7 +1742,8 @@ public class TableConfigUtilsTest {
 
     //With aggregation Configs in Ingestion Config and IndexingConfig at the 
same time
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         
.setStreamConfigs(streamConfigs).setAggregateMetrics(true).setIngestionConfig(ingestionConfig).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1763,8 +1771,8 @@ public class TableConfigUtilsTest {
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(stringTypeDelCol);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-        .setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1775,8 +1783,8 @@ public class TableConfigUtilsTest {
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(delCol);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-        .setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1787,9 +1795,8 @@ public class TableConfigUtilsTest {
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(timestampCol);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-            .setUpsertConfig(upsertConfig)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-            .build();
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
       Assert.fail("Should have failed table creation when delete column type 
is timestamp.");
@@ -1801,8 +1808,8 @@ public class TableConfigUtilsTest {
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(invalidCol);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-        .setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1814,8 +1821,8 @@ public class TableConfigUtilsTest {
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeleteRecordColumn(mvCol);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-        .setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1826,14 +1833,13 @@ public class TableConfigUtilsTest {
 
     // upsert deleted-keys-ttl configs with no deleted column
     schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
-            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
-            .addSingleValueDimension(delCol, 
FieldSpec.DataType.BOOLEAN).build();
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setDeletedKeysTTL(3600);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-            .setUpsertConfig(upsertConfig)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-            .build();
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
     } catch (IllegalStateException e) {
@@ -1843,14 +1849,13 @@ public class TableConfigUtilsTest {
     upsertConfig.setDeleteRecordColumn(delCol);
     // multiple comparison columns set for deleted-keys-ttl
     schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
-            .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
-            .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
-            .addSingleValueDimension(delCol, 
FieldSpec.DataType.BOOLEAN).build();
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
     upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN, 
"myCol"));
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-            .setUpsertConfig(upsertConfig)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-            .build();
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
     } catch (IllegalStateException e) {
@@ -1861,9 +1866,8 @@ public class TableConfigUtilsTest {
     // comparison column with non-numeric type
     upsertConfig.setComparisonColumns(Lists.newArrayList("myCol"));
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-            .setUpsertConfig(upsertConfig)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-            .build();
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
     } catch (IllegalStateException e) {
@@ -1874,9 +1878,8 @@ public class TableConfigUtilsTest {
     // time column as comparison column
     upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN));
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-            .setUpsertConfig(upsertConfig)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
-            .build();
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
     TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
 
     // upsert out-of-order configs
@@ -1892,8 +1895,8 @@ public class TableConfigUtilsTest {
     upsertConfig.setDropOutOfOrderRecord(dropOutOfOrderRecord);
     upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-        .setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1912,8 +1915,8 @@ public class TableConfigUtilsTest {
     upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
     upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
-        .setUpsertConfig(upsertConfig)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setUpsertConfig(upsertConfig).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .build();
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1940,8 +1943,8 @@ public class TableConfigUtilsTest {
     partialUpsertConfig.setComparisonColumn("myCol2");
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(partialUpsertConfig)
-            .setNullHandlingEnabled(true)
-            .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+            .setNullHandlingEnabled(true).setRoutingConfig(
+                new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
             .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
@@ -1954,8 +1957,8 @@ public class TableConfigUtilsTest {
     partialUpsertConfig.setPartialUpsertStrategies(partialUpsertStratgies);
     
partialUpsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("myCol2")
-        .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(true)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        
.setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(true).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
@@ -1966,8 +1969,8 @@ public class TableConfigUtilsTest {
 
     partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeCol")
-        .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(false)
-        .setRoutingConfig(new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        
.setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(false).setRoutingConfig(
+            new RoutingConfig(null, null, 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
         .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 8af5773675..7bf5970f11 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -39,14 +39,17 @@ public class RoutingConfig extends BaseJsonConfig {
 
   private final List<String> _segmentPrunerTypes;
   private final String _instanceSelectorType;
+  private final Boolean _useFixedReplica;
 
   @JsonCreator
   public RoutingConfig(@JsonProperty("routingTableBuilderName") @Nullable 
String routingTableBuilderName,
       @JsonProperty("segmentPrunerTypes") @Nullable List<String> 
segmentPrunerTypes,
-      @JsonProperty("instanceSelectorType") @Nullable String 
instanceSelectorType) {
+      @JsonProperty("instanceSelectorType") @Nullable String 
instanceSelectorType,
+      @JsonProperty("useFixedReplica") @Nullable Boolean useFixedReplica) {
     _routingTableBuilderName = routingTableBuilderName;
     _segmentPrunerTypes = segmentPrunerTypes;
     _instanceSelectorType = instanceSelectorType;
+    _useFixedReplica = useFixedReplica;
   }
 
   @Nullable
@@ -63,4 +66,9 @@ public class RoutingConfig extends BaseJsonConfig {
   public String getInstanceSelectorType() {
     return _instanceSelectorType;
   }
+
+  @Nullable
+  public Boolean getUseFixedReplica() {
+    return _useFixedReplica;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 574a81a996..1ba40c9003 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -324,6 +324,9 @@ public class CommonConstants {
         "pinot.broker.enable.partition.metadata.manager";
     public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = 
false;
 
+    public static final String CONFIG_OF_USE_FIXED_REPLICA = 
"pinot.broker.use.fixed.replica";
+    public static final boolean DEFAULT_USE_FIXED_REPLICA = false;
+
     // Broker config indicating the maximum serialized response size across 
all servers for a query. This value is
     // equally divided across all servers processing the query.
     public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES = 
"pinot.broker.max.query.response.size.bytes";
@@ -350,6 +353,7 @@ public class CommonConstants {
         public static final String MIN_SERVER_GROUP_TRIM_SIZE = 
"minServerGroupTrimSize";
         public static final String MIN_BROKER_GROUP_TRIM_SIZE = 
"minBrokerGroupTrimSize";
         public static final String NUM_REPLICA_GROUPS_TO_QUERY = 
"numReplicaGroupsToQuery";
+        public static final String USE_FIXED_REPLICA = "useFixedReplica";
         public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
         public static final String USE_MULTISTAGE_ENGINE = 
"useMultistageEngine";
         public static final String ENABLE_NULL_HANDLING = "enableNullHandling";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to