This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d1817efff7 [multistage][bugfix] improve sort copy rule (#12237)
d1817efff7 is described below

commit d1817efff7998e736d8059b8343c84b1066bf2d8
Author: Rong Rong <[email protected]>
AuthorDate: Mon Jan 8 16:14:11 2024 -0800

    [multistage][bugfix] improve sort copy rule (#12237)
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../tests/OfflineClusterIntegrationTest.java       |  13 +-
 .../rel/rules/PinotSortExchangeCopyRule.java       |   5 +
 .../rel/rules/PinotSortExchangeCopyRuleTest.java   |  12 +-
 .../src/test/resources/queries/JoinPlans.json      |  38 +-
 .../src/test/resources/queries/OrderByPlans.json   |  69 +--
 .../test/resources/queries/PinotHintablePlans.json |  78 ++--
 .../resources/queries/WindowFunctionPlans.json     | 472 ++++++++++-----------
 7 files changed, 323 insertions(+), 364 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 86bafbd2f0..edcf831265 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2882,15 +2882,14 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     assertEquals(response1, 
"{\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\","
         + "\"STRING\"]},\"rows\":[[\"EXPLAIN PLAN FOR SELECT count(*) AS 
count, Carrier AS name FROM mytable "
         + "GROUP BY name ORDER BY 1\",\"Execution Plan\\n"
-        + "LogicalSort(sort0=[$0], dir0=[ASC], offset=[0])\\n"
+        + "LogicalSort(sort0=[$0], dir0=[ASC])\\n"
         + "  PinotLogicalSortExchange("
         + "distribution=[hash], collation=[[0]], isSortOnSender=[false], 
isSortOnReceiver=[true])\\n"
-        + "    LogicalSort(sort0=[$0], dir0=[ASC])\\n"
-        + "      LogicalProject(count=[$1], name=[$0])\\n"
-        + "        LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])\\n"
-        + "          PinotLogicalExchange(distribution=[hash[0]])\\n"
-        + "            LogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n"
-        + "              LogicalTableScan(table=[[mytable]])\\n"
+        + "    LogicalProject(count=[$1], name=[$0])\\n"
+        + "      LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])\\n"
+        + "        PinotLogicalExchange(distribution=[hash[0]])\\n"
+        + "          LogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n"
+        + "            LogicalTableScan(table=[[mytable]])\\n"
         + "\"]]}");
 
     // In the query below, FlightNum column has an inverted index and there is 
no data satisfying the predicate
diff --git 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
index 4f93f78efd..891db2d1ec 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRule.java
@@ -42,6 +42,7 @@ public class PinotSortExchangeCopyRule extends 
RelRule<RelRule.Config> {
 
   public static final PinotSortExchangeCopyRule SORT_EXCHANGE_COPY =
       PinotSortExchangeCopyRule.Config.DEFAULT.toRule();
+  private static final int DEFAULT_SORT_EXCHANGE_COPY_THRESHOLD = 10_000;
   private static final TypeFactory TYPE_FACTORY = new TypeFactory(new 
TypeSystem());
   private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
   private static final RexLiteral REX_ZERO = REX_BUILDER.makeLiteral(0,
@@ -87,6 +88,10 @@ public class PinotSortExchangeCopyRule extends 
RelRule<RelRule.Config> {
       int total = RexExpressionUtils.getValueAsInt(sort.fetch) + 
RexExpressionUtils.getValueAsInt(sort.offset);
       fetch = REX_BUILDER.makeLiteral(total, 
TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
     }
+    // do not transform sort-exchange copy when there's no fetch limit, or 
fetch amount is larger than threshold
+    if (fetch == null || RexExpressionUtils.getValueAsInt(fetch) > 
DEFAULT_SORT_EXCHANGE_COPY_THRESHOLD) {
+      return;
+    }
 
     final RelNode newExchangeInput = sort.copy(sort.getTraitSet(), 
exchange.getInput(), collation, null, fetch);
     final RelNode exchangeCopy = exchange.copy(exchange.getTraitSet(), 
newExchangeInput, exchange.getDistribution());
diff --git 
a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
 
b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
index 0b9501c7cc..241b461605 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java
@@ -231,17 +231,7 @@ public class PinotSortExchangeCopyRuleTest {
 
     // Then:
     ArgumentCaptor<RelNode> sortCopyCapture = 
ArgumentCaptor.forClass(LogicalSort.class);
-    Mockito.verify(_call, 
Mockito.times(1)).transformTo(sortCopyCapture.capture());
-
-    RelNode sortCopy = sortCopyCapture.getValue();
-    Assert.assertTrue(sortCopy instanceof LogicalSort);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput() instanceof 
PinotLogicalSortExchange);
-    Assert.assertTrue(((LogicalSort) sortCopy).getInput().getInput(0) 
instanceof LogicalSort);
-
-    LogicalSort innerSort = (LogicalSort) ((LogicalSort) 
sortCopy).getInput().getInput(0);
-    Assert.assertEquals(innerSort.getCollation(), collation);
-    Assert.assertNull((innerSort).offset);
-    Assert.assertNull((innerSort).fetch);
+    Mockito.verify(_call, 
Mockito.never()).transformTo(sortCopyCapture.capture());
   }
 
   @Test
diff --git a/pinot-query-planner/src/test/resources/queries/JoinPlans.json 
b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
index d9eb08b2ec..26cfc6bea3 100644
--- a/pinot-query-planner/src/test/resources/queries/JoinPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/JoinPlans.json
@@ -6,17 +6,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON 
a.col1 = b.col2 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
-          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col1=[$0], ts=[$6])",
-          "\n              LogicalTableScan(table=[[a]])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[b]])",
+          "\n    LogicalProject(col1=[$0], ts=[$1], col3=[$3])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], ts=[$6])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -25,17 +24,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, a.ts AS ts1, b.col3 
FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
-          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col1=[$0], ts=[$6])",
-          "\n              LogicalTableScan(table=[[a]])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[b]])",
+          "\n    LogicalProject(value1=[$0], ts1=[$1], col3=[$3])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], ts=[$6])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
diff --git a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json 
b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
index 6c53556c6e..153f587feb 100644
--- a/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/OrderByPlans.json
@@ -6,11 +6,10 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0])",
-          "\n        LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0])",
+          "\n      LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -19,11 +18,10 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1 FROM a ORDER BY 
a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(value1=[$0])",
-          "\n        LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(value1=[$0])",
+          "\n      LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -51,18 +49,28 @@
           "\n"
         ]
       },
