This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 575398dae0 Remove special routing handling for multiple consuming
segments (#11371)
575398dae0 is described below
commit 575398dae0841ba152c31306cb460cfdc776b04b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Aug 17 10:12:47 2023 -0700
Remove special routing handling for multiple consuming segments (#11371)
---
.../instanceselector/BaseInstanceSelector.java | 20 ++-
.../StrictReplicaGroupInstanceSelector.java | 24 ++-
...ntSelector.java => DefaultSegmentSelector.java} | 10 +-
.../segmentselector/RealtimeSegmentSelector.java | 163 ---------------------
.../routing/segmentselector/SegmentSelector.java | 12 --
.../segmentselector/SegmentSelectorFactory.java | 7 +-
.../segmentselector/SegmentSelectorTest.java | 134 -----------------
.../common/utils/config/QueryOptionsUtils.java | 6 -
.../apache/pinot/spi/utils/CommonConstants.java | 1 -
9 files changed, 35 insertions(+), 342 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 667cc766d8..c9219c3ea3 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -156,6 +156,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
newSegmentPushTimeMap.put(segmentZKMetadata.getSegmentName(),
pushTimeMillis);
}
}
+ LOGGER.info("Got {} new segments: {} for table: {} by reading ZK metadata,
current time: {}",
+ newSegmentPushTimeMap.size(), newSegmentPushTimeMap,
_tableNameWithType, nowMillis);
return newSegmentPushTimeMap;
}
@@ -294,9 +296,9 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
for (SegmentInstanceCandidate candidate : candidates) {
candidateInstances.add(candidate.getInstance());
}
- LOGGER.warn(
- "Failed to find servers hosting segment: {} for table: {} (all
candidate instances: {} are disabled, "
- + "counting segment as unavailable)", segment,
_tableNameWithType, candidateInstances);
+ LOGGER.warn("Failed to find servers hosting old segment: {} for table:
{} "
+ + "(all candidate instances: {} are disabled, counting segment
as unavailable)", segment,
+ _tableNameWithType, candidateInstances);
unavailableSegments.add(segment);
_brokerMetrics.addMeteredTableValue(_tableNameWithType,
BrokerMeter.NO_SERVING_HOST_FOR_SEGMENT, 1);
}
@@ -314,8 +316,16 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
if (!enabledCandidates.isEmpty()) {
instanceCandidatesMap.put(segment, enabledCandidates);
+ } else {
+ // Do not count new segment as unavailable
+ List<String> candidateInstances = new ArrayList<>(candidates.size());
+ for (SegmentInstanceCandidate candidate : candidates) {
+ candidateInstances.add(candidate.getInstance());
+ }
+ LOGGER.info("Failed to find servers hosting new segment: {} for table:
{} "
+ + "(all candidate instances: {} are disabled, but not counting
new segment as unavailable)", segment,
+ _tableNameWithType, candidateInstances);
}
- // Do not count new segment as unavailable
}
_segmentStates = new SegmentStates(instanceCandidatesMap,
unavailableSegments);
@@ -377,6 +387,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
}
}
+ LOGGER.info("Got {} new segments: {} for table: {} by processing existing
states, current time: {}",
+ newSegmentPushTimeMap.size(), newSegmentPushTimeMap,
_tableNameWithType, nowMillis);
return newSegmentPushTimeMap;
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index b00cf851c7..ddb83ce282 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -34,6 +34,8 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.HashUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -66,6 +68,7 @@ import org.apache.pinot.common.utils.HashUtil;
* </pre>
*/
public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);
public StrictReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock) {
@@ -122,12 +125,19 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
for (Map.Entry<String, Set<String>> entry :
oldSegmentToOnlineInstancesMap.entrySet()) {
String segment = entry.getKey();
- Set<String> instancesInIdealState =
idealStateAssignment.get(segment).keySet();
+ Set<String> onlineInstances = entry.getValue();
+ Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
Set<String> unavailableInstances =
unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k ->
new HashSet<>());
for (String instance : instancesInIdealState) {
- if (!entry.getValue().contains(instance)) {
- unavailableInstances.add(instance);
+ if (!onlineInstances.contains(instance)) {
+ if (unavailableInstances.add(instance)) {
+ LOGGER.warn(
+ "Found unavailable instance: {} in instance group: {} for
segment: {}, table: {} (IS: {}, EV: {})",
+ instance, instancesInIdealState, segment, _tableNameWithType,
idealStateInstanceStateMap,
+ externalViewAssignment.get(segment));
+ }
}
}
}
@@ -138,8 +148,7 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
// NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
Set<String> onlineInstances = entry.getValue();
Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
- Set<String> unavailableInstances =
-
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(),
Collections.emptySet());
+ Set<String> unavailableInstances =
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(onlineInstances.size());
for (String instance : onlineInstances) {
if (!unavailableInstances.contains(instance)) {
@@ -156,9 +165,8 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
Set<String> unavailableInstances =
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(),
Collections.emptySet());
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
- for (Map.Entry<String, String> instanceStateEntry :
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
- String instance = instanceStateEntry.getKey();
- if (!unavailableInstances.contains(instance) &&
isOnlineForRouting(instanceStateEntry.getValue())) {
+ for (String instance :
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+ if (!unavailableInstances.contains(instance)) {
candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance)));
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/DefaultSegmentSelector.java
similarity index 82%
rename from
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
rename to
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/DefaultSegmentSelector.java
index 32f1cd963d..069ce53da3 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/DefaultSegmentSelector.java
@@ -25,22 +25,16 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.request.BrokerRequest;
-/**
- * Segment selector for offline table.
- */
-public class OfflineSegmentSelector implements SegmentSelector {
+public class DefaultSegmentSelector implements SegmentSelector {
private volatile Set<String> _segments;
@Override
public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
- onAssignmentChange(idealState, externalView, onlineSegments);
+ _segments = Collections.unmodifiableSet(onlineSegments);
}
@Override
public void onAssignmentChange(IdealState idealState, ExternalView
externalView, Set<String> onlineSegments) {
- // TODO: for new added segments, before all replicas are up, consider not
selecting them to avoid causing
- // hotspot servers
-
_segments = Collections.unmodifiableSet(onlineSegments);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
deleted file mode 100644
index 72d44ffd81..0000000000
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing.segmentselector;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.utils.HLCSegmentName;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.SegmentName;
-import org.apache.pinot.common.utils.config.QueryOptionsUtils;
-import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-
-
-/**
- * Segment selector for real-time table which handles the following scenarios:
- * <ul>
- * <li>When HLC and LLC segments coexist (during LLC migration), select only
HLC segments or LLC segments</li>
- * <li>For HLC segments, only select segments in one group</li>
- * <li>
- * For LLC segments, only select the first CONSUMING segment for each
partition to avoid duplicate data because in
- * certain unlikely degenerate scenarios, we can consume overlapping data
until segments are flushed (at which point
- * the overlapping data is discarded during the reconciliation process
with the controller).
- * </li>
- * </ul>
- */
-public class RealtimeSegmentSelector implements SegmentSelector {
- private final AtomicLong _requestId = new AtomicLong();
- private volatile List<Set<String>> _hlcSegments;
- private volatile Set<String> _llcSegments;
-
- @Override
- public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
- onAssignmentChange(idealState, externalView, onlineSegments);
- }
-
- @Override
- public void onAssignmentChange(IdealState idealState, ExternalView
externalView, Set<String> onlineSegments) {
- // Group HLC segments by their group id
- // NOTE: Use TreeMap so that group ids are sorted and the result is
deterministic
- Map<String, Set<String>> groupIdToHLCSegmentsMap = new TreeMap<>();
-
- List<String> completedLLCSegments = new ArrayList<>();
- // Store the first CONSUMING segment for each partition
- Map<Integer, LLCSegmentName> partitionIdToFirstConsumingLLCSegmentMap =
new HashMap<>();
-
- // Iterate over the external view instead of the online segments so that
the map lookups are performed on the
- // HashSet instead of the TreeSet for performance. For LLC segments, we
need the external view to figure out whether
- // the segments are in CONSUMING state. For the goal of segment selector,
we should not exclude segments not in the
- // external view, but it is okay to exclude them as there is no way to
route them without instance states in
- // external view.
- // - New added segment might only exist in ideal state
- // - New removed segment might only exist in external view
- for (Map.Entry<String, Map<String, String>> entry :
externalView.getRecord().getMapFields().entrySet()) {
- String segment = entry.getKey();
- if (!onlineSegments.contains(segment)) {
- continue;
- }
-
- // TODO: for new added segments, before all replicas are up, consider
not selecting them to avoid causing
- // hotspot servers
-
- Map<String, String> instanceStateMap = entry.getValue();
- if (SegmentName.isHighLevelConsumerSegmentName(segment)) {
- HLCSegmentName hlcSegmentName = new HLCSegmentName(segment);
- groupIdToHLCSegmentsMap.computeIfAbsent(hlcSegmentName.getGroupId(), k
-> new HashSet<>()).add(segment);
- } else {
- if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
- // Keep the first CONSUMING segment for each partition
- LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
- partitionIdToFirstConsumingLLCSegmentMap
- .compute(llcSegmentName.getPartitionGroupId(), (k,
consumingSegment) -> {
- if (consumingSegment == null) {
- return llcSegmentName;
- } else {
- if (llcSegmentName.getSequenceNumber() <
consumingSegment.getSequenceNumber()) {
- return llcSegmentName;
- } else {
- return consumingSegment;
- }
- }
- });
- } else {
- completedLLCSegments.add(segment);
- }
- }
- }
-
- int numHLCGroups = groupIdToHLCSegmentsMap.size();
- if (numHLCGroups != 0) {
- List<Set<String>> hlcSegments = new ArrayList<>(numHLCGroups);
- for (Set<String> hlcSegmentsForGroup : groupIdToHLCSegmentsMap.values())
{
- hlcSegments.add(Collections.unmodifiableSet(hlcSegmentsForGroup));
- }
- _hlcSegments = hlcSegments;
- } else {
- _hlcSegments = null;
- }
-
- if (!completedLLCSegments.isEmpty() ||
!partitionIdToFirstConsumingLLCSegmentMap.isEmpty()) {
- Set<String> llcSegments =
- new HashSet<>(completedLLCSegments.size() +
partitionIdToFirstConsumingLLCSegmentMap.size());
- llcSegments.addAll(completedLLCSegments);
- for (LLCSegmentName llcSegmentName :
partitionIdToFirstConsumingLLCSegmentMap.values()) {
- llcSegments.add(llcSegmentName.getSegmentName());
- }
- _llcSegments = Collections.unmodifiableSet(llcSegments);
- } else {
- _llcSegments = null;
- }
- }
-
- @Override
- public Set<String> select(BrokerRequest brokerRequest) {
- if (_hlcSegments == null && _llcSegments == null) {
- return Collections.emptySet();
- }
- if (_hlcSegments == null) {
- return selectLLCSegments();
- }
- if (_llcSegments == null) {
- return selectHLCSegments();
- }
-
- // Handle HLC and LLC coexisting scenario, select HLC segments only if it
is forced in the routing options
- return
QueryOptionsUtils.isRoutingForceHLC(brokerRequest.getPinotQuery().getQueryOptions())
? selectHLCSegments()
- : selectLLCSegments();
- }
-
- private Set<String> selectHLCSegments() {
- List<Set<String>> hlcSegments = _hlcSegments;
- return hlcSegments.get((int) (_requestId.getAndIncrement() %
hlcSegments.size()));
- }
-
- private Set<String> selectLLCSegments() {
- return _llcSegments;
- }
-}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
index 5c12bb961f..76c0ba3b37 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
@@ -28,18 +28,6 @@ import org.apache.pinot.common.request.BrokerRequest;
/**
* The segment selector selects the segments for the query. The segments
selected should cover the whole dataset (table)
* without overlap.
- * <p>Segment selector examples:
- * <ul>
- * <li>
- * For real-time table, when HLC and LLC segments coexist (during LLC
migration), select only HLC segments or LLC
- * segments
- * </li>
- * <li>For HLC real-time table, select segments in one group</li>
- * <li>
- * For table with segment merge/rollup enabled, select the merged segments
over the original segments with the same
- * data
- * </li>
- * </ul>
*/
public interface SegmentSelector {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
index 4085a54715..f6bd0ce24b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorFactory.java
@@ -19,7 +19,6 @@
package org.apache.pinot.broker.routing.segmentselector;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
public class SegmentSelectorFactory {
@@ -27,10 +26,6 @@ public class SegmentSelectorFactory {
}
public static SegmentSelector getSegmentSelector(TableConfig tableConfig) {
- if (tableConfig.getTableType() == TableType.OFFLINE) {
- return new OfflineSegmentSelector();
- } else {
- return new RealtimeSegmentSelector();
- }
+ return new DefaultSegmentSelector();
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
deleted file mode 100644
index e3ec929d59..0000000000
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.routing.segmentselector;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.utils.HLCSegmentName;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
-import org.testng.annotations.Test;
-
-import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
-import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEqualsNoOrder;
-import static org.testng.Assert.assertTrue;
-
-
-public class SegmentSelectorTest {
-
- @Test
- public void testSegmentSelectorFactory() {
- TableConfig tableConfig = mock(TableConfig.class);
-
- when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
- assertTrue(SegmentSelectorFactory.getSegmentSelector(tableConfig)
instanceof OfflineSegmentSelector);
-
- when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
- assertTrue(SegmentSelectorFactory.getSegmentSelector(tableConfig)
instanceof RealtimeSegmentSelector);
- }
-
- @Test
- public void testRealtimeSegmentSelector() {
- String realtimeTableName = "testTable_REALTIME";
- ExternalView externalView = new ExternalView(realtimeTableName);
- Map<String, Map<String, String>> segmentAssignment =
externalView.getRecord().getMapFields();
- Map<String, String> onlineInstanceStateMap =
Collections.singletonMap("server", ONLINE);
- Map<String, String> consumingInstanceStateMap =
Collections.singletonMap("server", CONSUMING);
- Set<String> onlineSegments = new HashSet<>();
- // NOTE: Ideal state is not used in the current implementation
- IdealState idealState = mock(IdealState.class);
-
- // Should return an empty list when there is no segment
- RealtimeSegmentSelector segmentSelector = new RealtimeSegmentSelector();
- segmentSelector.init(idealState, externalView, onlineSegments);
- BrokerRequest brokerRequest = mock(BrokerRequest.class);
- PinotQuery pinotQuery = mock(PinotQuery.class);
- when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
- assertTrue(segmentSelector.select(brokerRequest).isEmpty());
-
- // For HLC segments, only one group of segments should be selected
- int numHLCGroups = 3;
- int numHLCSegmentsPerGroup = 5;
- String[][] hlcSegments = new String[numHLCGroups][];
- for (int i = 0; i < numHLCGroups; i++) {
- String groupId = "testTable_REALTIME_" + i;
- String[] hlcSegmentsForGroup = new String[numHLCSegmentsPerGroup];
- for (int j = 0; j < numHLCSegmentsPerGroup; j++) {
- String hlcSegment = new HLCSegmentName(groupId, "0",
Integer.toString(j)).getSegmentName();
- segmentAssignment.put(hlcSegment, onlineInstanceStateMap);
- onlineSegments.add(hlcSegment);
- hlcSegmentsForGroup[j] = hlcSegment;
- }
- hlcSegments[i] = hlcSegmentsForGroup;
- }
- segmentSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
-
- // Only HLC segments exist, should select the HLC segments from the first
group
- assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(),
hlcSegments[0]);
-
- // For LLC segments, only the first CONSUMING segment for each partition
should be selected
- int numLLCPartitions = 3;
- int numLLCSegmentsPerPartition = 5;
- int numOnlineLLCSegmentsPerPartition = 3;
- String[] expectedSelectedLLCSegments = new String[numLLCPartitions *
(numLLCSegmentsPerPartition - 1)];
- for (int i = 0; i < numLLCPartitions; i++) {
- for (int j = 0; j < numLLCSegmentsPerPartition; j++) {
- String llcSegment = new LLCSegmentName(realtimeTableName, i, j,
0).getSegmentName();
- if (j < numOnlineLLCSegmentsPerPartition) {
- externalView.setStateMap(llcSegment, onlineInstanceStateMap);
- } else {
- externalView.setStateMap(llcSegment, consumingInstanceStateMap);
- }
- onlineSegments.add(llcSegment);
- if (j < numLLCSegmentsPerPartition - 1) {
- expectedSelectedLLCSegments[i * (numLLCSegmentsPerPartition - 1) +
j] = llcSegment;
- }
- }
- }
- segmentSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
-
- // Both HLC and LLC segments exist, should select the LLC segments
- assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(),
expectedSelectedLLCSegments);
-
- // When HLC is forced, should select the HLC segments from the second group
- when(pinotQuery.getQueryOptions()).thenReturn(
- Collections.singletonMap(Request.QueryOptionKey.ROUTING_OPTIONS,
Request.QueryOptionValue.ROUTING_FORCE_HLC));
- assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(),
hlcSegments[1]);
-
- // Remove all the HLC segments from ideal state, should select the LLC
segments even when HLC is forced
- for (String[] hlcSegmentsForGroup : hlcSegments) {
- for (String hlcSegment : hlcSegmentsForGroup) {
- onlineSegments.remove(hlcSegment);
- }
- }
- segmentSelector.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(),
expectedSelectedLLCSegments);
- }
-}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 2682eb927a..5b239fde33 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -27,7 +27,6 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
-import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionValue;
/**
@@ -118,11 +117,6 @@ public class QueryOptionsUtils {
return
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_STAR_TREE));
}
- public static boolean isRoutingForceHLC(Map<String, String> queryOptions) {
- String routingOptions = queryOptions.get(QueryOptionKey.ROUTING_OPTIONS);
- return routingOptions != null &&
routingOptions.toUpperCase().contains(QueryOptionValue.ROUTING_FORCE_HLC);
- }
-
public static boolean isSkipScanFilterReorder(Map<String, String>
queryOptions) {
return
"false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 46b2c5148e..1e10152cfb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -360,7 +360,6 @@ public class CommonConstants {
}
public static class QueryOptionValue {
- public static final String ROUTING_FORCE_HLC = "FORCE_HLC";
public static final String DEFAULT_IN_PREDICATE_SORT_THRESHOLD =
"1000";
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]