This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 782b6979360 [multistage] Multiple Window Group Support (#16109)
782b6979360 is described below
commit 782b6979360b6ec28c869bbf8b7ea59e1548a3ef
Author: Xuanyi Li <[email protected]>
AuthorDate: Tue Jul 8 08:38:55 2025 -0700
[multistage] Multiple Window Group Support (#16109)
---
.../calcite/rel/rules/PinotQueryRuleSets.java | 2 +
.../calcite/rel/rules/PinotWindowSplitRule.java | 181 ++++++++++++
.../rel/rules/PinotWindowSplitRuleTest.java | 311 +++++++++++++++++++++
.../resources/queries/PhysicalOptimizerPlans.json | 73 +++++
.../resources/queries/WindowFunctionPlans.json | 80 ++++--
5 files changed, 627 insertions(+), 20 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index 7607b0e0d42..191a2005884 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -222,6 +222,7 @@ public class PinotQueryRuleSets {
PinotAggregateExchangeNodeInsertRule.SortProjectAggregate.INSTANCE,
PinotAggregateExchangeNodeInsertRule.SortAggregate.INSTANCE,
PinotAggregateExchangeNodeInsertRule.WithoutSort.INSTANCE,
+ PinotWindowSplitRule.INSTANCE,
PinotWindowExchangeNodeInsertRule.INSTANCE,
PinotSetOpExchangeNodeInsertRule.INSTANCE,
@@ -241,6 +242,7 @@ public class PinotQueryRuleSets {
PinotLogicalAggregateRule.SortProjectAggregate.INSTANCE,
PinotLogicalAggregateRule.SortAggregate.INSTANCE,
PinotLogicalAggregateRule.PinotLogicalAggregateConverter.INSTANCE,
+ PinotWindowSplitRule.INSTANCE,
// Evaluate the Literal filter nodes
CoreRules.FILTER_REDUCE_EXPRESSIONS
);
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRule.java
new file mode 100644
index 00000000000..16d5dd61ab8
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRule.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.sql.SqlAggFunction;
+
+/**
+ * Planner rule that rewrites a {@link LogicalWindow} carrying more than one window group into a
+ * stack of {@link LogicalWindow}s, each of which carries exactly one of those groups.
+ *
+ * <p>The groups are stacked in their original order: the window for the first group is built on top
+ * of the original input, and the window for every later group is built on top of the window created
+ * for the group before it. Every window in the stack is created with the original window's constants.
+ * A {@link RexInputRef} whose index is at or beyond the original input field count addresses one of
+ * those constants, so such references are re-based whenever the input of a stacked window is wider
+ * than the original input.
+ */
+public class PinotWindowSplitRule extends RelOptRule {
+
+  public static final PinotWindowSplitRule INSTANCE = new PinotWindowSplitRule();
+
+  private PinotWindowSplitRule() {
+    super(operand(LogicalWindow.class, any()), "PinotWindowSplitterRule");
+  }
+
+  /**
+   * Fires only for windows that carry at least two groups.
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final LogicalWindow candidate = call.rel(0);
+    return candidate.groups.size() > 1;
+  }
+
+  /**
+   * Replaces the matched multi-group window with a chain of single-group windows.
+   */
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final LogicalWindow multiGroupWindow = call.rel(0);
+    final RelNode baseInput = multiGroupWindow.getInput();
+    final int baseFieldCount = baseInput.getRowType().getFieldCount();
+    final List<RelDataTypeField> allOutputFields = multiGroupWindow.getRowType().getFieldList();
+    final RelDataTypeFactory typeFactory = multiGroupWindow.getCluster().getTypeFactory();
+
+    RelNode chainTop = baseInput;
+    // Position, within the original output row, of the first aggregate column of the current group.
+    int aggFieldStart = baseFieldCount;
+
+    for (Window.Group originalGroup : multiGroupWindow.groups) {
+      final RelDataType inputType = chainTop.getRowType();
+
+      // Number of columns appended by the windows already stacked below. It is zero while the chain
+      // still consists of the original input only, in which case the group is used as is.
+      final int addedFieldCount = inputType.getFieldCount() - baseFieldCount;
+      final Window.Group group = addedFieldCount > 0
+          ? new RexConstantRefShifter(baseFieldCount, addedFieldCount).apply(originalGroup)
+          : originalGroup;
+
+      // Row type of the new window: every input column followed by this group's aggregate columns,
+      // reusing the field definitions found in the original window's output row.
+      final int aggFieldEnd = aggFieldStart + group.aggCalls.size();
+      final List<RelDataTypeField> fields = new ArrayList<>(inputType.getFieldList());
+      fields.addAll(allOutputFields.subList(aggFieldStart, aggFieldEnd));
+      final RelDataType rowType = typeFactory.createStructType(fields);
+      aggFieldStart = aggFieldEnd;
+
+      // The window built here is the input of the window for the next group.
+      chainTop = new LogicalWindow(
+          multiGroupWindow.getCluster(),
+          multiGroupWindow.getTraitSet(),
+          multiGroupWindow.getHints(),
+          chainTop,
+          multiGroupWindow.getConstants(),
+          rowType,
+          ImmutableList.of(group));
+    }
+    call.transformTo(chainTop);
+  }
+
+  /**
+   * A {@link RexShuttle} that adds a fixed offset to every {@link RexInputRef} whose index is at or
+   * beyond the original input field count, i.e. to every reference that addresses a window constant
+   * rather than a field of the original input. References below that boundary are returned unchanged.
+   */
+  static class RexConstantRefShifter extends RexShuttle {
+    private final int _originalInputFieldCount;
+    private final int _shift;
+
+    RexConstantRefShifter(int originalInputFieldCount, int shift) {
+      _originalInputFieldCount = originalInputFieldCount;
+      _shift = shift;
+    }
+
+    @Override
+    public RexNode visitInputRef(RexInputRef inputRef) {
+      final int index = inputRef.getIndex();
+      if (index < _originalInputFieldCount) {
+        // Field of the original input relation: left untouched.
+        return inputRef;
+      }
+      // Constant reference: move it by the configured offset, keeping its type.
+      return new RexInputRef(index + _shift, inputRef.getType());
+    }
+
+    @Override
+    public RexNode visitCall(RexCall call) {
+      if (!(call instanceof Window.RexWinAggCall)) {
+        return super.visitCall(call);
+      }
+      // Rebuilt explicitly as a RexWinAggCall; apply() casts the visited result back to that type.
+      final Window.RexWinAggCall winCall = (Window.RexWinAggCall) call;
+      final List<RexNode> shiftedOperands = new ArrayList<>(winCall.getOperands().size());
+      for (RexNode operand : winCall.getOperands()) {
+        shiftedOperands.add(operand.accept(this));
+      }
+      return new Window.RexWinAggCall(
+          (SqlAggFunction) winCall.getOperator(),
+          winCall.getType(),
+          shiftedOperands,
+          winCall.ordinal,
+          winCall.distinct,
+          winCall.ignoreNulls);
+    }
+
+    /**
+     * Runs this shuttle over the aggregate calls and the lower/upper bounds of the given group and
+     * returns a new group built from the results. Partition keys, order keys, the rows/range flag
+     * and the exclusion are copied over unchanged.
+     */
+    public Window.Group apply(Window.Group group) {
+      final List<Window.RexWinAggCall> shiftedAggCalls = group.aggCalls.stream()
+          .map(aggCall -> aggCall.accept(this))
+          .map(Window.RexWinAggCall.class::cast)
+          .collect(Collectors.toList());
+      final RexWindowBound shiftedLowerBound = group.lowerBound.accept(this);
+      final RexWindowBound shiftedUpperBound = group.upperBound.accept(this);
+
+      return new Window.Group(
+          group.keys,
+          group.isRows,
+          shiftedLowerBound,
+          shiftedUpperBound,
+          group.exclude,
+          group.orderKeys,
+          shiftedAggCalls);
+    }
+  }
+}
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRuleTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRuleTest.java
new file mode 100644
index 00000000000..be187a59709
--- /dev/null
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/calcite/rel/rules/PinotWindowSplitRuleTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexWindowBounds;
+import org.apache.calcite.rex.RexWindowExclusion;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlAvgAggFunction;
+import org.apache.calcite.sql.fun.SqlSumAggFunction;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.pinot.query.type.TypeFactory;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link PinotWindowSplitRule} and its nested {@code RexConstantRefShifter}.
+ */
+public class PinotWindowSplitRuleTest {
+  private static final TypeFactory TYPE_FACTORY = new TypeFactory();
+  private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private RelOptRuleCall _call;
+  @Mock
+  private HepRelVertex _input;
+  @Mock
+  private RelOptCluster _cluster;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+    RelTraitSet traits = RelTraitSet.createEmpty();
+    Mockito.when(_input.getTraitSet()).thenReturn(traits);
+    Mockito.when(_input.getCluster()).thenReturn(_cluster);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testMatchesWithSingleGroup() {
+    // Test that the rule doesn't match when there's only one window group
+    LogicalWindow singleGroupWindow = createWindowWithGroups(1);
+    Mockito.when(_call.rel(0)).thenReturn(singleGroupWindow);
+
+    boolean matches = PinotWindowSplitRule.INSTANCE.matches(_call);
+    Assert.assertFalse(matches, "Rule should not match when there's only one window group");
+  }
+
+  @Test
+  public void testMatchesWithMultipleGroups() {
+    // Test that the rule matches when there are multiple window groups
+    LogicalWindow multiGroupWindow = createWindowWithGroups(3);
+    Mockito.when(_call.rel(0)).thenReturn(multiGroupWindow);
+
+    boolean matches = PinotWindowSplitRule.INSTANCE.matches(_call);
+    Assert.assertTrue(matches, "Rule should match when there are multiple window groups");
+  }
+
+  @Test
+  public void testOnMatchWithTwoGroups() {
+    // Test splitting a window with two groups into a chain of two windows
+    assertSplitIntoChain(2);
+  }
+
+  @Test
+  public void testOnMatchWithThreeGroups() {
+    // Test splitting a window with three groups into a chain of three windows
+    assertSplitIntoChain(3);
+  }
+
+  @Test
+  public void testRexConstantRefShifter() {
+    // A group without aggregate calls and with unbounded frame bounds contains no RexInputRef at all,
+    // so running the shifter over it must produce an equal group.
+    int originalInputFieldCount = 2;
+    int shift = 1;
+
+    Window.Group group = new Window.Group(
+        ImmutableBitSet.of(0),
+        true,
+        RexWindowBounds.UNBOUNDED_PRECEDING,
+        RexWindowBounds.UNBOUNDED_FOLLOWING,
+        RexWindowExclusion.EXCLUDE_NO_OTHER,
+        RelCollations.EMPTY,
+        Collections.emptyList()
+    );
+
+    // Apply the shifter
+    PinotWindowSplitRule.RexConstantRefShifter shifter =
+        new PinotWindowSplitRule.RexConstantRefShifter(originalInputFieldCount, shift);
+    Window.Group shiftedGroup = shifter.apply(group);
+
+    Assert.assertEquals(shiftedGroup, group);
+  }
+
+  @Test
+  public void testRexConstantRefShifterWithAggCalls() {
+    // Test the RexConstantRefShifter with window aggregate calls
+    int originalInputFieldCount = 2;
+    int shift = 1;
+
+    // Create RexInputRefs for testing
+    RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputFieldRef = new RexInputRef(0, intType); // Input field (index < originalInputFieldCount)
+    RexInputRef constantRef = new RexInputRef(3, intType); // Constant (index >= originalInputFieldCount)
+
+    // Create window aggregate calls
+    Window.RexWinAggCall inputFieldAgg = new Window.RexWinAggCall(
+        new SqlSumAggFunction(intType), intType, List.of(inputFieldRef), 0, false, false);
+    Window.RexWinAggCall constantAgg = new Window.RexWinAggCall(
+        new SqlAvgAggFunction(SqlKind.AVG), intType, List.of(constantRef), 1, false, false);
+
+    // Create a window group with these aggregate calls
+    Window.Group group = new Window.Group(
+        ImmutableBitSet.of(0),
+        true,
+        RexWindowBounds.UNBOUNDED_PRECEDING,
+        RexWindowBounds.UNBOUNDED_FOLLOWING,
+        RexWindowExclusion.EXCLUDE_NO_OTHER,
+        RelCollations.EMPTY,
+        Arrays.asList(inputFieldAgg, constantAgg)
+    );
+
+    // Apply the shifter
+    PinotWindowSplitRule.RexConstantRefShifter shifter =
+        new PinotWindowSplitRule.RexConstantRefShifter(originalInputFieldCount, shift);
+    Window.Group shiftedGroup = shifter.apply(group);
+
+    // Verify that the input field reference wasn't shifted
+    Window.RexWinAggCall shiftedInputFieldAgg = shiftedGroup.aggCalls.get(0);
+    RexInputRef shiftedInputFieldRef = (RexInputRef) shiftedInputFieldAgg.getOperands().get(0);
+    Assert.assertEquals(shiftedInputFieldRef.getIndex(), 0,
+        "Input field reference should not be shifted");
+
+    // Verify that the constant reference was shifted
+    Window.RexWinAggCall shiftedConstantAgg = shiftedGroup.aggCalls.get(1);
+    RexInputRef shiftedConstantRef = (RexInputRef) shiftedConstantAgg.getOperands().get(0);
+    Assert.assertEquals(shiftedConstantRef.getIndex(), 4,
+        "Constant reference should be shifted by " + shift);
+  }
+
+  /**
+   * Runs the rule on a window with {@code numGroups} groups (one aggregate call each) and verifies
+   * the complete result, not just its top node: the rule must hand exactly one node to
+   * {@code transformTo}, that node must be the top of a chain of {@code numGroups} single-group
+   * windows whose row types shrink by one column per level, and the chain must end on the input of
+   * the original window.
+   */
+  private void assertSplitIntoChain(int numGroups) {
+    LogicalWindow originalWindow = createTestWindow(numGroups);
+    Mockito.when(_call.rel(0)).thenReturn(originalWindow);
+
+    PinotWindowSplitRule.INSTANCE.onMatch(_call);
+
+    // Verify that transformTo was called exactly once and capture what it was called with
+    ArgumentCaptor<RelNode> transformedNodeCapture = ArgumentCaptor.forClass(RelNode.class);
+    Mockito.verify(_call, Mockito.times(1)).transformTo(transformedNodeCapture.capture());
+
+    RelNode node = transformedNodeCapture.getValue();
+    // The top window must expose the same number of columns as the window it replaces.
+    int expectedFieldCount = originalWindow.getRowType().getFieldCount();
+    for (int level = 0; level < numGroups; level++) {
+      Assert.assertTrue(node instanceof LogicalWindow,
+          "Node at depth " + level + " of the chain should be a LogicalWindow");
+      LogicalWindow window = (LogicalWindow) node;
+      Assert.assertEquals(window.groups.size(), 1,
+          "Window at depth " + level + " should have exactly one group");
+      Assert.assertEquals(window.getRowType().getFieldCount(), expectedFieldCount,
+          "Unexpected number of output fields at depth " + level);
+      // Every test group contributes exactly one aggregate column.
+      expectedFieldCount--;
+      node = window.getInput();
+    }
+    Assert.assertSame(node, originalWindow.getInput(),
+        "The chain should end on the input of the original window");
+  }
+
+  private LogicalWindow createWindowWithGroups(int numGroups) {
+    RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+
+    List<Window.Group> groups = createWindowGroups(numGroups);
+
+    List<RexLiteral> constants = Collections.emptyList();
+    return LogicalWindow.create(RelTraitSet.createEmpty(), _input, constants, intType, groups);
+  }
+
+  private List<Window.Group> createWindowGroups(int numGroups) {
+    RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputRef = new RexInputRef(0, intType);
+
+    List<Window.Group> groups = new ArrayList<>();
+    for (int i = 0; i < numGroups; i++) {
+      Window.RexWinAggCall aggCall = new Window.RexWinAggCall(
+          new SqlSumAggFunction(intType), intType, List.of(inputRef), i, false, false);
+      Window.Group group = new Window.Group(
+          ImmutableBitSet.of(0),
+          true,
+          RexWindowBounds.UNBOUNDED_PRECEDING,
+          RexWindowBounds.UNBOUNDED_FOLLOWING,
+          RexWindowExclusion.EXCLUDE_NO_OTHER,
+          RelCollations.EMPTY,
+          List.of(aggCall)
+      );
+      groups.add(group);
+    }
+    return groups;
+  }
+
+  private LogicalWindow createTestWindow(int numGroups) {
+    final RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+    RexInputRef inputRef = new RexInputRef(0, intType);
+
+    // Create a real RelOptCluster
+    HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
+    RelOptCluster cluster = RelOptCluster.create(planner, REX_BUILDER);
+
+    // Create a real input project with real row type
+    final List<RelDataTypeField> inputFields = createInputFields(1);
+    RelDataType inputRowType = TYPE_FACTORY.createStructType(inputFields);
+    LogicalProject inputProject = LogicalProject.create(
+        LogicalValues.create(cluster, inputRowType, ImmutableList.of()),
+        Collections.emptyList(),
+        List.of(inputRef),
+        inputRowType
+    );
+    Mockito.when(_input.getCurrentRel()).thenReturn(inputProject);
+    Mockito.when(_input.getCluster()).thenReturn(cluster);
+
+    // Create real window groups
+    List<Window.Group> groups = createWindowGroups(numGroups);
+    List<RexLiteral> constants = Collections.emptyList();
+
+    // Create real struct row type for the window: the input fields plus one field per group
+    List<RelDataTypeField> windowFields = new ArrayList<>(inputFields);
+    for (int i = 0; i < groups.size(); i++) {
+      windowFields.add(new RelDataTypeFieldImpl(
+          "agg_field" + i,
+          inputFields.size() + i,
+          intType
+      ));
+    }
+    RelDataType windowRowType = TYPE_FACTORY.createStructType(windowFields);
+
+    return LogicalWindow.create(
+        RelTraitSet.createEmpty(), inputProject, constants, windowRowType, groups);
+  }
+
+  private List<RelDataTypeField> createInputFields(int count) {
+    List<RelDataTypeField> fields = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      fields.add(new RelDataTypeFieldImpl(
+          "field" + i,
+          i,
+          TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)
+      ));
+    }
+    return fields;
+  }
+}
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index bd5137c837d..b669891ce5b 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -882,5 +882,78 @@
]
}
]
+ },
+ "physical_opt_window_functions": {
+ "queries": [
+ {
+ "description": "Multiple window groups",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT
MIN(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3), MAX(a.col3)
OVER(PARTITION BY a.col1 ORDER BY a.col3), ROW_NUMBER() OVER(PARTITION BY
a.col1 ORDER BY a.col3), RANK() OVER(PARTITION BY a.col1 ORDER BY a.col3),
LAG(a.col3, 1, '0') OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+ "notes": "the table is partitioned by col2, thus we don't need to
shuffle the data for the first window function",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalProject($0=[$3], $1=[$4], $2=[$7], $3=[$5], $4=[$6])",
+ "\n PhysicalWindow(window#0=[window(partition {0} order by [2]
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])],
constants=[[1, _UTF-8'0']])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
collation=[[2]])",
+ "\n PhysicalWindow(window#0=[window(partition {0} order by
[2] aggs [MAX($2), RANK(), LAG($2, 1, _UTF-8'0')])], constants=[[1,
_UTF-8'0']])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
collation=[[2]])",
+ "\n PhysicalWindow(window#0=[window(partition {1} order
by [2] aggs [MIN($2)])], constants=[[1, _UTF-8'0']])",
+ "\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], collation=[[2]])",
+ "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Multiple window groups",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT
MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3), MIN(a.col3)
OVER(PARTITION BY a.col2 ORDER BY a.col3), SUM(a.col3) OVER(PARTITION BY a.col3
ORDER BY a.col1) FROM a",
+ "notes": "the physical optimizer currently doesn't support reordering window
nodes according to the data distribution; the first window function will be
processed first, i.e. it becomes the lowest window node in the plan tree",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalProject($0=[$3], $1=[$4], $2=[$5])",
+ "\n PhysicalWindow(window#0=[window(partition {2} order by [0]
aggs [SUM($2)])])",
+ "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE],
distKeys=[[2]], collation=[[0]])",
+ "\n PhysicalWindow(window#0=[window(partition {1} order by
[2] aggs [MIN($2)])])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]],
collation=[[2]])",
+ "\n PhysicalWindow(window#0=[window(partition {0} order
by [2] aggs [MAX($2)])])",
+ "\n
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]],
collation=[[2]])",
+ "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Multiple window groups",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT
SUM(a.col3) OVER(ORDER BY a.col2), MIN(a.col3) OVER(PARTITION BY a.col2) FROM
a",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalProject($0=[$2], $1=[$3])",
+ "\n PhysicalWindow(window#0=[window(partition {0} aggs
[MIN($1)])])",
+ "\n PhysicalWindow(window#0=[window(order by [0] aggs
[SUM($1)])])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
collation=[[0]])",
+ "\n PhysicalProject(col2=[$1], col3=[$2])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Multiple window groups",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT
SUM(a.col3) OVER(ORDER BY a.col2, a.col1), MIN(a.col3) OVER(ORDER BY a.col1,
a.col2) FROM a",
+ "output": [
+ "Execution Plan",
+ "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+ "\n PhysicalProject($0=[$3], $1=[$4])",
+ "\n PhysicalWindow(window#0=[window(order by [0, 1] aggs
[MIN($2)])])",
+ "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE],
collation=[[0, 1]])",
+ "\n PhysicalWindow(window#0=[window(order by [1, 0] aggs
[SUM($2)])])",
+ "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE],
collation=[[1, 0]])",
+ "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\n PhysicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
+ }
+ ]
}
}
diff --git
a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index f1f6da0973f..6f1700c36f0 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -3601,33 +3601,78 @@
},
{
"description": "Multiple window groups",
- "notes": "not yet supported",
- "ignored": true,
- "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2
ORDER BY a.col3), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a"
+ "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col2
ORDER BY a.col3), MAX(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM a",
+ "output": [
+ "Execution Plan",
+ "\nLogicalProject($0=[$3], $1=[$4])",
+ "\n LogicalWindow(window#0=[window(partition {0} order by [2] aggs
[MAX($2)])])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]],
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalWindow(window#0=[window(partition {1} order by [2]
aggs [MIN($2)])])",
+ "\n PinotLogicalSortExchange(distribution=[hash[1]],
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
},
{
"description": "Multiple window groups",
- "notes": "not yet supported",
- "ignored": true,
- "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY
a.col2), SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3) FROM a"
+ "sql": "EXPLAIN PLAN FOR SELECT COUNT(a.col3) OVER(PARTITION BY
a.col2), SUM(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col3) FROM a",
+ "output": [
+ "Execution Plan",
+ "\nLogicalProject($0=[$2], $1=[$3])",
+ "\n LogicalWindow(window#0=[window(partition {0} order by [1] aggs
[SUM($1)])])",
+ "\n PinotLogicalSortExchange(distribution=[hash[0]],
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalWindow(window#0=[window(partition {0} aggs
[COUNT($1)])])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
},
{
"description": "Multiple window groups",
- "notes": "not yet supported",
- "ignored": true,
- "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(), MAX(a.col3)
OVER(PARTITION BY a.col2) FROM a"
+ "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(), MAX(a.col3)
OVER(PARTITION BY a.col2) FROM a",
+ "output": [
+ "Execution Plan",
+ "\nLogicalProject(EXPR$0=[/(CAST($2):DOUBLE NOT NULL, $3)],
EXPR$1=[$4])",
+ "\n LogicalWindow(window#0=[window(partition {0} aggs [MAX($1)])])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalWindow(window#0=[window(aggs [SUM($1),
COUNT($1)])])",
+ "\n PinotLogicalExchange(distribution=[hash])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
},
{
"description": "Multiple window groups",
- "notes": "not yet supported",
- "ignored": true,
- "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2),
MIN(a.col3) OVER(PARTITION BY a.col2) FROM a"
+ "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2),
MIN(a.col3) OVER(PARTITION BY a.col2) FROM a",
+ "output": [
+ "Execution Plan",
+ "\nLogicalProject($0=[$2], $1=[$3])",
+ "\n LogicalWindow(window#0=[window(partition {0} aggs [MIN($1)])])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalWindow(window#0=[window(order by [0] aggs
[SUM($1)])])",
+ "\n PinotLogicalSortExchange(distribution=[hash],
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
},
{
"description": "Multiple window groups",
- "notes": "not yet supported",
- "ignored": true,
- "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2,
a.col1), MIN(a.col3) OVER(ORDER BY a.col1, a.col2) FROM a"
+ "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(ORDER BY a.col2,
a.col1), MIN(a.col3) OVER(ORDER BY a.col1, a.col2) FROM a",
+ "output": [
+ "Execution Plan",
+ "\nLogicalProject($0=[$3], $1=[$4])",
+ "\n LogicalWindow(window#0=[window(order by [0, 1] aggs
[MIN($2)])])",
+ "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0,
1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalWindow(window#0=[window(order by [1, 0] aggs
[SUM($2)])])",
+ "\n PinotLogicalSortExchange(distribution=[hash],
collation=[[1, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+ "\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n"
+ ]
}
]
},
@@ -3678,11 +3723,6 @@
"sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1
ORDER BY a.col2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
"expectedException": ".*ROW/RANGE not allowed with RANK, DENSE_RANK,
ROW_NUMBER, PERCENTILE_CONT/DISC or LAG/LEAD functions.*"
},
- {
- "description": "Apache Calcite failures with ROW_NUMBER() window
functions - default frame for ROW_NUMBER is different from aggregation window
functions, resulting in multiple window groups",
- "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1
ORDER BY a.col2), SUM(a.col1) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a",
- "expectedException": ".*Currently only 1 window group is supported,
query has 2 groups.*"
- },
{
"description": "Apache Calcite failures with ROW_NUMBER() window
functions - custom frames not allowed",
"sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]