+      {
+        "description": "Select * order by on 2 columns with super large limit",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM b ORDER BY col1, col2 DESC 
LIMIT 10000000",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC], 
fetch=[10000000])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalTableScan(table=[[b]])",
+          "\n"
+        ]
+      },
       {
         "description": "Order by and group by",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a GROUP BY 
a.col1 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
-          "\n        PinotLogicalExchange(distribution=[hash[0]])",
-          "\n          LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
-          "\n            LogicalTableScan(table=[[a]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
+          "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n          LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -71,13 +79,12 @@
         "sql": "EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1, SUM(a.col3) FROM a 
GROUP BY a.col1 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n        PinotLogicalExchange(distribution=[hash[0]])",
-          "\n          LogicalProject(col1=[$0], col3=[$2])",
-          "\n            LogicalTableScan(table=[[a]])",
+          "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
+          "\n        LogicalProject(col1=[$0], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -86,13 +93,12 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, SUM(a.col3) AS sum 
FROM a GROUP BY a.col1 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
-          "\n        PinotLogicalExchange(distribution=[hash[0]])",
-          "\n          LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
-          "\n            LogicalTableScan(table=[[a]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
+          "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n          LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -101,13 +107,12 @@
         "sql": "EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_skip_leaf_stage_group_by='true') */ a.col1 AS value1, SUM(a.col3) 
AS sum FROM a GROUP BY a.col1 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
-          "\n        PinotLogicalExchange(distribution=[hash[0]])",
-          "\n          LogicalProject(col1=[$0], col3=[$2])",
-          "\n            LogicalTableScan(table=[[a]])",
+          "\n    LogicalAggregate(group=[{0}], sum=[$SUM0($1)])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
+          "\n        LogicalProject(col1=[$0], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
           "\n"
         ]
       }
