This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fb2e4dff46 [multistage] Add Support for Broker Server/Segment Pruning
(#15959)
fb2e4dff46 is described below
commit fb2e4dff467fa19e5eec1deb53380c3ad4183699
Author: Ankit Sultana <[email protected]>
AuthorDate: Wed Jun 4 18:11:34 2025 -0500
[multistage] Add Support for Broker Server/Segment Pruning (#15959)
---
.../query/context/PhysicalPlannerContext.java | 11 ++-
.../query/parser/CalciteRexExpressionParser.java | 41 ++++----
.../planner/logical/LeafStageToPinotQuery.java | 105 +++++++++++++++++++++
.../opt/rules/LeafStageWorkerAssignmentRule.java | 56 ++++++++---
.../rules/LeafStageWorkerAssignmentRuleTest.java | 23 +++++
.../resources/queries/PhysicalOptimizerPlans.json | 18 ++++
.../plan/server/ServerPlanRequestVisitor.java | 8 +-
.../apache/pinot/spi/utils/CommonConstants.java | 5 +
8 files changed, 227 insertions(+), 40 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 66809336e1..3cfd58aa03 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -122,6 +122,10 @@ public class PhysicalPlannerContext {
return _useLiteMode;
}
+ private QueryServerInstance getBrokerQueryServerInstance() {
+ return new QueryServerInstance(_instanceId, _hostName, _port, _port);
+ }
+
public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String>
queryOptions) {
if (queryOptions == null) {
return false;
@@ -129,8 +133,11 @@ public class PhysicalPlannerContext {
return
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER,
"false"));
}
- private QueryServerInstance getBrokerQueryServerInstance() {
- return new QueryServerInstance(_instanceId, _hostName, _port, _port);
+ public static boolean isUseBrokerPruning(@Nullable Map<String, String>
queryOptions) {
+ if (queryOptions == null) {
+ return false;
+ }
+ return
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING,
"false"));
}
private static boolean useLiteMode(@Nullable Map<String, String>
queryOptions) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index fdd19a9aef..a5e9531aad 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -60,10 +60,10 @@ public class CalciteRexExpressionParser {
// Relational conversion Utils
// --------------------------------------------------------------------------
- public static List<Expression> convertRexNodes(List<RexExpression> rexNodes,
PinotQuery pinotQuery) {
+ public static List<Expression> convertRexNodes(List<RexExpression> rexNodes,
List<Expression> selectList) {
List<Expression> expressions = new ArrayList<>(rexNodes.size());
for (RexExpression rexNode : rexNodes) {
- expressions.add(toExpression(rexNode, pinotQuery));
+ expressions.add(toExpression(rexNode, selectList));
}
return expressions;
}
@@ -78,18 +78,18 @@ public class CalciteRexExpressionParser {
}
public static List<Expression> convertAggregateList(List<Expression>
groupByList,
- List<RexExpression.FunctionCall> aggCalls, List<Integer> filterArgs,
PinotQuery pinotQuery) {
+ List<RexExpression.FunctionCall> aggCalls, List<Integer> filterArgs,
List<Expression> selectList) {
int numAggCalls = aggCalls.size();
List<Expression> expressions = new ArrayList<>(groupByList.size() +
numAggCalls);
expressions.addAll(groupByList);
for (int i = 0; i < numAggCalls; i++) {
- Expression aggFunction = compileFunctionExpression(aggCalls.get(i),
pinotQuery);
+ Expression aggFunction = compileFunctionExpression(aggCalls.get(i),
selectList);
int filterArgIdx = filterArgs.get(i);
if (filterArgIdx == -1) {
expressions.add(aggFunction);
} else {
expressions.add(
- RequestUtils.getFunctionExpression(FILTER, aggFunction,
pinotQuery.getSelectList().get(filterArgIdx)));
+ RequestUtils.getFunctionExpression(FILTER, aggFunction,
selectList.get(filterArgIdx)));
}
}
return expressions;
@@ -120,19 +120,18 @@ public class CalciteRexExpressionParser {
}
}
- public static Expression toExpression(RexExpression rexNode, PinotQuery
pinotQuery) {
+ public static Expression toExpression(RexExpression rexNode,
List<Expression> selectList) {
if (rexNode instanceof RexExpression.InputRef) {
- return inputRefToIdentifier((RexExpression.InputRef) rexNode,
pinotQuery);
+ return inputRefToIdentifier((RexExpression.InputRef) rexNode,
selectList);
} else if (rexNode instanceof RexExpression.Literal) {
return
RequestUtils.getLiteralExpression(toLiteral((RexExpression.Literal) rexNode));
} else {
assert rexNode instanceof RexExpression.FunctionCall;
- return compileFunctionExpression((RexExpression.FunctionCall) rexNode,
pinotQuery);
+ return compileFunctionExpression((RexExpression.FunctionCall) rexNode,
selectList);
}
}
- private static Expression inputRefToIdentifier(RexExpression.InputRef
inputRef, PinotQuery pinotQuery) {
- List<Expression> selectList = pinotQuery.getSelectList();
+ private static Expression inputRefToIdentifier(RexExpression.InputRef
inputRef, List<Expression> selectList) {
return selectList.get(inputRef.getIndex());
}
@@ -153,13 +152,13 @@ public class CalciteRexExpressionParser {
return RequestUtils.getLiteral(value);
}
- private static Expression
compileFunctionExpression(RexExpression.FunctionCall rexCall, PinotQuery
pinotQuery) {
+ private static Expression
compileFunctionExpression(RexExpression.FunctionCall rexCall, List<Expression>
selectList) {
String functionName = rexCall.getFunctionName();
if (functionName.equals(AND)) {
- return compileAndExpression(rexCall, pinotQuery);
+ return compileAndExpression(rexCall, selectList);
}
if (functionName.equals(OR)) {
- return compileOrExpression(rexCall, pinotQuery);
+ return compileOrExpression(rexCall, selectList);
}
String canonicalName =
RequestUtils.canonicalizeFunctionNamePreservingSpecialKey(functionName);
List<RexExpression> childNodes = rexCall.getFunctionOperands();
@@ -167,9 +166,9 @@ public class CalciteRexExpressionParser {
return RequestUtils.getFunctionExpression(COUNT,
RequestUtils.getIdentifierExpression("*"));
}
if (canonicalName.equals(ARRAY_TO_MV)) {
- return toExpression(childNodes.get(0), pinotQuery);
+ return toExpression(childNodes.get(0), selectList);
}
- List<Expression> operands = convertRexNodes(childNodes, pinotQuery);
+ List<Expression> operands = convertRexNodes(childNodes, selectList);
ParserUtils.validateFunction(canonicalName, operands);
return RequestUtils.getFunctionExpression(canonicalName, operands);
}
@@ -177,15 +176,15 @@ public class CalciteRexExpressionParser {
/**
* Helper method to flatten the operands for the AND expression.
*/
- private static Expression compileAndExpression(RexExpression.FunctionCall
andNode, PinotQuery pinotQuery) {
+ private static Expression compileAndExpression(RexExpression.FunctionCall
andNode, List<Expression> selectList) {
List<Expression> operands = new ArrayList<>();
for (RexExpression childNode : andNode.getFunctionOperands()) {
if (childNode instanceof RexExpression.FunctionCall &&
((RexExpression.FunctionCall) childNode).getFunctionName()
.equals(AND)) {
- Expression childAndExpression =
compileAndExpression((RexExpression.FunctionCall) childNode, pinotQuery);
+ Expression childAndExpression =
compileAndExpression((RexExpression.FunctionCall) childNode, selectList);
operands.addAll(childAndExpression.getFunctionCall().getOperands());
} else {
- operands.add(toExpression(childNode, pinotQuery));
+ operands.add(toExpression(childNode, selectList));
}
}
return RequestUtils.getFunctionExpression(AND, operands);
@@ -194,15 +193,15 @@ public class CalciteRexExpressionParser {
/**
* Helper method to flatten the operands for the OR expression.
*/
- private static Expression compileOrExpression(RexExpression.FunctionCall
orNode, PinotQuery pinotQuery) {
+ private static Expression compileOrExpression(RexExpression.FunctionCall
orNode, List<Expression> selectList) {
List<Expression> operands = new ArrayList<>();
for (RexExpression childNode : orNode.getFunctionOperands()) {
if (childNode instanceof RexExpression.FunctionCall &&
((RexExpression.FunctionCall) childNode).getFunctionName()
.equals(OR)) {
- Expression childAndExpression =
compileOrExpression((RexExpression.FunctionCall) childNode, pinotQuery);
+ Expression childAndExpression =
compileOrExpression((RexExpression.FunctionCall) childNode, selectList);
operands.addAll(childAndExpression.getFunctionCall().getOperands());
} else {
- operands.add(toExpression(childNode, pinotQuery));
+ operands.add(toExpression(childNode, selectList));
}
}
return RequestUtils.getFunctionExpression(OR, operands);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LeafStageToPinotQuery.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LeafStageToPinotQuery.java
new file mode 100644
index 0000000000..2ea04c12ad
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LeafStageToPinotQuery.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.sql.FilterKind;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+
+
+/**
+ * Utility to convert a leaf stage to a {@link PinotQuery}. The resulting query is only meant for routing (server and
+ * segment selection), not for execution.
+ */
+public class LeafStageToPinotQuery {
+  private LeafStageToPinotQuery() {
+  }
+
+  /**
+   * Converts a leaf stage root to a {@link PinotQuery}. Starting from the TableScan, Filter and Project nodes are
+   * folded into the query bottom-up. Processing stops at the first node of any other type (e.g. an Aggregate):
+   * such nodes may change the row type, so input references in nodes above them would no longer index into the
+   * select list built so far. Translating them anyway could yield a wrong filter (and hence unsafe pruning) or an
+   * out-of-bounds access. Stopping early is always safe for routing; it only means fewer predicates are available
+   * for pruning.
+   *
+   * @param tableName the name of the table. Needs to be provided separately since resolving it requires the
+   *                  TableCache.
+   * @param leafStageRoot the root of the leaf stage
+   * @param skipFilter whether to skip the filter in the query. When true, only the table scan columns are used as
+   *                   the select list and projections are not compiled.
+   *                   NOTE: this assumes routing depends only on the table and the filter, not the select list.
+   * @return a {@link PinotQuery} representing the leaf stage
+   */
+  public static PinotQuery createPinotQueryForRouting(String tableName, RelNode leafStageRoot, boolean skipFilter) {
+    List<RelNode> bottomToTopNodes = new ArrayList<>();
+    accumulateBottomToTop(leafStageRoot, bottomToTopNodes);
+    Preconditions.checkState(!bottomToTopNodes.isEmpty() && bottomToTopNodes.get(0) instanceof TableScan,
+        "Could not find table scan");
+    TableScan tableScan = (TableScan) bottomToTopNodes.get(0);
+    PinotQuery pinotQuery = initializePinotQueryForTableScan(tableName, tableScan);
+    if (skipFilter) {
+      // Without a filter there is nothing to prune on, so compiling projections would be wasted work.
+      return pinotQuery;
+    }
+    // Index 0 is the TableScan, which has already been handled above.
+    for (RelNode parentNode : bottomToTopNodes.subList(1, bottomToTopNodes.size())) {
+      if (parentNode instanceof Filter) {
+        handleFilter((Filter) parentNode, pinotQuery);
+      } else if (parentNode instanceof Project) {
+        handleProject((Project) parentNode, pinotQuery);
+      } else {
+        // Unsupported node: input refs above this point cannot be resolved against the current select list.
+        break;
+      }
+    }
+    return pinotQuery;
+  }
+
+  /**
+   * Appends the nodes of the single-input chain rooted at {@code root} to {@code parentNodes}, deepest node first.
+   *
+   * @throws IllegalStateException if any node in the chain has more than one input
+   */
+  private static void accumulateBottomToTop(RelNode root, List<RelNode> parentNodes) {
+    Preconditions.checkState(root.getInputs().size() <= 1,
+        "Leaf stage nodes should have at most one input, found: %s", root.getInputs().size());
+    for (RelNode input : root.getInputs()) {
+      accumulateBottomToTop(input, parentNodes);
+    }
+    parentNodes.add(root);
+  }
+
+  /**
+   * Creates a query on {@code tableName} whose select list contains one expression per field of the scan's row type.
+   */
+  private static PinotQuery initializePinotQueryForTableScan(String tableName, TableScan tableScan) {
+    PinotQuery pinotQuery = new PinotQuery();
+    pinotQuery.setDataSource(new DataSource());
+    pinotQuery.getDataSource().setTableName(tableName);
+    pinotQuery.setSelectList(tableScan.getRowType().getFieldNames().stream().map(
+        CalciteSqlParser::compileToExpression).collect(Collectors.toList()));
+    return pinotQuery;
+  }
+
+  /**
+   * Replaces the select list with the project's expressions, resolving input refs against the current select list.
+   */
+  private static void handleProject(Project project, PinotQuery pinotQuery) {
+    List<RexExpression> rexExpressions = RexExpressionUtils.fromRexNodes(project.getProjects());
+    List<Expression> selectList =
+        CalciteRexExpressionParser.convertRexNodes(rexExpressions, pinotQuery.getSelectList());
+    pinotQuery.setSelectList(selectList);
+  }
+
+  /**
+   * Adds the filter's condition to the query, resolving input refs against the current select list. If a filter
+   * was already set by a lower Filter node, both are combined with AND: every filter in the processed chain applies
+   * to the rows produced by the leaf stage, so their conjunction is a valid and tighter predicate for pruning.
+   */
+  private static void handleFilter(Filter filter, PinotQuery pinotQuery) {
+    RexExpression rexExpression = RexExpressionUtils.fromRexNode(filter.getCondition());
+    Expression filterExpression =
+        CalciteRexExpressionParser.toExpression(rexExpression, pinotQuery.getSelectList());
+    Expression existingFilter = pinotQuery.getFilterExpression();
+    if (existingFilter == null) {
+      pinotQuery.setFilterExpression(filterExpression);
+    } else {
+      pinotQuery.setFilterExpression(
+          RequestUtils.getFunctionExpression(FilterKind.AND.name(), existingFilter, filterExpression));
+    }
+  }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 652376e66c..e0441a0f21 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -42,6 +43,8 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
@@ -53,6 +56,7 @@ import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.logical.LeafStageToPinotQuery;
import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
import org.apache.pinot.query.planner.physical.v2.PRelNode;
import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
@@ -87,7 +91,6 @@ import org.slf4j.LoggerFactory;
* <li>Support for look-up join.</li>
* <li>Support for partition parallelism and the colocated join hint. See F2
in #15455.</li>
* <li>Support for Hybrid Tables for automatic partitioning inference.</li>
- * <li>Server pruning based on filter predicates.</li>
* </ul>
*/
public class LeafStageWorkerAssignmentRule extends PRelOptRule {
@@ -112,7 +115,13 @@ public class LeafStageWorkerAssignmentRule extends
PRelOptRule {
@Override
public PRelNode onMatch(PRelOptRuleCall call) {
if (call._currentNode.unwrap() instanceof TableScan) {
- return assignTableScan((PhysicalTableScan) call._currentNode,
_physicalPlannerContext.getRequestId());
+ PRelNode leafStageRoot = extractCurrentLeafStageParent(call._parents);
+ leafStageRoot = leafStageRoot == null ? call._currentNode :
leafStageRoot;
+ String tableName = getActualTableName((TableScan)
call._currentNode.unwrap());
+ PinotQuery pinotQuery =
LeafStageToPinotQuery.createPinotQueryForRouting(tableName,
leafStageRoot.unwrap(),
+
!PhysicalPlannerContext.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
+ return assignTableScan((PhysicalTableScan) call._currentNode,
_physicalPlannerContext.getRequestId(),
+ pinotQuery);
}
PRelNode currentNode = call._currentNode;
Preconditions.checkState(currentNode.isLeafStage(), "Leaf stage worker
assignment called for non-leaf stage node:"
@@ -124,11 +133,11 @@ public class LeafStageWorkerAssignmentRule extends
PRelOptRule {
return currentNode.with(currentNode.getPRelInputs(), derivedDistribution);
}
- private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long
requestId) {
+ private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long
requestId, PinotQuery pinotQuery) {
// Step-1: Init tableName, table options, routing table and time boundary
info.
String tableName = Objects.requireNonNull(getActualTableName(tableScan),
"Table not found");
Map<String, String> tableOptions = getTableOptions(tableScan.getHints());
- Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName,
requestId);
+ Map<String, RoutingTable> routingTableMap = getRoutingTable(pinotQuery,
requestId);
Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find
routing entries for table: %s", tableName);
// acquire time boundary info if it is a hybrid table.
TimeBoundaryInfo timeBoundaryInfo = null;
@@ -400,6 +409,25 @@ public class LeafStageWorkerAssignmentRule extends
PRelOptRule {
return segments;
}
+ /**
+ * Returns the first node in {@code parents}, in the deque's iteration order, for which
+ * {@link PRelNode#isLeafStage()} is true, or null if there is none. The caller in onMatch treats the
+ * result as the root of the current leaf stage, falling back to the table scan itself when null.
+ * NOTE(review): this is only the leaf stage root if {@code parents} iterates from the plan root down
+ * towards the current node (the unit test builds it that way with addLast). If the deque were ordered
+ * bottom-up, this would return the immediate parent instead — confirm against how _parents is filled.
+ */
+ @Nullable
+ static PRelNode extractCurrentLeafStageParent(Deque<PRelNode> parents) {
+ for (PRelNode parent : parents) {
+ if (parent.isLeafStage()) {
+ return parent;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns a deep copy of {@code pinotQuery} whose data source is a fresh {@link DataSource} naming the
+ * table with the given type suffix. The suffix is derived from the raw name of the original query's table.
+ * The original query's data source is not modified.
+ */
+ static PinotQuery deepCopyWithTableType(PinotQuery pinotQuery, TableType
tableType) {
+ PinotQuery newPinotQuery = pinotQuery.deepCopy();
+ DataSource dataSource = new DataSource();
+
dataSource.setTableName(TableNameBuilder.forType(tableType).tableNameWithType(
+
TableNameBuilder.extractRawTableName(pinotQuery.getDataSource().getTableName())));
+ newPinotQuery.setDataSource(dataSource);
+ return newPinotQuery;
+ }
+
private Map<String, TablePartitionInfo> calculateTablePartitionInfo(String
tableName, Set<String> tableTypes) {
Map<String, TablePartitionInfo> result = new HashMap<>();
if (tableTypes.contains("OFFLINE")) {
@@ -422,25 +450,28 @@ public class LeafStageWorkerAssignmentRule extends
PRelOptRule {
/**
* Acquire routing table for items listed in TableScanNode.
*
- * @param tableName table name with or without type suffix.
+ * @param pinotQuery the PinotQuery with filters, project and table-scan
information.
* @return keyed-map from table type(s) to routing table(s).
*/
- private Map<String, RoutingTable> getRoutingTable(String tableName, long
requestId) {
+ private Map<String, RoutingTable> getRoutingTable(PinotQuery pinotQuery,
long requestId) {
+ String tableName = pinotQuery.getDataSource().getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
Map<String, RoutingTable> routingTableMap = new HashMap<>();
RoutingTable routingTable;
if (tableType == null) {
- routingTable = getRoutingTable(rawTableName, TableType.OFFLINE,
requestId);
+ PinotQuery offlineTableTypeQuery = deepCopyWithTableType(pinotQuery,
TableType.OFFLINE);
+ routingTable = getRoutingTableSingleTableType(offlineTableTypeQuery,
requestId);
if (routingTable != null) {
routingTableMap.put(TableType.OFFLINE.name(), routingTable);
}
- routingTable = getRoutingTable(rawTableName, TableType.REALTIME,
requestId);
+ PinotQuery realtimeTableTypeQuery = deepCopyWithTableType(pinotQuery,
TableType.REALTIME);
+ routingTable = getRoutingTableSingleTableType(realtimeTableTypeQuery,
requestId);
if (routingTable != null) {
routingTableMap.put(TableType.REALTIME.name(), routingTable);
}
} else {
- routingTable = getRoutingTable(tableName, tableType, requestId);
+ routingTable = getRoutingTableSingleTableType(pinotQuery, requestId);
if (routingTable != null) {
routingTableMap.put(tableType.name(), routingTable);
}
@@ -448,11 +479,8 @@ public class LeafStageWorkerAssignmentRule extends
PRelOptRule {
return routingTableMap;
}
- private RoutingTable getRoutingTable(String tableName, TableType tableType,
long requestId) {
- String tableNameWithType =
-
TableNameBuilder.forType(tableType).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
- return _routingManager.getRoutingTable(
- CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" +
tableNameWithType + "\""), requestId);
+ private RoutingTable getRoutingTableSingleTableType(PinotQuery pinotQuery,
long requestId) {
+ return
_routingManager.getRoutingTable(CalciteSqlCompiler.convertToBrokerRequest(pinotQuery),
requestId);
}
private Map<String, String> getTableOptions(List<RelHint> hints) {
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
index 654a365d48..59b43a7679 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.query.planner.physical.v2.opt.rules;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,12 +30,15 @@ import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
import
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule.InstanceIdToSegments;
import
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule.TableScanWorkerAssignmentResult;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.*;
@@ -241,6 +246,24 @@ public class LeafStageWorkerAssignmentRuleTest {
List.of("s0", "s1", "s2", "s3", "s4")));
}
+ @Test
+ public void testExtractLeafStageRoot() {
+ // Create parents mock RelNodes. The first node is not part of the leaf
stage, while the last two nodes are.
+ PRelNode pRelNode1 = mock(PRelNode.class);
+ PRelNode pRelNode2 = mock(PRelNode.class);
+ PRelNode pRelNode3 = mock(PRelNode.class);
+ doReturn(false).when(pRelNode1).isLeafStage();
+ doReturn(true).when(pRelNode2).isLeafStage();
+ doReturn(true).when(pRelNode3).isLeafStage();
+ // Add nodes in order (top to bottom of tree).
+ Deque<PRelNode> parents = new ArrayDeque<>();
+ parents.addLast(pRelNode1);
+ parents.addLast(pRelNode2);
+ parents.addLast(pRelNode3);
+ PRelNode leafStageRoot =
LeafStageWorkerAssignmentRule.extractCurrentLeafStageParent(parents);
+ assertEquals(leafStageRoot, pRelNode2);
+ }
+
private static void
validateTableScanAssignment(TableScanWorkerAssignmentResult assignmentResult,
Map<String, List<String>> instanceIdToSegmentsMap, String tableType) {
Map<String, List<String>> actualInstanceIdToSegments = new HashMap<>();
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index d9ef04fa2a..7d09ed3427 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -603,5 +603,23 @@
]
}
]
+ },
+ "physical_opt_broker_pruning": {
+ "queries": [
+ {
+ "description": "Broker pruning example (smoke test)",
+ "sql": "SET usePhysicalOptimizer=true; SET useBrokerPruning=true;
EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' ORDER BY col2",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalSort(sort0=[$0], dir0=[ASC])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalProject(col2=[$1], col3=[$2])",
+ "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ }
+ ]
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index a3767fdec5..74671dcf88 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -78,7 +78,7 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
pinotQuery.setGroupByList(groupByList);
}
List<Expression> selectList =
CalciteRexExpressionParser.convertAggregateList(groupByList, node.getAggCalls(),
- node.getFilterArgs(), pinotQuery);
+ node.getFilterArgs(), pinotQuery.getSelectList());
for (Expression expression : selectList) {
applyTimestampIndex(expression, pinotQuery);
}
@@ -131,7 +131,8 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
if (visit(node.getInputs().get(0), context)) {
PinotQuery pinotQuery = context.getPinotQuery();
if (pinotQuery.getFilterExpression() == null) {
- Expression expression =
CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery);
+ Expression expression =
CalciteRexExpressionParser.toExpression(node.getCondition(),
+ pinotQuery.getSelectList());
applyTimestampIndex(expression, pinotQuery);
pinotQuery.setFilterExpression(expression);
} else {
@@ -197,7 +198,8 @@ public class ServerPlanRequestVisitor implements
PlanNodeVisitor<Void, ServerPla
public Void visitProject(ProjectNode node, ServerPlanRequestContext context)
{
if (visit(node.getInputs().get(0), context)) {
PinotQuery pinotQuery = context.getPinotQuery();
- List<Expression> selectList =
CalciteRexExpressionParser.convertRexNodes(node.getProjects(), pinotQuery);
+ List<Expression> selectList =
CalciteRexExpressionParser.convertRexNodes(node.getProjects(),
+ pinotQuery.getSelectList());
for (Expression expression : selectList) {
applyTimestampIndex(expression, pinotQuery);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5c5f294646..4d7f880d80 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -634,6 +634,11 @@ public class CommonConstants {
// name, when that segment has multiple partitions in its
columnPartitionMap.
public static final String INFER_INVALID_SEGMENT_PARTITION =
"inferInvalidSegmentPartition";
public static final String USE_LITE_MODE = "useLiteMode";
+ // Used by the MSE Engine to determine whether to use the broker
pruning logic. Only supported by the
+ // new MSE query optimizer.
+ // TODO(mse-physical): Consider removing this query option and making
this the default, since there's already
+ // a table config to enable broker pruning (it is disabled by
default).
+ public static final String USE_BROKER_PRUNING = "useBrokerPruning";
}
public static class QueryOptionValue {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]