This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2e4dff46 [multistage] Add Support for Broker Server/Segment Pruning (#15959)
fb2e4dff46 is described below

commit fb2e4dff467fa19e5eec1deb53380c3ad4183699
Author: Ankit Sultana <[email protected]>
AuthorDate: Wed Jun 4 18:11:34 2025 -0500

    [multistage] Add Support for Broker Server/Segment Pruning (#15959)
---
 .../query/context/PhysicalPlannerContext.java      |  11 ++-
 .../query/parser/CalciteRexExpressionParser.java   |  41 ++++----
 .../planner/logical/LeafStageToPinotQuery.java     | 105 +++++++++++++++++++++
 .../opt/rules/LeafStageWorkerAssignmentRule.java   |  56 ++++++++---
 .../rules/LeafStageWorkerAssignmentRuleTest.java   |  23 +++++
 .../resources/queries/PhysicalOptimizerPlans.json  |  18 ++++
 .../plan/server/ServerPlanRequestVisitor.java      |   8 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   5 +
 8 files changed, 227 insertions(+), 40 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 66809336e1..3cfd58aa03 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -122,6 +122,10 @@ public class PhysicalPlannerContext {
     return _useLiteMode;
   }
 
+  private QueryServerInstance getBrokerQueryServerInstance() {
+    return new QueryServerInstance(_instanceId, _hostName, _port, _port);
+  }
+
   public static boolean isUsePhysicalOptimizer(@Nullable Map<String, String> 
queryOptions) {
     if (queryOptions == null) {
       return false;
@@ -129,8 +133,11 @@ public class PhysicalPlannerContext {
     return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER,
 "false"));
   }
 
-  private QueryServerInstance getBrokerQueryServerInstance() {
-    return new QueryServerInstance(_instanceId, _hostName, _port, _port);
+  public static boolean isUseBrokerPruning(@Nullable Map<String, String> 
queryOptions) {
+    if (queryOptions == null) {
+      return false;
+    }
+    return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING,
 "false"));
   }
 
   private static boolean useLiteMode(@Nullable Map<String, String> 
queryOptions) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
index fdd19a9aef..a5e9531aad 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/CalciteRexExpressionParser.java
@@ -60,10 +60,10 @@ public class CalciteRexExpressionParser {
   // Relational conversion Utils
   // --------------------------------------------------------------------------
 
-  public static List<Expression> convertRexNodes(List<RexExpression> rexNodes, 
PinotQuery pinotQuery) {
+  public static List<Expression> convertRexNodes(List<RexExpression> rexNodes, 
List<Expression> selectList) {
     List<Expression> expressions = new ArrayList<>(rexNodes.size());
     for (RexExpression rexNode : rexNodes) {
-      expressions.add(toExpression(rexNode, pinotQuery));
+      expressions.add(toExpression(rexNode, selectList));
     }
     return expressions;
   }
@@ -78,18 +78,18 @@ public class CalciteRexExpressionParser {
   }
 
   public static List<Expression> convertAggregateList(List<Expression> 
groupByList,
-      List<RexExpression.FunctionCall> aggCalls, List<Integer> filterArgs, 
PinotQuery pinotQuery) {
+      List<RexExpression.FunctionCall> aggCalls, List<Integer> filterArgs, 
List<Expression> selectList) {
     int numAggCalls = aggCalls.size();
     List<Expression> expressions = new ArrayList<>(groupByList.size() + 
numAggCalls);
     expressions.addAll(groupByList);
     for (int i = 0; i < numAggCalls; i++) {
-      Expression aggFunction = compileFunctionExpression(aggCalls.get(i), 
pinotQuery);
+      Expression aggFunction = compileFunctionExpression(aggCalls.get(i), 
selectList);
       int filterArgIdx = filterArgs.get(i);
       if (filterArgIdx == -1) {
         expressions.add(aggFunction);
       } else {
         expressions.add(
-            RequestUtils.getFunctionExpression(FILTER, aggFunction, 
pinotQuery.getSelectList().get(filterArgIdx)));
+            RequestUtils.getFunctionExpression(FILTER, aggFunction, 
selectList.get(filterArgIdx)));
       }
     }
     return expressions;
@@ -120,19 +120,18 @@ public class CalciteRexExpressionParser {
     }
   }
 
-  public static Expression toExpression(RexExpression rexNode, PinotQuery 
pinotQuery) {
+  public static Expression toExpression(RexExpression rexNode, 
List<Expression> selectList) {
     if (rexNode instanceof RexExpression.InputRef) {
-      return inputRefToIdentifier((RexExpression.InputRef) rexNode, 
pinotQuery);
+      return inputRefToIdentifier((RexExpression.InputRef) rexNode, 
selectList);
     } else if (rexNode instanceof RexExpression.Literal) {
       return 
RequestUtils.getLiteralExpression(toLiteral((RexExpression.Literal) rexNode));
     } else {
       assert rexNode instanceof RexExpression.FunctionCall;
-      return compileFunctionExpression((RexExpression.FunctionCall) rexNode, 
pinotQuery);
+      return compileFunctionExpression((RexExpression.FunctionCall) rexNode, 
selectList);
     }
   }
 
-  private static Expression inputRefToIdentifier(RexExpression.InputRef 
inputRef, PinotQuery pinotQuery) {
-    List<Expression> selectList = pinotQuery.getSelectList();
+  private static Expression inputRefToIdentifier(RexExpression.InputRef 
inputRef, List<Expression> selectList) {
     return selectList.get(inputRef.getIndex());
   }
 
@@ -153,13 +152,13 @@ public class CalciteRexExpressionParser {
     return RequestUtils.getLiteral(value);
   }
 
-  private static Expression 
compileFunctionExpression(RexExpression.FunctionCall rexCall, PinotQuery 
pinotQuery) {
+  private static Expression 
compileFunctionExpression(RexExpression.FunctionCall rexCall, List<Expression> 
selectList) {
     String functionName = rexCall.getFunctionName();
     if (functionName.equals(AND)) {
-      return compileAndExpression(rexCall, pinotQuery);
+      return compileAndExpression(rexCall, selectList);
     }
     if (functionName.equals(OR)) {
-      return compileOrExpression(rexCall, pinotQuery);
+      return compileOrExpression(rexCall, selectList);
     }
     String canonicalName = 
RequestUtils.canonicalizeFunctionNamePreservingSpecialKey(functionName);
     List<RexExpression> childNodes = rexCall.getFunctionOperands();
@@ -167,9 +166,9 @@ public class CalciteRexExpressionParser {
       return RequestUtils.getFunctionExpression(COUNT, 
RequestUtils.getIdentifierExpression("*"));
     }
     if (canonicalName.equals(ARRAY_TO_MV)) {
-      return toExpression(childNodes.get(0), pinotQuery);
+      return toExpression(childNodes.get(0), selectList);
     }
-    List<Expression> operands = convertRexNodes(childNodes, pinotQuery);
+    List<Expression> operands = convertRexNodes(childNodes, selectList);
     ParserUtils.validateFunction(canonicalName, operands);
     return RequestUtils.getFunctionExpression(canonicalName, operands);
   }
@@ -177,15 +176,15 @@ public class CalciteRexExpressionParser {
   /**
    * Helper method to flatten the operands for the AND expression.
    */
-  private static Expression compileAndExpression(RexExpression.FunctionCall 
andNode, PinotQuery pinotQuery) {
+  private static Expression compileAndExpression(RexExpression.FunctionCall 
andNode, List<Expression> selectList) {
     List<Expression> operands = new ArrayList<>();
     for (RexExpression childNode : andNode.getFunctionOperands()) {
       if (childNode instanceof RexExpression.FunctionCall && 
((RexExpression.FunctionCall) childNode).getFunctionName()
           .equals(AND)) {
-        Expression childAndExpression = 
compileAndExpression((RexExpression.FunctionCall) childNode, pinotQuery);
+        Expression childAndExpression = 
compileAndExpression((RexExpression.FunctionCall) childNode, selectList);
         operands.addAll(childAndExpression.getFunctionCall().getOperands());
       } else {
-        operands.add(toExpression(childNode, pinotQuery));
+        operands.add(toExpression(childNode, selectList));
       }
     }
     return RequestUtils.getFunctionExpression(AND, operands);
@@ -194,15 +193,15 @@ public class CalciteRexExpressionParser {
   /**
    * Helper method to flatten the operands for the OR expression.
    */
-  private static Expression compileOrExpression(RexExpression.FunctionCall 
orNode, PinotQuery pinotQuery) {
+  private static Expression compileOrExpression(RexExpression.FunctionCall 
orNode, List<Expression> selectList) {
     List<Expression> operands = new ArrayList<>();
     for (RexExpression childNode : orNode.getFunctionOperands()) {
       if (childNode instanceof RexExpression.FunctionCall && 
((RexExpression.FunctionCall) childNode).getFunctionName()
           .equals(OR)) {
-        Expression childAndExpression = 
compileOrExpression((RexExpression.FunctionCall) childNode, pinotQuery);
+        Expression childAndExpression = 
compileOrExpression((RexExpression.FunctionCall) childNode, selectList);
         operands.addAll(childAndExpression.getFunctionCall().getOperands());
       } else {
-        operands.add(toExpression(childNode, pinotQuery));
+        operands.add(toExpression(childNode, selectList));
       }
     }
     return RequestUtils.getFunctionExpression(OR, operands);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LeafStageToPinotQuery.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LeafStageToPinotQuery.java
new file mode 100644
index 0000000000..2ea04c12ad
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/LeafStageToPinotQuery.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+
+
+/**
+ * Utility to convert a leaf stage to a {@link PinotQuery}.
+ */
+public class LeafStageToPinotQuery {
+  private LeafStageToPinotQuery() {
+  }
+
+  /**
+   * Converts a leaf stage root to a {@link PinotQuery}. This method only 
handles Project, Filter and TableScan nodes.
+   * Other node types are ignored since they don't impact routing.
+   *
+   * @param tableName the name of the table. Needs to be provided separately 
since it needs TableCache.
+   * @param leafStageRoot the root of the leaf stage
+   * @param skipFilter whether to skip the filter in the query
+   * @return a {@link PinotQuery} representing the leaf stage
+   */
+  public static PinotQuery createPinotQueryForRouting(String tableName, 
RelNode leafStageRoot, boolean skipFilter) {
+    List<RelNode> bottomToTopNodes = new ArrayList<>();
+    accumulateBottomToTop(leafStageRoot, bottomToTopNodes);
+    Preconditions.checkState(!bottomToTopNodes.isEmpty() && 
bottomToTopNodes.get(0) instanceof TableScan,
+        "Could not find table scan");
+    TableScan tableScan = (TableScan) bottomToTopNodes.get(0);
+    PinotQuery pinotQuery = initializePinotQueryForTableScan(tableName, 
tableScan);
+    for (RelNode parentNode : bottomToTopNodes) {
+      if (parentNode instanceof Filter) {
+        if (!skipFilter) {
+          handleFilter((Filter) parentNode, pinotQuery);
+        }
+      } else if (parentNode instanceof Project) {
+        handleProject((Project) parentNode, pinotQuery);
+      }
+    }
+    return pinotQuery;
+  }
+
+  private static void accumulateBottomToTop(RelNode root, List<RelNode> 
parentNodes) {
+    Preconditions.checkState(root.getInputs().size() <= 1,
+        "Leaf stage nodes should have at most one input, found: %s", 
root.getInputs().size());
+    for (RelNode input : root.getInputs()) {
+      accumulateBottomToTop(input, parentNodes);
+    }
+    parentNodes.add(root);
+  }
+
+  private static PinotQuery initializePinotQueryForTableScan(String tableName, 
TableScan tableScan) {
+    PinotQuery pinotQuery = new PinotQuery();
+    pinotQuery.setDataSource(new DataSource());
+    pinotQuery.getDataSource().setTableName(tableName);
+    
pinotQuery.setSelectList(tableScan.getRowType().getFieldNames().stream().map(
+        CalciteSqlParser::compileToExpression).collect(Collectors.toList()));
+    return pinotQuery;
+  }
+
+  private static void handleProject(Project project, PinotQuery pinotQuery) {
+    if (project != null) {
+      List<RexExpression> rexExpressions = 
RexExpressionUtils.fromRexNodes(project.getProjects());
+      List<Expression> selectList = 
CalciteRexExpressionParser.convertRexNodes(rexExpressions,
+          pinotQuery.getSelectList());
+      pinotQuery.setSelectList(selectList);
+    }
+  }
+
+  private static void handleFilter(Filter filter, PinotQuery pinotQuery) {
+    if (filter != null) {
+      RexExpression rexExpression = 
RexExpressionUtils.fromRexNode(filter.getCondition());
+      
pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(rexExpression,
+          pinotQuery.getSelectList()));
+    }
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
index 652376e66c..e0441a0f21 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -42,6 +43,8 @@ import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
@@ -53,6 +56,7 @@ import org.apache.pinot.core.routing.TablePartitionInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.logical.LeafStageToPinotQuery;
 import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
 import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
@@ -87,7 +91,6 @@ import org.slf4j.LoggerFactory;
  *   <li>Support for look-up join.</li>
  *   <li>Support for partition parallelism and the colocated join hint. See F2 
in #15455.</li>
  *   <li>Support for Hybrid Tables for automatic partitioning inference.</li>
- *   <li>Server pruning based on filter predicates.</li>
  * </ul>
  */
 public class LeafStageWorkerAssignmentRule extends PRelOptRule {
@@ -112,7 +115,13 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
   @Override
   public PRelNode onMatch(PRelOptRuleCall call) {
     if (call._currentNode.unwrap() instanceof TableScan) {
-      return assignTableScan((PhysicalTableScan) call._currentNode, 
_physicalPlannerContext.getRequestId());
+      PRelNode leafStageRoot = extractCurrentLeafStageParent(call._parents);
+      leafStageRoot = leafStageRoot == null ? call._currentNode : 
leafStageRoot;
+      String tableName = getActualTableName((TableScan) 
call._currentNode.unwrap());
+      PinotQuery pinotQuery = 
LeafStageToPinotQuery.createPinotQueryForRouting(tableName, 
leafStageRoot.unwrap(),
+          
!PhysicalPlannerContext.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions()));
+      return assignTableScan((PhysicalTableScan) call._currentNode, 
_physicalPlannerContext.getRequestId(),
+          pinotQuery);
     }
     PRelNode currentNode = call._currentNode;
     Preconditions.checkState(currentNode.isLeafStage(), "Leaf stage worker 
assignment called for non-leaf stage node:"
@@ -124,11 +133,11 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
     return currentNode.with(currentNode.getPRelInputs(), derivedDistribution);
   }
 
-  private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long 
requestId) {
+  private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long 
requestId, PinotQuery pinotQuery) {
     // Step-1: Init tableName, table options, routing table and time boundary 
info.
     String tableName = Objects.requireNonNull(getActualTableName(tableScan), 
"Table not found");
     Map<String, String> tableOptions = getTableOptions(tableScan.getHints());
-    Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, 
requestId);
+    Map<String, RoutingTable> routingTableMap = getRoutingTable(pinotQuery, 
requestId);
     Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find 
routing entries for table: %s", tableName);
     // acquire time boundary info if it is a hybrid table.
     TimeBoundaryInfo timeBoundaryInfo = null;
@@ -400,6 +409,25 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
     return segments;
   }
 
+  @Nullable
+  static PRelNode extractCurrentLeafStageParent(Deque<PRelNode> parents) {
+    for (PRelNode parent : parents) {
+      if (parent.isLeafStage()) {
+        return parent;
+      }
+    }
+    return null;
+  }
+
+  static PinotQuery deepCopyWithTableType(PinotQuery pinotQuery, TableType 
tableType) {
+    PinotQuery newPinotQuery = pinotQuery.deepCopy();
+    DataSource dataSource = new DataSource();
+    
dataSource.setTableName(TableNameBuilder.forType(tableType).tableNameWithType(
+        
TableNameBuilder.extractRawTableName(pinotQuery.getDataSource().getTableName())));
+    newPinotQuery.setDataSource(dataSource);
+    return newPinotQuery;
+  }
+
   private Map<String, TablePartitionInfo> calculateTablePartitionInfo(String 
tableName, Set<String> tableTypes) {
     Map<String, TablePartitionInfo> result = new HashMap<>();
     if (tableTypes.contains("OFFLINE")) {
@@ -422,25 +450,28 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
   /**
    * Acquire routing table for items listed in TableScanNode.
    *
-   * @param tableName table name with or without type suffix.
+   * @param pinotQuery the PinotQuery with filters, project and table-scan 
information.
    * @return keyed-map from table type(s) to routing table(s).
    */
-  private Map<String, RoutingTable> getRoutingTable(String tableName, long 
requestId) {
+  private Map<String, RoutingTable> getRoutingTable(PinotQuery pinotQuery, 
long requestId) {
+    String tableName = pinotQuery.getDataSource().getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     Map<String, RoutingTable> routingTableMap = new HashMap<>();
     RoutingTable routingTable;
     if (tableType == null) {
-      routingTable = getRoutingTable(rawTableName, TableType.OFFLINE, 
requestId);
+      PinotQuery offlineTableTypeQuery = deepCopyWithTableType(pinotQuery, 
TableType.OFFLINE);
+      routingTable = getRoutingTableSingleTableType(offlineTableTypeQuery, 
requestId);
       if (routingTable != null) {
         routingTableMap.put(TableType.OFFLINE.name(), routingTable);
       }
-      routingTable = getRoutingTable(rawTableName, TableType.REALTIME, 
requestId);
+      PinotQuery realtimeTableTypeQuery = deepCopyWithTableType(pinotQuery, 
TableType.REALTIME);
+      routingTable = getRoutingTableSingleTableType(realtimeTableTypeQuery, 
requestId);
       if (routingTable != null) {
         routingTableMap.put(TableType.REALTIME.name(), routingTable);
       }
     } else {
-      routingTable = getRoutingTable(tableName, tableType, requestId);
+      routingTable = getRoutingTableSingleTableType(pinotQuery, requestId);
       if (routingTable != null) {
         routingTableMap.put(tableType.name(), routingTable);
       }
@@ -448,11 +479,8 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule {
     return routingTableMap;
   }
 
-  private RoutingTable getRoutingTable(String tableName, TableType tableType, 
long requestId) {
-    String tableNameWithType =
-        
TableNameBuilder.forType(tableType).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
-    return _routingManager.getRoutingTable(
-        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM \"" + 
tableNameWithType + "\""), requestId);
+  private RoutingTable getRoutingTableSingleTableType(PinotQuery pinotQuery, 
long requestId) {
+    return 
_routingManager.getRoutingTable(CalciteSqlCompiler.convertToBrokerRequest(pinotQuery),
 requestId);
   }
 
   private Map<String, String> getTableOptions(List<RelHint> hints) {
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
index 654a365d48..59b43a7679 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRuleTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.query.planner.physical.v2.opt.rules;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,12 +30,15 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.core.routing.TablePartitionInfo;
 import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
 import 
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule.InstanceIdToSegments;
 import 
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule.TableScanWorkerAssignmentResult;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.*;
 
 
@@ -241,6 +246,24 @@ public class LeafStageWorkerAssignmentRuleTest {
         List.of("s0", "s1", "s2", "s3", "s4")));
   }
 
+  @Test
+  public void testExtractLeafStageRoot() {
+    // Create parents mock RelNodes. The first node is not part of the leaf 
stage, while the last two nodes are.
+    PRelNode pRelNode1 = mock(PRelNode.class);
+    PRelNode pRelNode2 = mock(PRelNode.class);
+    PRelNode pRelNode3 = mock(PRelNode.class);
+    doReturn(false).when(pRelNode1).isLeafStage();
+    doReturn(true).when(pRelNode2).isLeafStage();
+    doReturn(true).when(pRelNode3).isLeafStage();
+    // Add nodes in order (top to bottom of tree).
+    Deque<PRelNode> parents = new ArrayDeque<>();
+    parents.addLast(pRelNode1);
+    parents.addLast(pRelNode2);
+    parents.addLast(pRelNode3);
+    PRelNode leafStageRoot = 
LeafStageWorkerAssignmentRule.extractCurrentLeafStageParent(parents);
+    assertEquals(leafStageRoot, pRelNode2);
+  }
+
   private static void 
validateTableScanAssignment(TableScanWorkerAssignmentResult assignmentResult,
       Map<String, List<String>> instanceIdToSegmentsMap, String tableType) {
     Map<String, List<String>> actualInstanceIdToSegments = new HashMap<>();
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index d9ef04fa2a..7d09ed3427 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -603,5 +603,23 @@
         ]
       }
     ]
+  },
+  "physical_opt_broker_pruning": {
+    "queries": [
+      {
+        "description": "Broker pruning example (smoke test)",
+        "sql": "SET usePhysicalOptimizer=true; SET useBrokerPruning=true; 
EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' ORDER BY col2",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[ASC])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalProject(col2=[$1], col3=[$2])",
+          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      }
+    ]
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index a3767fdec5..74671dcf88 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -78,7 +78,7 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
         pinotQuery.setGroupByList(groupByList);
       }
       List<Expression> selectList = 
CalciteRexExpressionParser.convertAggregateList(groupByList, node.getAggCalls(),
-          node.getFilterArgs(), pinotQuery);
+          node.getFilterArgs(), pinotQuery.getSelectList());
       for (Expression expression : selectList) {
         applyTimestampIndex(expression, pinotQuery);
       }
@@ -131,7 +131,8 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
     if (visit(node.getInputs().get(0), context)) {
       PinotQuery pinotQuery = context.getPinotQuery();
       if (pinotQuery.getFilterExpression() == null) {
-        Expression expression = 
CalciteRexExpressionParser.toExpression(node.getCondition(), pinotQuery);
+        Expression expression = 
CalciteRexExpressionParser.toExpression(node.getCondition(),
+            pinotQuery.getSelectList());
         applyTimestampIndex(expression, pinotQuery);
         pinotQuery.setFilterExpression(expression);
       } else {
@@ -197,7 +198,8 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
   public Void visitProject(ProjectNode node, ServerPlanRequestContext context) 
{
     if (visit(node.getInputs().get(0), context)) {
       PinotQuery pinotQuery = context.getPinotQuery();
-      List<Expression> selectList = 
CalciteRexExpressionParser.convertRexNodes(node.getProjects(), pinotQuery);
+      List<Expression> selectList = 
CalciteRexExpressionParser.convertRexNodes(node.getProjects(),
+          pinotQuery.getSelectList());
       for (Expression expression : selectList) {
         applyTimestampIndex(expression, pinotQuery);
       }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5c5f294646..4d7f880d80 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -634,6 +634,11 @@ public class CommonConstants {
         // name, when that segment has multiple partitions in its 
columnPartitionMap.
         public static final String INFER_INVALID_SEGMENT_PARTITION = 
"inferInvalidSegmentPartition";
         public static final String USE_LITE_MODE = "useLiteMode";
+        // Used by the MSE Engine to determine whether to use the broker 
pruning logic. Only supported by the
+        // new MSE query optimizer.
+        // TODO(mse-physical): Consider removing this query option and making 
this the default, since there's already
+        //   a table config to enable broker pruning (it is disabled by 
default).
+        public static final String USE_BROKER_PRUNING = "useBrokerPruning";
       }
 
       public static class QueryOptionValue {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to