diff --git 
a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json 
b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index 7e9ff074d1..a16765e558 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -389,19 +389,18 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) FROM a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1 ORDER BY 2 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
-          "\n      LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
-          "\n        PinotLogicalExchange(distribution=[hash[0]])",
-          "\n          LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
-          "\n            LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
-          "\n              LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n                LogicalTableScan(table=[[a]])",
-          "\n              PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
-          "\n                LogicalProject(col1=[$0])",
-          "\n                  LogicalFilter(condition=[>($2, 0)])",
-          "\n                    LogicalTableScan(table=[[b]])",
+          "\n    LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
+          "\n        LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)])",
+          "\n          LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n            PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
+          "\n              LogicalProject(col1=[$0])",
+          "\n                LogicalFilter(condition=[>($2, 0)])",
+          "\n                  LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -460,17 +459,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_partitioned_by_group_by_keys='true') */ a.col2, SUM(a.col3) FROM 
a /*+ tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ WHERE b.col3 > 0) GROUP BY 1 ORDER BY 2 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
-          "\n      LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
-          "\n        LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n          LogicalProject(col2=[$1], col3=[$2])",
-          "\n            LogicalTableScan(table=[[a]])",
-          "\n          PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
-          "\n            LogicalProject(col1=[$0])",
-          "\n              LogicalFilter(condition=[>($2, 0)])",
-          "\n                LogicalTableScan(table=[[b]])",
+          "\n    LogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
+          "\n      LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n        LogicalProject(col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -479,16 +477,15 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2 FROM a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ WHERE b.col3 > 0) ORDER BY 1 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[DESC])",
-          "\n      LogicalJoin(condition=[=($0, $1)], joinType=[semi])",
-          "\n        LogicalProject(col2=[$1])",
-          "\n          LogicalTableScan(table=[[a]])",
-          "\n        PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
-          "\n          LogicalProject(col1=[$0])",
-          "\n            LogicalFilter(condition=[>($2, 0)])",
-          "\n              LogicalTableScan(table=[[b]])",
+          "\n    LogicalJoin(condition=[=($0, $1)], joinType=[semi])",
+          "\n      LogicalProject(col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n      PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
+          "\n        LogicalProject(col1=[$0])",
+          "\n          LogicalFilter(condition=[>($2, 0)])",
+          "\n            LogicalTableScan(table=[[b]])",
           "\n"
         ]
       },
@@ -497,17 +494,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col3 FROM a /*+ 
tableOptions(partition_function='hashcode', partition_key='col2', 
partition_size='4') */ WHERE a.col2 IN (SELECT col1 FROM b /*+ 
tableOptions(partition_function='hashcode', partition_key='col1', 
partition_size='4') */ WHERE b.col3 > 0) ORDER BY 2 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
-          "\n      LogicalProject(col1=[$0], col3=[$2])",
-          "\n        LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
-          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n            LogicalTableScan(table=[[a]])",
-          "\n          PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
-          "\n            LogicalProject(col1=[$0])",
-          "\n              LogicalFilter(condition=[>($2, 0)])",
-          "\n                LogicalTableScan(table=[[b]])",
+          "\n    LogicalProject(col1=[$0], col3=[$2])",
+          "\n      LogicalJoin(condition=[=($1, $3)], joinType=[semi])",
+          "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n        PinotLogicalExchange(distribution=[broadcast], 
relExchangeType=[PIPELINE_BREAKER])",
+          "\n          LogicalProject(col1=[$0])",
+          "\n            LogicalFilter(condition=[>($2, 0)])",
+          "\n              LogicalTableScan(table=[[b]])",
           "\n"
         ]
       }
