This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 640ebe52cf Sticky query routing via query options (#12276)
640ebe52cf is described below
commit 640ebe52cf19922530b67f354126d007e3b650a2
Author: Saurabh Dubey <[email protected]>
AuthorDate: Sat Jan 20 17:07:19 2024 +0530
Sticky query routing via query options (#12276)
* Support sticky routing via query options
---------
Co-authored-by: Saurabh Dubey <[email protected]>
---
.../pinot/broker/routing/BrokerRoutingManager.java | 2 +-
.../AdaptiveServerSelectorFactory.java | 2 +
.../instanceselector/BalancedInstanceSelector.java | 14 ++-
.../instanceselector/BaseInstanceSelector.java | 23 ++++-
.../instanceselector/InstanceSelectorFactory.java | 30 ++++--
.../MultiStageReplicaGroupSelector.java | 18 +++-
.../ReplicaGroupInstanceSelector.java | 15 ++-
.../StrictReplicaGroupInstanceSelector.java | 5 +-
.../instanceselector/InstanceSelectorTest.java | 55 ++++++-----
.../common/utils/config/QueryOptionsUtils.java | 6 ++
.../common/utils/config/TableConfigSerDeTest.java | 2 +-
.../tests/BaseClusterIntegrationTest.java | 20 ++--
...PartitionLLCRealtimeClusterIntegrationTest.java | 2 +-
.../segment/local/utils/TableConfigUtilsTest.java | 103 +++++++++++----------
.../pinot/spi/config/table/RoutingConfig.java | 10 +-
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
16 files changed, 202 insertions(+), 109 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index cc3a5354ef..9031e87580 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -446,7 +446,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
InstanceSelector instanceSelector =
InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
- adaptiveServerSelector);
+ adaptiveServerSelector, _pinotConfig);
instanceSelector.init(_routableServers, idealState, externalView,
preSelectedOnlineSegments);
// Add time boundary manager if both offline and real-time part exist for
a hybrid table
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
index a97c562abf..ae83cfca69 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorFactory.java
@@ -19,6 +19,7 @@
package org.apache.pinot.broker.routing.adaptiveserverselector;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
@@ -35,6 +36,7 @@ public class AdaptiveServerSelectorFactory {
private AdaptiveServerSelectorFactory() {
}
+ @Nullable
public static AdaptiveServerSelector
getAdaptiveServerSelector(ServerRoutingStatsManager serverRoutingStatsManager,
PinotConfiguration pinotConfig) {
boolean enableStatsCollection =
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 77b5389fd4..c827369907 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -31,7 +31,6 @@ import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSele
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.HashUtil;
-
/**
* Instance selector to balance the number of segments served by each selected
server instance.
* <p>If AdaptiveServerSelection is enabled, the request is routed to the best
available server for a segment
@@ -50,8 +49,9 @@ import org.apache.pinot.common.utils.HashUtil;
public class BalancedInstanceSelector extends BaseInstanceSelector {
public BalancedInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock);
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ boolean useFixedReplica) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
}
@Override
@@ -89,7 +89,13 @@ public class BalancedInstanceSelector extends
BaseInstanceSelector {
if (candidates == null) {
continue;
}
- int selectedIdx = requestId++ % candidates.size();
+ int selectedIdx;
+ if (isUseFixedReplica(queryOptions)) {
+ // candidates array is always sorted
+ selectedIdx = _tableNameHashForFixedReplicaRouting %
candidates.size();
+ } else {
+ selectedIdx = requestId++ % candidates.size();
+ }
SegmentInstanceCandidate selectedCandidate =
candidates.get(selectedIdx);
// This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
// broker or server can skip it upon any issue to process it.
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index b2961eef94..3cef77fac4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -45,7 +45,9 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.SegmentUtils;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +89,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
final BrokerMetrics _brokerMetrics;
final AdaptiveServerSelector _adaptiveServerSelector;
final Clock _clock;
+ final boolean _useFixedReplica;
+ final int _tableNameHashForFixedReplicaRouting;
// These 3 variables are the cached states to help accelerate the change
processing
Set<String> _enabledInstances;
@@ -99,12 +103,24 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
private volatile SegmentStates _segmentStates;
BaseInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock) {
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ boolean useFixedReplica) {
_tableNameWithType = tableNameWithType;
_propertyStore = propertyStore;
_brokerMetrics = brokerMetrics;
_adaptiveServerSelector = adaptiveServerSelector;
_clock = clock;
+ _useFixedReplica = useFixedReplica;
+ // Using raw table name to ensure queries spanning across REALTIME and
OFFLINE tables are routed to the same
+ // instance
+ // Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF
to get a positive value
+ _tableNameHashForFixedReplicaRouting =
+ TableNameBuilder.extractRawTableName(tableNameWithType).hashCode() &
0x7FFFFFFF;
+
+ if (_adaptiveServerSelector != null && _useFixedReplica) {
+ throw new IllegalArgumentException(
+ "AdaptiveServerSelector and consistent routing cannot be enabled at
the same time");
+ }
}
@Override
@@ -429,6 +445,11 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
}
+ protected boolean isUseFixedReplica(Map<String, String> queryOptions) {
+ Boolean queryOption = QueryOptionsUtils.isUseFixedReplica(queryOptions);
+ return queryOption != null ? queryOption : _useFixedReplica;
+ }
+
@Override
public Set<String> getServingInstances() {
return _segmentStates.getServingInstances();
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index 2428df02e5..2ccb1a9ac1 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import javax.annotation.Nullable;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -27,6 +28,8 @@ import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,44 +43,53 @@ public class InstanceSelectorFactory {
public static final String LEGACY_REPLICA_GROUP_OFFLINE_ROUTING =
"PartitionAwareOffline";
public static final String LEGACY_REPLICA_GROUP_REALTIME_ROUTING =
"PartitionAwareRealtime";
+ @VisibleForTesting
public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
- ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics) {
- return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
null, Clock.systemUTC());
+ ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics, PinotConfiguration brokerConfig) {
+ return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
null, Clock.systemUTC(), brokerConfig);
}
public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics,
- @Nullable AdaptiveServerSelector adaptiveServerSelector) {
- return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector, Clock.systemUTC());
+ @Nullable AdaptiveServerSelector adaptiveServerSelector,
PinotConfiguration brokerConfig) {
+ return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector, Clock.systemUTC(),
+ brokerConfig);
}
public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics,
- @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock) {
+ @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
PinotConfiguration brokerConfig) {
String tableNameWithType = tableConfig.getTableName();
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
+ boolean useFixedReplica =
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
+ CommonConstants.Broker.DEFAULT_USE_FIXED_REPLICA);
if (routingConfig != null) {
+ if (routingConfig.getUseFixedReplica() != null) {
+ // table config overrides broker config
+ useFixedReplica = routingConfig.getUseFixedReplica();
+ }
if
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
|| (tableConfig.getTableType() == TableType.OFFLINE &&
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
routingConfig.getRoutingTableBuilderName())) ||
(tableConfig.getTableType() == TableType.REALTIME
&&
LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))
{
LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
return new ReplicaGroupInstanceSelector(tableNameWithType,
propertyStore, brokerMetrics, adaptiveServerSelector,
- clock);
+ clock, useFixedReplica);
}
if
(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
routingConfig.getInstanceSelectorType())) {
LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
return new StrictReplicaGroupInstanceSelector(tableNameWithType,
propertyStore, brokerMetrics,
- adaptiveServerSelector, clock);
+ adaptiveServerSelector, clock, useFixedReplica);
}
if
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
routingConfig.getInstanceSelectorType())) {
LOGGER.info("Using {} for table: {}",
routingConfig.getInstanceSelectorType(), tableNameWithType);
return new MultiStageReplicaGroupSelector(tableNameWithType,
propertyStore, brokerMetrics,
- adaptiveServerSelector, clock);
+ adaptiveServerSelector, clock, useFixedReplica);
}
}
- return new BalancedInstanceSelector(tableNameWithType, propertyStore,
brokerMetrics, adaptiveServerSelector, clock);
+ return new BalancedInstanceSelector(tableNameWithType, propertyStore,
brokerMetrics, adaptiveServerSelector, clock,
+ useFixedReplica);
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 15fb525a8c..b27450426d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -58,8 +58,9 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
private volatile InstancePartitions _instancePartitions;
public MultiStageReplicaGroupSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock);
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ boolean useFixedReplica) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
}
@Override
@@ -86,7 +87,17 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
SegmentStates segmentStates, Map<String, String> queryOptions) {
// Create a copy of InstancePartitions to avoid race-condition with
event-listeners above.
InstancePartitions instancePartitions = _instancePartitions;
- int replicaGroupSelected = requestId %
instancePartitions.getNumReplicaGroups();
+ int replicaGroupSelected;
+ if (isUseFixedReplica(queryOptions)) {
+ // When using sticky routing, we want to iterate over the
instancePartitions in order to ensure deterministic
+ // selection of replica group across queries i.e. same instance replica
group id is picked each time.
+ // Since the instances within a selected replica group are iterated in
order, the assignment within a selected
+ // replica group is guaranteed to be deterministic.
+ // Note: This can cause major hotspots in the cluster.
+ replicaGroupSelected = 0;
+ } else {
+ replicaGroupSelected = requestId %
instancePartitions.getNumReplicaGroups();
+ }
for (int iteration = 0; iteration <
instancePartitions.getNumReplicaGroups(); iteration++) {
int replicaGroup = (replicaGroupSelected + iteration) %
instancePartitions.getNumReplicaGroups();
try {
@@ -117,6 +128,7 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
// If candidates are null, we will throw an exception and log a warning.
Preconditions.checkState(candidates != null, "Failed to find servers for
segment: %s", segment);
boolean found = false;
+ // candidates array is always sorted
for (SegmentInstanceCandidate candidate : candidates) {
String instance = candidate.getInstance();
if (instanceLookUpSet.contains(instance)) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 3683ca46bb..0e9bd52d42 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -64,8 +64,9 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
public ReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock);
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ boolean useFixedReplica) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
}
@Override
@@ -107,7 +108,15 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
}
// Round robin selection.
int numCandidates = candidates.size();
- int instanceIdx = (requestId + replicaOffset) % numCandidates;
+ int instanceIdx;
+
+ if (isUseFixedReplica(queryOptions)) {
+ // candidates array is always sorted
+ instanceIdx = _tableNameHashForFixedReplicaRouting % numCandidates;
+ } else {
+ instanceIdx = (requestId + replicaOffset) % numCandidates;
+ }
+
SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
// This can only be offline when it is a new segment. And such segment
is marked as optional segment so that
// broker or server can skip it upon any issue to process it.
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 8c352bdbe6..95e83ea31c 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -71,8 +71,9 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
private static final Logger LOGGER =
LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);
public StrictReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock);
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ boolean useFixedReplica) {
+ super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, useFixedReplica);
}
/**
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index c748be4885..9dc29eba15 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -51,6 +51,7 @@ import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
@@ -156,10 +157,10 @@ public class InstanceSelectorTest {
}
private InstanceSelector createTestInstanceSelector(String selectorType) {
- RoutingConfig config = new RoutingConfig(null, null, selectorType);
+ RoutingConfig config = new RoutingConfig(null, null, selectorType, false);
when(_tableConfig.getRoutingConfig()).thenReturn(config);
return InstanceSelectorFactory.getInstanceSelector(_tableConfig,
_propertyStore, _brokerMetrics, null,
- _mutableClock);
+ _mutableClock, new PinotConfiguration());
}
@DataProvider(name = "selectorType")
@@ -191,39 +192,40 @@ public class InstanceSelectorTest {
TableConfig tableConfig = mock(TableConfig.class);
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
+ when(tableConfig.getTableName()).thenReturn("testTable_OFFLINE");
// Routing config is missing
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore,
- brokerMetrics) instanceof BalancedInstanceSelector);
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
+ new PinotConfiguration()) instanceof BalancedInstanceSelector);
// Instance selector type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore,
- brokerMetrics) instanceof BalancedInstanceSelector);
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
+ new PinotConfiguration()) instanceof BalancedInstanceSelector);
// Replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore,
- brokerMetrics) instanceof ReplicaGroupInstanceSelector);
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
+ new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
// Strict replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore,
- brokerMetrics) instanceof StrictReplicaGroupInstanceSelector);
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
+ new PinotConfiguration()) instanceof
StrictReplicaGroupInstanceSelector);
// Should be backward-compatible with legacy config
when(routingConfig.getInstanceSelectorType()).thenReturn(null);
when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore,
- brokerMetrics) instanceof ReplicaGroupInstanceSelector);
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
+ new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore,
- brokerMetrics) instanceof ReplicaGroupInstanceSelector);
+ assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
+ new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
}
@Test
@@ -232,11 +234,13 @@ public class InstanceSelectorTest {
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
BalancedInstanceSelector balancedInstanceSelector =
- new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(), false);
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false);
StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
- new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null, Clock.systemUTC());
+ new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null, Clock.systemUTC(),
+ false);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -756,7 +760,8 @@ public class InstanceSelectorTest {
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -838,7 +843,8 @@ public class InstanceSelectorTest {
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -920,7 +926,8 @@ public class InstanceSelectorTest {
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -993,7 +1000,8 @@ public class InstanceSelectorTest {
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
MultiStageReplicaGroupSelector multiStageSelector =
- new MultiStageReplicaGroupSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new MultiStageReplicaGroupSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ false);
multiStageSelector = spy(multiStageSelector);
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
@@ -1088,10 +1096,11 @@ public class InstanceSelectorTest {
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
BalancedInstanceSelector balancedInstanceSelector =
- new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC());
+ new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null, Clock.systemUTC(), false);
// ReplicaGroupInstanceSelector has the same behavior as
BalancedInstanceSelector for the unavailable segments
StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
- new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null, Clock.systemUTC());
+ new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null, Clock.systemUTC(),
+ false);
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 8d93184c35..f31b41b63f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -145,6 +145,12 @@ public class QueryOptionsUtils {
return
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
}
+ @Nullable
+ public static Boolean isUseFixedReplica(Map<String, String> queryOptions) {
+ String useFixedReplica =
queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.USE_FIXED_REPLICA);
+ return useFixedReplica != null ? Boolean.parseBoolean(useFixedReplica) :
null;
+ }
+
@Nullable
public static Integer getNumReplicaGroupsToQuery(Map<String, String>
queryOptions) {
String numReplicaGroupsToQuery =
queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 5a56d9e884..ed9d605af0 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -176,7 +176,7 @@ public class TableConfigSerDeTest {
{
// With routing config
RoutingConfig routingConfig =
- new RoutingConfig("builder", Arrays.asList("pruner0", "pruner1",
"pruner2"), "selector");
+ new RoutingConfig("builder", Arrays.asList("pruner0", "pruner1",
"pruner2"), "selector", false);
TableConfig tableConfig =
tableConfigBuilder.setRoutingConfig(routingConfig).build();
checkRoutingConfig(tableConfig);
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 25ff663951..1a4b9691ac 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -369,8 +369,8 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
.setUpsertConfig(upsertConfig).build();
@@ -416,13 +416,13 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
kafkaTopicName);
streamConfigsMap.putAll(streamDecoderProperties);
- return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
-
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
-
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
- .setStreamConfigs(streamConfigsMap)
+ return new
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setStreamConfigs(streamConfigsMap)
.setNullHandlingEnabled(UpsertConfig.Mode.PARTIAL.equals(upsertConfig.getMode())
|| getNullHandlingEnabled())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
.setUpsertConfig(upsertConfig).build();
@@ -440,8 +440,8 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
.setDedupConfig(new DedupConfig(true, HashFunction.NONE)).build();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index b7e2ac077a..4b0c4e6c14 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -93,7 +93,7 @@ public class
SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
indexingConfig.setSegmentPartitionConfig(new SegmentPartitionConfig(
Collections.singletonMap(PARTITION_COLUMN, new
ColumnPartitionConfig("murmur", 2))));
tableConfig.setRoutingConfig(
- new RoutingConfig(null,
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
+ new RoutingConfig(null,
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null,
false));
addTableConfig(tableConfig);
// Push data into Kafka (only ingest the first Avro file)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 22aff329d7..0c510add00 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1609,15 +1609,15 @@ public class TableConfigUtilsTest {
Collections.singletonList(
new AggregationFunctionColumnPair(AggregationFunctionType.COUNT,
"myCol").toColumnName()), null, 10);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setDedupConfig(new DedupConfig(true, HashFunction.NONE))
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setDedupConfig(new DedupConfig(true,
HashFunction.NONE)).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
// Dedup and upsert can't be enabled simultaneously
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
- .setDedupConfig(new DedupConfig(true, HashFunction.NONE))
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setDedupConfig(new DedupConfig(true,
HashFunction.NONE)).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1668,7 +1668,8 @@ public class TableConfigUtilsTest {
// invalid tag override with upsert
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setTagOverrideConfig(new TagOverrideConfig("T1_REALTIME",
"T2_REALTIME")).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1680,19 +1681,22 @@ public class TableConfigUtilsTest {
// valid tag override with upsert
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setTagOverrideConfig(new TagOverrideConfig("T1_REALTIME",
"T1_REALTIME")).build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
// empty tag override with upsert should pass
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
.setUpsertConfig(new
UpsertConfig(UpsertConfig.Mode.FULL)).setStreamConfigs(getStreamConfigs())
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setTagOverrideConfig(new TagOverrideConfig(null, null)).build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1700,7 +1704,8 @@ public class TableConfigUtilsTest {
Collections.singletonList(
new AggregationFunctionColumnPair(AggregationFunctionType.COUNT,
"myCol").toColumnName()), null, 10);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1711,7 +1716,8 @@ public class TableConfigUtilsTest {
//With Aggregate Metrics
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).setAggregateMetrics(true).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1724,7 +1730,8 @@ public class TableConfigUtilsTest {
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("twiceSum", "SUM(twice)")));
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).setIngestionConfig(ingestionConfig).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1735,7 +1742,8 @@ public class TableConfigUtilsTest {
//With aggregation Configs in Ingestion Config and IndexingConfig at the
same time
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).setAggregateMetrics(true).setIngestionConfig(ingestionConfig).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1763,8 +1771,8 @@ public class TableConfigUtilsTest {
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeleteRecordColumn(stringTypeDelCol);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1775,8 +1783,8 @@ public class TableConfigUtilsTest {
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeleteRecordColumn(delCol);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1787,9 +1795,8 @@ public class TableConfigUtilsTest {
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeleteRecordColumn(timestampCol);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .build();
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
Assert.fail("Should have failed table creation when delete column type
is timestamp.");
@@ -1801,8 +1808,8 @@ public class TableConfigUtilsTest {
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeleteRecordColumn(invalidCol);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1814,8 +1821,8 @@ public class TableConfigUtilsTest {
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeleteRecordColumn(mvCol);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1826,14 +1833,13 @@ public class TableConfigUtilsTest {
// upsert deleted-keys-ttl configs with no deleted column
schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
- .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
- .addSingleValueDimension(delCol,
FieldSpec.DataType.BOOLEAN).build();
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setDeletedKeysTTL(3600);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .build();
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
} catch (IllegalStateException e) {
@@ -1843,14 +1849,13 @@ public class TableConfigUtilsTest {
upsertConfig.setDeleteRecordColumn(delCol);
// multiple comparison columns set for deleted-keys-ttl
schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
- .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
- .addSingleValueDimension(delCol,
FieldSpec.DataType.BOOLEAN).build();
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN).build();
upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN,
"myCol"));
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .build();
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
} catch (IllegalStateException e) {
@@ -1861,9 +1866,8 @@ public class TableConfigUtilsTest {
// comparison column with non-numeric type
upsertConfig.setComparisonColumns(Lists.newArrayList("myCol"));
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .build();
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
} catch (IllegalStateException e) {
@@ -1874,9 +1878,8 @@ public class TableConfigUtilsTest {
// time column as comparison column
upsertConfig.setComparisonColumns(Lists.newArrayList(TIME_COLUMN));
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
- .build();
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false)).build();
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
// upsert out-of-order configs
@@ -1892,8 +1895,8 @@ public class TableConfigUtilsTest {
upsertConfig.setDropOutOfOrderRecord(dropOutOfOrderRecord);
upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1912,8 +1915,8 @@ public class TableConfigUtilsTest {
upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
- .setUpsertConfig(upsertConfig)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setUpsertConfig(upsertConfig).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.build();
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
@@ -1940,8 +1943,8 @@ public class TableConfigUtilsTest {
partialUpsertConfig.setComparisonColumn("myCol2");
TableConfig tableConfig =
new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setUpsertConfig(partialUpsertConfig)
- .setNullHandlingEnabled(true)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setNullHandlingEnabled(true).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
@@ -1954,8 +1957,8 @@ public class TableConfigUtilsTest {
partialUpsertConfig.setPartialUpsertStrategies(partialUpsertStratgies);
partialUpsertConfig.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("myCol2")
- .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(true)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+
.setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(true).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
@@ -1966,8 +1969,8 @@ public class TableConfigUtilsTest {
partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeCol")
- .setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(false)
- .setRoutingConfig(new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+
.setUpsertConfig(partialUpsertConfig).setNullHandlingEnabled(false).setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
.setStreamConfigs(streamConfigs).build();
try {
TableConfigUtils.validatePartialUpsertStrategies(tableConfig, schema);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 8af5773675..7bf5970f11 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -39,14 +39,17 @@ public class RoutingConfig extends BaseJsonConfig {
private final List<String> _segmentPrunerTypes;
private final String _instanceSelectorType;
+ private final Boolean _useFixedReplica;
@JsonCreator
public RoutingConfig(@JsonProperty("routingTableBuilderName") @Nullable
String routingTableBuilderName,
@JsonProperty("segmentPrunerTypes") @Nullable List<String>
segmentPrunerTypes,
- @JsonProperty("instanceSelectorType") @Nullable String
instanceSelectorType) {
+ @JsonProperty("instanceSelectorType") @Nullable String
instanceSelectorType,
+ @JsonProperty("useFixedReplica") @Nullable Boolean useFixedReplica) {
_routingTableBuilderName = routingTableBuilderName;
_segmentPrunerTypes = segmentPrunerTypes;
_instanceSelectorType = instanceSelectorType;
+ _useFixedReplica = useFixedReplica;
}
@Nullable
@@ -63,4 +66,9 @@ public class RoutingConfig extends BaseJsonConfig {
public String getInstanceSelectorType() {
return _instanceSelectorType;
}
+
+ @Nullable
+ public Boolean getUseFixedReplica() {
+ return _useFixedReplica;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 574a81a996..1ba40c9003 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -324,6 +324,9 @@ public class CommonConstants {
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER =
false;
+ public static final String CONFIG_OF_USE_FIXED_REPLICA =
"pinot.broker.use.fixed.replica";
+ public static final boolean DEFAULT_USE_FIXED_REPLICA = false;
+
// Broker config indicating the maximum serialized response size across
all servers for a query. This value is
// equally divided across all servers processing the query.
public static final String CONFIG_OF_MAX_QUERY_RESPONSE_SIZE_BYTES =
"pinot.broker.max.query.response.size.bytes";
@@ -350,6 +353,7 @@ public class CommonConstants {
public static final String MIN_SERVER_GROUP_TRIM_SIZE =
"minServerGroupTrimSize";
public static final String MIN_BROKER_GROUP_TRIM_SIZE =
"minBrokerGroupTrimSize";
public static final String NUM_REPLICA_GROUPS_TO_QUERY =
"numReplicaGroupsToQuery";
+ public static final String USE_FIXED_REPLICA = "useFixedReplica";
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
public static final String USE_MULTISTAGE_ENGINE =
"useMultistageEngine";
public static final String ENABLE_NULL_HANDLING = "enableNullHandling";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]