This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0310a354e5c [Planner] Support disabling rule by default and enabling 
it with a queryOption (#16238)
0310a354e5c is described below

commit 0310a354e5ceaa85f14322aa8c10944fdc21f742
Author: Song Fu <[email protected]>
AuthorDate: Mon Jul 7 14:16:43 2025 -0700

    [Planner] Support disabling rule by default and enabling it with a 
queryOption (#16238)
---
 .../common/utils/config/QueryOptionsUtils.java     |  18 +-
 .../calcite/rel/rules/PinotQueryRuleSets.java      |  14 +-
 .../org/apache/pinot/query/QueryEnvironment.java   |  62 +++----
 .../pinot/query/QueryPlannerRuleOptionsTest.java   | 182 +++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  19 +++
 5 files changed, 256 insertions(+), 39 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 63523f0af4b..cfca8fc37b9 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -187,12 +187,11 @@ public class QueryOptionsUtils {
     return skipIndexes;
   }
 
-  @Nullable
   public static Set<String> getSkipPlannerRules(Map<String, String> 
queryOptions) {
-    // Example config:  
skipPlannerRules='FilterIntoJoinRule,FilterAggregateTransposeRule'
+    // Example config:  
skipPlannerRules='FilterIntoJoin,FilterAggregateTranspose'
     String skipIndexesStr = 
queryOptions.get(QueryOptionKey.SKIP_PLANNER_RULES);
     if (skipIndexesStr == null) {
-      return null;
+      return Set.of();
     }
 
     String[] skippedRules = StringUtils.split(skipIndexesStr, ',');
@@ -200,6 +199,18 @@ public class QueryOptionsUtils {
     return new HashSet<>(List.of(skippedRules));
   }
 
+  public static Set<String> getUsePlannerRules(Map<String, String> 
queryOptions) {
+    // Example config:  usePlannerRules='SortJoinTranspose, 
AggregateJoinTransposeExtended'
+    String usedIndexesStr = queryOptions.get(QueryOptionKey.USE_PLANNER_RULES);
+    if (usedIndexesStr == null) {
+      return Set.of();
+    }
+
+    String[] usedRules = StringUtils.split(usedIndexesStr, ',');
+
+    return new HashSet<>(List.of(usedRules));
+  }
+
   @Nullable
   public static Boolean isUseFixedReplica(Map<String, String> queryOptions) {
     String useFixedReplica = 
queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.USE_FIXED_REPLICA);
@@ -351,6 +362,7 @@ public class QueryOptionsUtils {
     String numGroupsWarningLimit = 
queryOptions.get(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT);
     return checkedParseIntPositive(QueryOptionKey.NUM_GROUPS_WARNING_LIMIT, 
numGroupsWarningLimit);
   }
+
   @Nullable
   public static Integer getMaxInitialResultHolderCapacity(Map<String, String> 
queryOptions) {
     String maxInitialResultHolderCapacity = 
queryOptions.get(QueryOptionKey.MAX_INITIAL_RESULT_HOLDER_CAPACITY);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index 7bc66a94db8..7607b0e0d42 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -39,6 +39,8 @@ import org.apache.calcite.rel.rules.ProjectToWindowRule;
 import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
 import org.apache.calcite.rel.rules.PruneEmptyRules;
 import org.apache.calcite.rel.rules.SemiJoinRule;
+import org.apache.calcite.rel.rules.SortJoinCopyRule;
+import org.apache.calcite.rel.rules.SortJoinTransposeRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.rules.UnionToDistinctRule;
 import 
org.apache.pinot.calcite.rel.rules.PinotFilterJoinRule.PinotFilterIntoJoinRule;
@@ -48,6 +50,8 @@ import 
org.apache.pinot.spi.utils.CommonConstants.Broker.PlannerRuleNames;
 
 /**
  * Default rule sets for Pinot query
+ * Defaultly disabled rules are defined in
+ * {@link 
org.apache.pinot.spi.utils.CommonConstants.Broker#DEFAULT_DISABLED_RULES}
  */
 public class PinotQueryRuleSets {
   private PinotQueryRuleSets() {
@@ -98,7 +102,12 @@ public class PinotQueryRuleSets {
           .instanceWithDescription(PlannerRuleNames.EVALUATE_LITERAL_FILTER),
 
       // sort join rules
-      // TODO: evaluate the SORT_JOIN_TRANSPOSE and SORT_JOIN_COPY rules
+      // push sort through join for left/right outer join only, disabled by 
default
+      SortJoinTransposeRule.Config.DEFAULT
+          .withDescription(PlannerRuleNames.SORT_JOIN_TRANSPOSE).toRule(),
+      // copy sort below join without offset and limit, disabled by default
+      SortJoinCopyRule.Config.DEFAULT
+          .withDescription(PlannerRuleNames.SORT_JOIN_COPY).toRule(),
 
       // join rules
       JoinPushExpressionsRule.Config.DEFAULT
@@ -125,6 +134,9 @@ public class PinotQueryRuleSets {
       // push aggregate through join
       AggregateJoinTransposeRule.Config.DEFAULT
           .withDescription(PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE).toRule(),
+      // push aggregate functions through join, disabled by default
+      AggregateJoinTransposeRule.Config.EXTENDED
+          
.withDescription(PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE_EXTENDED).toRule(),
       // aggregate union rule
       AggregateUnionAggregateRule.Config.DEFAULT
           
.withDescription(PlannerRuleNames.AGGREGATE_UNION_AGGREGATE).toRule(),
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 05be6cd8a9e..2d30a9cdf38 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -54,7 +54,6 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.RelBuilder;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule;
@@ -151,8 +150,8 @@ public class QueryEnvironment {
         
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
     _catalogReader = new PinotCatalogReader(
         rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG, 
config.isCaseSensitive());
-    // default optProgram with no skip rule options
-    _optProgram = getOptProgram(null);
+    // default optProgram with no skip rule options and no use rule options
+    _optProgram = getOptProgram(Set.of(), Set.of());
   }
 
   public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
@@ -173,9 +172,10 @@ public class QueryEnvironment {
     HepProgram optProgram = _optProgram;
     if (MapUtils.isNotEmpty(options)) {
       Set<String> skipRuleSet = QueryOptionsUtils.getSkipPlannerRules(options);
-      if (CollectionUtils.isNotEmpty(skipRuleSet)) {
+      Set<String> useRuleSet = QueryOptionsUtils.getUsePlannerRules(options);
+      if (!skipRuleSet.isEmpty() || !useRuleSet.isEmpty()) {
         // dynamically create optProgram according to rule options
-        optProgram = getOptProgram(skipRuleSet);
+        optProgram = getOptProgram(skipRuleSet, useRuleSet);
       }
     }
     boolean usePhysicalOptimizer = 
QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions(),
@@ -497,7 +497,7 @@ public class QueryEnvironment {
    * @param skipRuleSet parsed skipped rule name set from query options
    * @return HepProgram that performs logical transformations
    */
-  private static HepProgram getOptProgram(@Nullable Set<String> skipRuleSet) {
+  private static HepProgram getOptProgram(Set<String> skipRuleSet, Set<String> 
useRuleSet) {
     HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
     // Set the match order as DEPTH_FIRST. The default is arbitrary which 
works the same as DEPTH_FIRST, but it's
     // best to be explicit.
@@ -506,27 +506,17 @@ public class QueryEnvironment {
     // ----
     // Rules are disabled if its corresponding value is set to false in 
ruleFlags
     // construct filtered BASIC_RULES, FILTER_PUSHDOWN_RULES, 
PROJECT_PUSHDOWN_RULES, PRUNE_RULES
-    List<RelOptRule> basicRules;
-    List<RelOptRule> filterPushdownRules;
-    List<RelOptRule> projectPushdownRules;
-    List<RelOptRule> pruneRules;
-    if (skipRuleSet == null) {
-      basicRules = PinotQueryRuleSets.BASIC_RULES;
-      filterPushdownRules = PinotQueryRuleSets.FILTER_PUSHDOWN_RULES;
-      projectPushdownRules = PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES;
-      pruneRules = PinotQueryRuleSets.PRUNE_RULES;
-    } else {
-      basicRules = filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet);
-      filterPushdownRules = 
filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet);
-      projectPushdownRules = 
filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet);
-      pruneRules = filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet);
-    }
-
+    List<RelOptRule> basicRules = 
filterRuleList(PinotQueryRuleSets.BASIC_RULES, skipRuleSet, useRuleSet);
+    List<RelOptRule> filterPushdownRules =
+        filterRuleList(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES, skipRuleSet, 
useRuleSet);
+    List<RelOptRule> projectPushdownRules =
+        filterRuleList(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES, skipRuleSet, 
useRuleSet);
+    List<RelOptRule> pruneRules = 
filterRuleList(PinotQueryRuleSets.PRUNE_RULES, skipRuleSet, useRuleSet);
 
     // Run the Calcite CORE rules using 1 HepInstruction per rule. We use 1 
HepInstruction per rule for simplicity:
     // the rules used here can rest assured that they are the only ones 
evaluated in a dedicated graph-traversal.
     for (RelOptRule relOptRule : basicRules) {
-        hepProgramBuilder.addRuleInstance(relOptRule);
+      hepProgramBuilder.addRuleInstance(relOptRule);
     }
 
     // ----
@@ -546,11 +536,6 @@ public class QueryEnvironment {
     return hepProgramBuilder.build();
   }
 
-  // util func to check no rules are skipped
-  private static boolean noRulesSkipped(Set<String> set) {
-    return set.isEmpty();
-  }
-
   /**
    * Filter static RuleSet according to query options
    * The filtering is done via checking query option with
@@ -560,11 +545,12 @@ public class QueryEnvironment {
    * @param skipRuleSet skip rule set from options
    * @return filtered list of rules
    */
-  private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, 
Set<String> skipRuleSet) {
+  private static List<RelOptRule> filterRuleList(List<RelOptRule> rules, 
Set<String> skipRuleSet,
+      Set<String> useRuleSet) {
     List<RelOptRule> filteredRules = new ArrayList<>();
     for (RelOptRule relOptRule : rules) {
       String ruleName = relOptRule.toString();
-      if (isRuleSkipped(ruleName, skipRuleSet)) {
+      if (isRuleSkipped(ruleName, skipRuleSet, useRuleSet)) {
         continue;
       }
       filteredRules.add(relOptRule);
@@ -573,14 +559,21 @@ public class QueryEnvironment {
   }
 
   /**
-   * Whether a rule is skipped, rules not skipped by default
+   * Returns whether a rule is skipped.
+   * A rule is disabled if it is in both skipRuleSet and useRuleSet
+   *
    * @param ruleName description of the rule
    * @param skipRuleSet query skipSet
    * @return false if corresponding key is not in skipMap or the value is 
"false", else true
    */
-  private static boolean isRuleSkipped(String ruleName, Set<String> 
skipRuleSet) {
-    // can put rule-specific default behavior here
-    return skipRuleSet.contains(ruleName);
+  private static boolean isRuleSkipped(String ruleName, Set<String> 
skipRuleSet, Set<String> useRuleSet) {
+    if (skipRuleSet.contains(ruleName)) {
+      return true;
+    }
+    if (CommonConstants.Broker.DEFAULT_DISABLED_RULES.contains(ruleName)) {
+      return !useRuleSet.contains(ruleName);
+    }
+    return false;
   }
 
   private static HepProgram getTraitProgram(@Nullable WorkerManager 
workerManager, Config config,
@@ -687,7 +680,6 @@ public class QueryEnvironment {
       return 
CommonConstants.Broker.DEFAULT_USE_LEAF_SERVER_FOR_INTERMEDIATE_STAGE;
     }
 
-
     @Value.Default
     default boolean defaultEnableGroupTrim() {
       return CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM;
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
index 2369cb84ee7..5b0c00a0816 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
@@ -50,6 +50,22 @@ public class QueryPlannerRuleOptionsTest extends 
QueryEnvironmentTestBase {
         .getExplainPlan();
   }
 
+  private String explainQueryWithRuleEnabled(String query, String 
ruleToEnable) {
+    SqlNode sqlNode = 
CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
+    Map<String, String> options = new HashMap<>();
+    // disable rule
+    
options.put(CommonConstants.Broker.Request.QueryOptionKey.USE_PLANNER_RULES, 
ruleToEnable);
+    SqlNodeAndOptions sqlNodeAndOptions =
+        new SqlNodeAndOptions(
+            sqlNode,
+            PinotSqlType.DQL,
+            QueryOptionsUtils.resolveCaseInsensitiveOptions(options));
+    return _queryEnvironment
+        .compile(query, sqlNodeAndOptions)
+        .explain(RANDOM_REQUEST_ID_GEN.nextLong(), null)
+        .getExplainPlan();
+  }
+
   @Test
   public void testDisableCaseToFilter() {
     // Tests that when skipAggregateCaseToFilterRule=true,
@@ -288,4 +304,170 @@ public class QueryPlannerRuleOptionsTest extends 
QueryEnvironmentTestBase {
             + "        PinotLogicalTableScan(table=[[default, b]])\n");
     //@formatter:on
   }
+
+  @Test
+  public void testAggregateJoinTransposeExtendedDisabledByDefault() {
+    // test aggregate function pushdown is disabled by default
+    String query = "EXPLAIN PLAN FOR \n"
+        + "SELECT SUM(b.col2)\n"
+        + "FROM a INNER JOIN b\n"
+        + "ON a.col1 = b.col1\n"
+        + "GROUP BY a.col1, b.col1\n";
+
+    String explain = _queryEnvironment.explainQuery(query, 
RANDOM_REQUEST_ID_GEN.nextLong());
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[$2])\n"
+            + "  PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], 
aggType=[FINAL])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0, 1]])\n"
+            + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[$SUM0($2)], 
aggType=[LEAF])\n"
+            + "        LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
+            + "          PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "            LogicalProject(col1=[$0])\n"
+            + "              PinotLogicalTableScan(table=[[default, a]])\n"
+            + "          PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "            LogicalProject(col1=[$0], 
$f2=[CAST($1):DECIMAL(2000, 1000) NOT NULL])\n"
+            + "              PinotLogicalTableScan(table=[[default, b]])\n");
+    //@formatter:on
+  }
+
+  @Test
+  public void testEnableAggregateJoinTransposeExtended() {
+    // test aggregate function pushdown is disabled by default
+    String query = "EXPLAIN PLAN FOR \n"
+        + "SELECT SUM(b.col2)\n"
+        + "FROM a INNER JOIN b\n"
+        + "ON a.col1 = b.col1\n"
+        + "GROUP BY a.col1, b.col1\n";
+
+    String explain = explainQueryWithRuleEnabled(query, 
PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE_EXTENDED);
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[CAST(*($1, $3)):DECIMAL(2000, 1000) NOT 
NULL])\n"
+            + "  LogicalJoin(condition=[=($0, $2)], joinType=[inner])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "      PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])\n"
+            + "        PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "          PinotLogicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[LEAF])\n"
+            + "            PinotLogicalTableScan(table=[[default, a]])\n"
+            + "    PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "      PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[FINAL])\n"
+            + "        PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "          PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[LEAF])\n"
+            + "            LogicalProject(col1=[$0], 
$f2=[CAST($1):DECIMAL(2000, 1000) NOT NULL])\n"
+            + "              PinotLogicalTableScan(table=[[default, b]])\n");
+    //@formatter:on
+  }
+
+  @Test
+  public void testSortJoinTransposeDisabledByDefault() {
+    // test aggregate function pushdown is disabled by default
+    String query = "EXPLAIN PLAN FOR \n"
+        + "SELECT a.col1, b.col1\n"
+        + "FROM a LEFT JOIN b\n"
+        + "ON a.col1 = b.col1\n"
+        + "ORDER BY a.col1\n";
+
+    String explain = _queryEnvironment.explainQuery(query, 
RANDOM_REQUEST_ID_GEN.nextLong());
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+            + "LogicalSort(sort0=[$0], dir0=[ASC])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], "
+            + "isSortOnSender=[false], isSortOnReceiver=[true])\n"
+            + "    LogicalJoin(condition=[=($0, $1)], joinType=[left])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalProject(col1=[$0])\n"
+            + "          PinotLogicalTableScan(table=[[default, a]])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalProject(col1=[$0])\n"
+            + "          PinotLogicalTableScan(table=[[default, b]])\n");
+    //@formatter:on
+  }
+
+  @Test
+  public void testEnableSortJoinTranspose() {
+    // test aggregate function pushdown is disabled by default
+    String query = "EXPLAIN PLAN FOR \n"
+        + "SELECT a.col1, b.col1\n"
+        + "FROM a LEFT JOIN b\n"
+        + "ON a.col1 = b.col1\n"
+        + "ORDER BY a.col1\n";
+
+    String explain = explainQueryWithRuleEnabled(query, 
PlannerRuleNames.SORT_JOIN_TRANSPOSE);
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+            + "LogicalSort(sort0=[$0], dir0=[ASC])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], "
+            + "isSortOnSender=[false], isSortOnReceiver=[true])\n"
+            + "    LogicalJoin(condition=[=($0, $1)], joinType=[left])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalSort(sort0=[$0], dir0=[ASC])\n"
+            + "          PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], "
+            + "isSortOnSender=[false], isSortOnReceiver=[true])\n"
+            + "            LogicalProject(col1=[$0])\n"
+            + "              PinotLogicalTableScan(table=[[default, a]])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalProject(col1=[$0])\n"
+            + "          PinotLogicalTableScan(table=[[default, b]])\n");
+    //@formatter:on
+  }
+
+  @Test
+  public void testSortJoinCopyDisabledByDefault() {
+    // test aggregate function pushdown is disabled by default
+    String query = "EXPLAIN PLAN FOR \n"
+        + "SELECT a.col1, b.col1\n"
+        + "FROM a INNER JOIN b\n"
+        + "ON a.col1 = b.col1\n"
+        + "ORDER BY a.col1\n";
+
+    String explain = _queryEnvironment.explainQuery(query, 
RANDOM_REQUEST_ID_GEN.nextLong());
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+            + "LogicalSort(sort0=[$0], dir0=[ASC])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], "
+            + "isSortOnSender=[false], isSortOnReceiver=[true])\n"
+            + "    LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalProject(col1=[$0])\n"
+            + "          PinotLogicalTableScan(table=[[default, a]])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalProject(col1=[$0])\n"
+            + "          PinotLogicalTableScan(table=[[default, b]])\n");
+    //@formatter:on
+  }
+
+  @Test
+  public void testEnableSortJoinCopy() {
+    // test aggregate function pushdown is disabled by default
+    String query = "EXPLAIN PLAN FOR \n"
+        + "SELECT a.col1, b.col1\n"
+        + "FROM a INNER JOIN b\n"
+        + "ON a.col1 = b.col1\n"
+        + "ORDER BY a.col1\n";
+
+    String explain = explainQueryWithRuleEnabled(query, 
PlannerRuleNames.SORT_JOIN_COPY);
+    //@formatter:off
+    assertEquals(explain,
+        "Execution Plan\n"
+            + "LogicalSort(sort0=[$0], dir0=[ASC])\n"
+            + "  PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], "
+            + "isSortOnSender=[false], isSortOnReceiver=[true])\n"
+            + "    LogicalJoin(condition=[=($0, $1)], joinType=[inner])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalSort(sort0=[$0], dir0=[ASC])\n"
+            + "          PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], "
+            + "isSortOnSender=[false], isSortOnReceiver=[true])\n"
+            + "            LogicalProject(col1=[$0])\n"
+            + "              PinotLogicalTableScan(table=[[default, a]])\n"
+            + "      PinotLogicalExchange(distribution=[hash[0]])\n"
+            + "        LogicalProject(col1=[$0])\n"
+            + "          PinotLogicalTableScan(table=[[default, b]])\n");
+    //@formatter:on
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index d6de4b9ba22..e21468602cc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -635,6 +635,9 @@ public class CommonConstants {
         // Query option key used to skip a given set of rules
         public static final String SKIP_PLANNER_RULES = "skipPlannerRules";
 
+        // Query option key used to enable a given set of defaultly disabled 
rules
+        public static final String USE_PLANNER_RULES = "usePlannerRules";
+
         public static final String ORDER_BY_ALGORITHM = "orderByAlgorithm";
 
         public static final String MULTI_STAGE_LEAF_LIMIT = 
"multiStageLeafLimit";
@@ -782,6 +785,8 @@ public class CommonConstants {
       public static final String AGGREGATE_PROJECT_MERGE = 
"AggregateProjectMerge";
       public static final String FILTER_MERGE = "FilterMerge";
       public static final String SORT_REMOVE = "SortRemove";
+      public static final String SORT_JOIN_TRANSPOSE = "SortJoinTranspose";
+      public static final String SORT_JOIN_COPY = "SortJoinCopy";
       public static final String AGGREGATE_JOIN_TRANSPOSE_EXTENDED = 
"AggregateJoinTransposeExtended";
       public static final String PRUNE_EMPTY_AGGREGATE = "PruneEmptyAggregate";
       public static final String PRUNE_EMPTY_FILTER = "PruneEmptyFilter";
@@ -794,6 +799,20 @@ public class CommonConstants {
       public static final String PRUNE_EMPTY_JOIN_RIGHT = 
"PruneEmptyJoinRight";
     }
 
+    /**
+     * Set of planner rules that will be disabled by default
+     * and could be enabled by setting
+     * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_PLANNER_RULES}.
+     *
+     * If a rule is enabled and disabled at the same time,
+     * it will be disabled
+     */
+    public static final Set<String> DEFAULT_DISABLED_RULES = Set.of(
+        PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE_EXTENDED,
+        PlannerRuleNames.SORT_JOIN_TRANSPOSE,
+        PlannerRuleNames.SORT_JOIN_COPY
+    );
+
     public static class FailureDetector {
       public enum Type {
         // Do not detect any failure


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to