diff --git 
a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json 
b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index 076868fedb..3b621c386b 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -112,14 +112,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a 
ORDER BY a.col2",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(aggs [SUM($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(aggs [SUM($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -128,14 +127,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER() FROM a 
ORDER BY a.col2",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          PinotLogicalExchange(distribution=[hash])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -291,17 +289,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3), AVG(a.col3) OVER() FROM a 
GROUP BY a.col3 ORDER BY a.col3",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], col3=[$0])",
-          "\n        LogicalWindow(window#0=[window(aggs [SUM($0), 
COUNT($0)])])",
-          "\n          PinotLogicalExchange(distribution=[hash])",
-          "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE 
NOT NULL, $2)])",
-          "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
-          "\n                PinotLogicalExchange(distribution=[hash[0]])",
-          "\n                  LogicalAggregate(group=[{2}], 
agg#0=[$SUM0($2)], agg#1=[COUNT()])",
-          "\n                    LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], col3=[$0])",
+          "\n      LogicalWindow(window#0=[window(aggs [SUM($0), 
COUNT($0)])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT 
NULL, $2)])",
+          "\n            LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
+          "\n                LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], 
agg#1=[COUNT()])",
+          "\n                  LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -310,17 +307,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3), ROW_NUMBER() OVER() FROM 
a GROUP BY a.col3 ORDER BY a.col3",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[$2], col3=[$0])",
-          "\n        LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          PinotLogicalExchange(distribution=[hash])",
-          "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE 
NOT NULL, $2)])",
-          "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
-          "\n                PinotLogicalExchange(distribution=[hash[0]])",
-          "\n                  LogicalAggregate(group=[{2}], 
agg#0=[$SUM0($2)], agg#1=[COUNT()])",
-          "\n                    LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(EXPR$0=[$1], EXPR$1=[$2], col3=[$0])",
+          "\n      LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT 
NULL, $2)])",
+          "\n            LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
+          "\n                LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], 
agg#1=[COUNT()])",
+          "\n                  LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -383,14 +379,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(), 
COUNT(a.col2) OVER() FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
-          "\n        LogicalWindow(window#0=[window(aggs [SUM($2), 
COUNT($1)])])",
-          "\n          PinotLogicalExchange(distribution=[hash])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
+          "\n      LogicalWindow(window#0=[window(aggs [SUM($2), 
COUNT($1)])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -548,17 +543,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3), AVG(a.col3) OVER(), 
SUM(a.col3) OVER() FROM a GROUP BY a.col3 ORDER BY a.col3",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$3], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
-          "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], EXPR$2=[$2], col3=[$0])",
-          "\n        LogicalWindow(window#0=[window(aggs [SUM($0), 
COUNT($0)])])",
-          "\n          PinotLogicalExchange(distribution=[hash])",
-          "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE 
NOT NULL, $2)])",
-          "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
-          "\n                PinotLogicalExchange(distribution=[hash[0]])",
-          "\n                  LogicalAggregate(group=[{2}], 
agg#0=[$SUM0($2)], agg#1=[COUNT()])",
-          "\n                    LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], EXPR$2=[$2], col3=[$0])",
+          "\n      LogicalWindow(window#0=[window(aggs [SUM($0), 
COUNT($0)])])",
+          "\n        PinotLogicalExchange(distribution=[hash])",
+          "\n          LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT 
NULL, $2)])",
+          "\n            LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
+          "\n                LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], 
agg#1=[COUNT()])",
+          "\n                  LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -661,14 +655,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY 
a.col1) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} aggs 
[MIN($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} aggs 
[MIN($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -881,17 +874,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3), AVG(a.col3) 
OVER(PARTITION BY a.col3) FROM a GROUP BY a.col3 ORDER BY a.col3",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], col3=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} aggs 
[SUM($0), COUNT($0)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE 
NOT NULL, $2)])",
-          "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
-          "\n                PinotLogicalExchange(distribution=[hash[0]])",
-          "\n                  LogicalAggregate(group=[{2}], 
agg#0=[$SUM0($2)], agg#1=[COUNT()])",
-          "\n                    LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], col3=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), 
COUNT($0)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT 
NULL, $2)])",
+          "\n            LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
+          "\n                LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], 
agg#1=[COUNT()])",
+          "\n                  LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -967,14 +959,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1), MAX(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY 
a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), MAX($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), MAX($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1015,14 +1006,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1), MAX(a.col3) OVER(PARTITION BY a.col1, a.col2) FROM a ORDER BY 
a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), MAX($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$3], $2=[$4])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), MAX($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1031,14 +1021,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY 
a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1048,14 +1037,13 @@
         "notes": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], $1=[$2], $2=[$3])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER(), 
ROW_NUMBER()])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$2], $2=[$3])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER(), 
ROW_NUMBER()])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1064,14 +1052,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col1, a.col2) FROM a ORDER BY 
a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1080,14 +1067,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, a.col1) FROM a ORDER BY 
a.col2",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$3], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} aggs 
[SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1211,17 +1197,16 @@
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3), AVG(a.col3) 
OVER(PARTITION BY a.col3), MAX(a.col3) OVER(PARTITION BY a.col3) FROM a GROUP 
BY a.col3 ORDER BY a.col3",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$3], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], dir0=[ASC])",
-          "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], EXPR$2=[$4], col3=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} aggs 
[SUM($0), COUNT($0), MAX($0)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE 
NOT NULL, $2)])",
-          "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
-          "\n                PinotLogicalExchange(distribution=[hash[0]])",
-          "\n                  LogicalAggregate(group=[{2}], 
agg#0=[$SUM0($2)], agg#1=[COUNT()])",
-          "\n                    LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(EXPR$0=[$1], EXPR$1=[/(CAST($2):DOUBLE NOT 
NULL, $3)], EXPR$2=[$4], col3=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} aggs [SUM($0), 
COUNT($0), MAX($0)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE NOT 
NULL, $2)])",
+          "\n            LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[COUNT($2)])",
+          "\n              PinotLogicalExchange(distribution=[hash[0]])",
+          "\n                LogicalAggregate(group=[{2}], agg#0=[$SUM0($2)], 
agg#1=[COUNT()])",
+          "\n                  LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1309,14 +1294,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY 
a.col2) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], $1=[$2])",
-          "\n        LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$2])",
+          "\n      LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1325,14 +1309,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, RANK() OVER(ORDER BY a.col2) 
FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], $1=[$2])",
-          "\n        LogicalWindow(window#0=[window(order by [1] aggs 
[RANK()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$2])",
+          "\n      LogicalWindow(window#0=[window(order by [1] aggs 
[RANK()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1341,14 +1324,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY 
a.col2) FROM a ORDER BY a.col2",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1357,14 +1339,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, DENSE_RANK() OVER(ORDER BY 
a.col2) FROM a ORDER BY a.col2",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(order by [1] aggs 
[DENSE_RANK()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(order by [1] aggs 
[DENSE_RANK()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1373,14 +1354,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY 
a.col2) as row_number FROM a ORDER BY row_number DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
-          "\n      LogicalProject(col1=[$0], $1=[$2])",
-          "\n        LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$2])",
+          "\n      LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1389,14 +1369,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, RANK() OVER(ORDER BY a.col2) 
as rank FROM a ORDER BY rank DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
-          "\n      LogicalProject(col1=[$0], $1=[$2])",
-          "\n        LogicalWindow(window#0=[window(order by [1] aggs 
[RANK()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$2])",
+          "\n      LogicalWindow(window#0=[window(order by [1] aggs 
[RANK()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1405,14 +1384,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, DENSE_RANK() OVER(ORDER BY 
a.col2) as dense_rank FROM a ORDER BY dense_rank DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$1], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$1], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$1], dir0=[DESC])",
-          "\n      LogicalProject(col1=[$0], $1=[$2])",
-          "\n        LogicalWindow(window#0=[window(order by [1] aggs 
[DENSE_RANK()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], $1=[$2])",
+          "\n      LogicalWindow(window#0=[window(order by [1] aggs 
[DENSE_RANK()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1475,14 +1453,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(ORDER BY 
a.col1 DESC) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(order by [0 DESC] aggs 
[MIN($2)])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(order by [0 DESC] aggs 
[MIN($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1823,14 +1800,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(ORDER BY 
a.col2, a.col1 DESC), AVG(a.col3) OVER(ORDER BY a.col2, a.col1 DESC) FROM a 
ORDER BY a.col1 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[DESC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
-          "\n        LogicalWindow(window#0=[window(order by [1, 0 DESC] aggs 
[SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)])",
+          "\n      LogicalWindow(window#0=[window(order by [1, 0 DESC] aggs 
[SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -1839,14 +1815,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, RANK() OVER(ORDER BY a.col2, 
a.col1 DESC), AVG(a.col3) OVER(ORDER BY a.col2, a.col1 DESC) FROM a ORDER BY 
a.col1 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$0], dir0=[DESC], offset=[0])",
+          "\nLogicalSort(sort0=[$0], dir0=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$0], dir0=[DESC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
-          "\n        LogicalWindow(window#0=[window(order by [1, 0 DESC] aggs 
[RANK(), SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($4):DOUBLE NOT NULL, $5)])",
+          "\n      LogicalWindow(window#0=[window(order by [1, 0 DESC] aggs 
[RANK(), SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash], 
collation=[[1, 0 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2144,14 +2119,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY 
a.col1 ORDER BY a.col1) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} order by [0] 
aggs [MIN($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} order by [0] 
aggs [MIN($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2160,14 +2134,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, DENSE_RANK() OVER(PARTITION BY 
a.col1 ORDER BY a.col1) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} order by [0] 
aggs [DENSE_RANK()])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} order by [0] 
aggs [DENSE_RANK()])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2503,14 +2476,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1 ORDER BY a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, 
a.col1 ORDER BY a.col2, a.col1) FROM a ORDER BY a.col2, a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], 
offset=[0])",
+          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 
0]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} order by 
[1, 0] aggs [SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} order by 
[1, 0] aggs [SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2519,14 +2491,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, DENSE_RANK() OVER(PARTITION BY 
a.col2, a.col1 ORDER BY a.col2, a.col1), RANK() OVER(PARTITION BY a.col2, 
a.col1 ORDER BY a.col2, a.col1) FROM a ORDER BY a.col2, a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC], 
offset=[0])",
+          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 
0]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[ASC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$2], EXPR$2=[$3], 
col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} order by 
[1, 0] aggs [DENSE_RANK(), RANK()])])",
-          "\n          PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$2], EXPR$2=[$3], 
col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} order by 
[1, 0] aggs [DENSE_RANK(), RANK()])])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2847,14 +2818,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(PARTITION BY 
a.col1 ORDER BY a.col2) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} order by [1] 
aggs [MIN($2)])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$3], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} order by [1] 
aggs [MIN($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2863,14 +2833,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, ROW_NUMBER() OVER(PARTITION BY 
a.col1 ORDER BY a.col2) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} order by [1] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} order by [1] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -2879,14 +2848,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, RANK() OVER(PARTITION BY 
a.col1 ORDER BY a.col2) FROM a ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
-          "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
-          "\n        LogicalWindow(window#0=[window(partition {0} order by [1] 
aggs [RANK()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
+          "\n      LogicalWindow(window#0=[window(partition {0} order by [1] 
aggs [RANK()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -3170,14 +3138,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1 ORDER BY a.col3, a.col1), AVG(a.col3) OVER(PARTITION BY a.col2, 
a.col1 ORDER BY a.col3, a.col1) FROM a ORDER BY a.col2, a.col1 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], 
offset=[0])",
+          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} order by 
[2, 0] aggs [SUM($2), COUNT($2)])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash[0, 1]], 
collation=[[2, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], 
EXPR$2=[/(CAST($3):DOUBLE NOT NULL, $4)], col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} order by 
[2, 0] aggs [SUM($2), COUNT($2)])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash[0, 1]], 
collation=[[2, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },
@@ -3186,14 +3153,13 @@
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, RANK() OVER(PARTITION BY 
a.col2, a.col1 ORDER BY a.col3, a.col1), DENSE_RANK() OVER(PARTITION BY a.col2, 
a.col1 ORDER BY a.col3, a.col1) FROM a ORDER BY a.col2, a.col1 DESC",
         "output": [
           "Execution Plan",
-          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC], 
offset=[0])",
+          "\nLogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
           "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[3, 0 
DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n    LogicalSort(sort0=[$3], sort1=[$0], dir0=[ASC], dir1=[DESC])",
-          "\n      LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[$4], 
col2=[$1])",
-          "\n        LogicalWindow(window#0=[window(partition {0, 1} order by 
[2, 0] aggs [RANK(), DENSE_RANK()])])",
-          "\n          PinotLogicalSortExchange(distribution=[hash[0, 1]], 
collation=[[2, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
-          "\n            LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n              LogicalTableScan(table=[[a]])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$3], EXPR$2=[$4], 
col2=[$1])",
+          "\n      LogicalWindow(window#0=[window(partition {0, 1} order by 
[2, 0] aggs [RANK(), DENSE_RANK()])])",
+          "\n        PinotLogicalSortExchange(distribution=[hash[0, 1]], 
collation=[[2, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n          LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
       },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to