YangShaw commented on code in PR #14397:
URL: https://github.com/apache/doris/pull/14397#discussion_r1099843860


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java:
##########
@@ -0,0 +1,539 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.annotation.DependsRules;
+import org.apache.doris.nereids.properties.DistributionSpecHash;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.properties.OrderSpec;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.properties.RequireProperties;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import 
org.apache.doris.nereids.rules.rewrite.logical.CheckAndStandardizeWindowFunctionAndFrame;
+import org.apache.doris.nereids.rules.rewrite.logical.ExtractWindowExpression;
+import org.apache.doris.nereids.rules.rewrite.logical.NormalizeWindow;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Window;
+import org.apache.doris.nereids.trees.expressions.WindowFrame;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation rule that converts a logical window into physical windows and attaches the
+ * RequireProperties each physical window needs.
+ *
+ * step 1: compute three kinds of group:
+ *      WindowFrameGroup: maintain windows with same PartitionKey, OrderKey 
and WindowFrame
+ *      OrderKeyGroup: maintain windows with same PartitionKey and OrderKey
+ *      PartitionKeyGroup: maintain windows with same PartitionKey
+ * step 2: sort PartitionKeyGroup with increasing order of tupleSize
+ * step 3: for every WindowFrameGroup of each OrderKeyGroup, generate one PhysicalWindow node with the
+ *  common PartitionKeys and OrderKeys, a unique WindowFrame and a function list.
+ * step 4: for each PhysicalWindow, generate RequiredProperties, including 
PartitionKey for DistributionSpec,
+ *  and (PartitionKey + OrderKey) for OrderSpec.
+ */
+@DependsRules({
+    NormalizeWindow.class,
+    CheckAndStandardizeWindowFunctionAndFrame.class,
+    ExtractWindowExpression.class
+})
+public class LogicalWindowToPhysicalWindow extends 
OneImplementationRuleFactory {
+
+    /**
+     * Builds the implementation rule: it matches a LogicalWindow for which both
+     * {@code isNormalized} and {@code isChecked} hold, and hands the matched plan together with
+     * the cascades context to {@code resolveWindow}.
+     */
+    @Override
+    public Rule build() {
+        return RuleType.LOGICAL_WINDOW_TO_PHYSICAL_WINDOW_RULE.build(
+            logicalWindow()
+                .when(LogicalWindow::isNormalized)
+                .when(LogicalWindow::isChecked)
+                .thenApply(matched -> resolveWindow(matched.cascadesContext, matched.root))
+        );
+    }
+
+    /**
+     * Main procedure: replaces one LogicalWindow with a chain of nodes stacked on its child.
+     *
+     * <p>The window expressions are bucketed into WindowFrameGroups, those into OrderKeyGroups and
+     * those into PartitionKeyGroups. Order-key and partition-key groups are merged, the
+     * partition-key groups are sorted, and the groups are then visited in that order.
+     *
+     * @param ctx cascades context, forwarded unchanged to the node-creation helper
+     * @param logicalWindow the matched logical window
+     * @return the topmost node produced, cast to PhysicalWindow
+     */
+    private PhysicalWindow resolveWindow(CascadesContext ctx, LogicalWindow<GroupPlan> logicalWindow) {
+        // todo: remove windowExpressions from LogicalWindow and rule ExtractWindowExpressions;
+        //  only keep this variable in PhysicalWindow
+        List<NamedExpression> windowExpressions = logicalWindow.getWindowExpressions();
+
+        // Level 1: windows grouped into WindowFrameGroups (tuple sizes are computed by the helper).
+        List<WindowFrameGroup> frameGroups = createCommonWindowFrameGroups(windowExpressions);
+
+        // Level 2: frame groups grouped into OrderKeyGroups, then merged.
+        List<OrderKeyGroup> orderGroups = createCommonOrderKeyGroups(frameGroups);
+        mergeOrderKeyGroups(orderGroups);
+
+        // Level 3: order-key groups grouped into PartitionKeyGroups, then merged and sorted.
+        List<PartitionKeyGroup> partitionGroups = createCommonPartitionKeyGroups(orderGroups);
+        mergePartitionKeyGroups(partitionGroups);
+        sortPartitionKeyGroups(partitionGroups);
+
+        // Stack the new nodes bottom-up, starting from the child of the original LogicalWindow.
+        Plan current = logicalWindow.child();
+        for (PartitionKeyGroup partitionGroup : partitionGroups) {
+            for (OrderKeyGroup orderGroup : partitionGroup.groupList) {
+                // one call per OrderKeyGroup; the helper adds one node per WindowFrameGroup in it
+                current = createLogicalPlanNodeForWindowFunctions(current, orderGroup, logicalWindow, ctx);
+            }
+        }
+        return (PhysicalWindow) current;
+    }
+
+    /* 
********************************************************************************************
+     * create PhysicalWindow and PhysicalSort
+     * 
********************************************************************************************
 */
+
+    /**
+     * Stacks one PhysicalWindow on top of {@code root} for every WindowFrameGroup of the given
+     * OrderKeyGroup.
+     *
+     * <p>No sort node is created here. The keys that must be sorted are passed to each
+     * PhysicalWindow as required properties, so that a sort can be enforced later if necessary.
+     *
+     * @param root plan the first new node is placed on
+     * @param orderKeyGroup group whose WindowFrameGroups are turned into nodes
+     * @param logicalWindow the logical window being implemented
+     * @param ctx cascades context (not used by this method at present)
+     * @return the topmost node created, or {@code root} if the group list is empty
+     */
+    private Plan createLogicalPlanNodeForWindowFunctions(Plan root, OrderKeyGroup orderKeyGroup,
+                                                         LogicalWindow logicalWindow, CascadesContext ctx) {
+        // The same sort requirement is shared by every window node built from this group.
+        List<OrderKey> sortKeys = generateKeysNeedToBeSorted(orderKeyGroup);
+
+        Plan top = root;
+        for (WindowFrameGroup frameGroup : orderKeyGroup.groupList) {
+            // each distinct window frame gets its own PhysicalWindow, chained on the previous one
+            top = createPhysicalWindow(top, frameGroup, logicalWindow, sortKeys);
+        }
+        return top;
+    }
+
+    /**
+     * Collects every key the input of this group must be sorted on: first the group's partition
+     * keys, each wrapped in an OrderKey, then the group's own order keys.
+     *
+     * @param orderKeyGroup group providing {@code partitionKeyList} and {@code orderKeyList}
+     * @return a new list holding the partition-key OrderKeys followed by the order keys
+     */
+    private List<OrderKey> generateKeysNeedToBeSorted(OrderKeyGroup orderKeyGroup) {
+        // todo: haven't support isNullFirst, and its default value is false (see AnalyticPlanner#line403,
+        //  but in LogicalPlanBuilder, its default value is true)
+        List<OrderKey> partitionSortKeys = orderKeyGroup.partitionKeyList.stream()
+                .map(partitionKey -> new OrderKey(partitionKey, true, false))
+                .collect(Collectors.toList());
+
+        // Copy into a fresh mutable list, then append the order keys after the partition keys.
+        List<OrderKey> sortKeys = Lists.newArrayList(partitionSortKeys);
+        sortKeys.addAll(orderKeyGroup.orderKeyList);
+        return sortKeys;
+    }
+
+    /**
+     * Creates one PhysicalWindow for a WindowFrameGroup and attaches the properties it requires
+     * from its child.
+     *
+     * <ul>
+     *   <li>no partition keys and no required order keys: follow the parent's requirement;</li>
+     *   <li>no partition keys: require only the order given by {@code requiredOrderKeys};</li>
+     *   <li>otherwise: require hash distribution on the partition keys plus that order.</li>
+     * </ul>
+     *
+     * @param root child of the new node
+     * @param windowFrameGroup group supplying the partition keys and passed to the new node
+     * @param logicalWindow source of output expressions, window expressions and logical properties
+     * @param requiredOrderKeys keys the child output has to be sorted on
+     * @return the PhysicalWindow with its required properties and child set
+     */
+    private PhysicalWindow createPhysicalWindow(Plan root, WindowFrameGroup windowFrameGroup,
+                                                LogicalWindow logicalWindow, List<OrderKey> requiredOrderKeys) {
+        // todo: partitionByEq and orderByEq?
+        PhysicalWindow physicalWindow = new PhysicalWindow<>(
+                logicalWindow.getOutputExpressions(),
+                logicalWindow.getWindowExpressions(),
+                windowFrameGroup,
+                logicalWindow.getLogicalProperties(),
+                root);
+        // todo: add isAnalyticSort to physicalWindow
+
+        boolean noPartitionKeys = windowFrameGroup.partitionKeyList.isEmpty();
+        RequireProperties requireProperties;
+        if (noPartitionKeys && requiredOrderKeys.isEmpty()) {
+            // nothing to distribute or sort on
+            requireProperties = RequireProperties.followParent();
+        } else if (noPartitionKeys) {
+            // order only
+            requireProperties = RequireProperties.of(new PhysicalProperties(new OrderSpec(requiredOrderKeys)));
+        } else {
+            // todo: add new ShuffleType for window, like ShuffleType.WINDOW
+            // requiredOrderKeys already contains the partition keys, so no emptiness check is needed
+            PhysicalProperties distributedAndSorted = PhysicalProperties
+                    .createHash(windowFrameGroup.partitionKeyList, DistributionSpecHash.ShuffleType.ENFORCED)
+                    .withOrderSpec(new OrderSpec(requiredOrderKeys));
+            requireProperties = RequireProperties.of(distributedAndSorted);
+        }
+        return physicalWindow.withRequirePropertiesAndChild(requireProperties, root);
+    }
+
+    /* 
********************************************************************************************
+     * WindowFunctionRelatedGroups
+     * 
********************************************************************************************
 */
+
+    // todo: can we simplify the following three algorithms?
+    /**
+     * Distributes the window expressions over WindowFrameGroups. Each expression joins the first
+     * existing group that reports it as compatible; if none does, it starts a new group.
+     * Afterwards every group's tuple size is set to the summed data-type width of the first child
+     * of each of its members.
+     *
+     * @param windowList window expressions to group, processed in list order
+     * @return the groups in order of creation
+     */
+    private List<WindowFrameGroup> createCommonWindowFrameGroups(List<NamedExpression> windowList) {
+        List<WindowFrameGroup> groups = Lists.newArrayList();
+
+        for (NamedExpression windowAlias : windowList) {
+            // first-fit search for a group that accepts this window
+            WindowFrameGroup target = null;
+            for (WindowFrameGroup candidate : groups) {
+                if (candidate.isCompatible(windowAlias)) {
+                    target = candidate;
+                    break;
+                }
+            }
+            if (target == null) {
+                groups.add(new WindowFrameGroup(windowAlias));
+            } else {
+                target.addGroupMember(windowAlias);
+            }
+        }
+
+        // tuple sizes are computed only once all members have been assigned
+        groups.forEach(group -> {
+            int tupleSize = group.groupList.stream()
+                    .mapToInt(window -> window.child(0).getDataType().width())
+                    .sum();
+            group.setTupleSize(tupleSize);
+        });
+
+        return groups;
+    }
+
+    /**
+     * Distributes WindowFrameGroups over OrderKeyGroups. Each frame group joins the first existing
+     * order-key group that reports it as compatible; if none does, it starts a new group.
+     * Afterwards every group's tuple size is set to the sum of its members' tuple sizes.
+     *
+     * @param windowFrameGroupList frame groups to group, processed in list order
+     * @return the order-key groups in order of creation
+     */
+    private List<OrderKeyGroup> createCommonOrderKeyGroups(List<WindowFrameGroup> windowFrameGroupList) {
+        List<OrderKeyGroup> groups = Lists.newArrayList();
+
+        for (WindowFrameGroup frameGroup : windowFrameGroupList) {
+            // first-fit search for an order-key group that accepts this frame group
+            OrderKeyGroup target = null;
+            for (OrderKeyGroup candidate : groups) {
+                if (candidate.isCompatible(frameGroup)) {
+                    target = candidate;
+                    break;
+                }
+            }
+            if (target == null) {
+                groups.add(new OrderKeyGroup(frameGroup));
+            } else {
+                target.addGroupMember(frameGroup);
+            }
+        }
+
+        // tuple sizes are computed only once all members have been assigned
+        groups.forEach(group -> {
+            int tupleSize = group.getGroupList().stream()
+                    .mapToInt(WindowFrameGroup::getTupleSize)
+                    .sum();
+            group.setTupleSize(tupleSize);
+        });
+
+        return groups;
+    }
+
+    /**
+     * Distributes OrderKeyGroups over PartitionKeyGroups. Each order-key group joins the first
+     * existing partition-key group that reports it as compatible; if none does, it starts a new
+     * group. Afterwards every group's tuple size is set to the sum of its members' tuple sizes.
+     *
+     * @param orderKeyGroupList order-key groups to group, processed in list order
+     * @return the partition-key groups in order of creation
+     */
+    private List<PartitionKeyGroup> createCommonPartitionKeyGroups(List<OrderKeyGroup> orderKeyGroupList) {
+        List<PartitionKeyGroup> groups = Lists.newArrayList();
+
+        for (OrderKeyGroup orderGroup : orderKeyGroupList) {
+            // first-fit search for a partition-key group that accepts this order-key group
+            PartitionKeyGroup target = null;
+            for (PartitionKeyGroup candidate : groups) {
+                if (candidate.isCompatible(orderGroup)) {
+                    target = candidate;
+                    break;
+                }
+            }
+            if (target == null) {
+                groups.add(new PartitionKeyGroup(orderGroup));
+            } else {
+                target.addGroupMember(orderGroup);
+            }
+        }
+
+        // tuple sizes are computed only once all members have been assigned
+        groups.forEach(group -> {
+            int tupleSize = group.getGroupList().stream()
+                    .mapToInt(OrderKeyGroup::getTupleSize)
+                    .sum();
+            group.setTupleSize(tupleSize);
+        });
+
+        return groups;
+    }
+
+    private void mergeOrderKeyGroups(List<OrderKeyGroup> orderKeyGroupList) {
+        boolean merged = true;
+
+        while (merged) {
+            merged = false;
+            for (OrderKeyGroup okg1 : orderKeyGroupList) {
+                for (OrderKeyGroup okg2 : orderKeyGroupList) {
+                    if (okg1 != okg2 && okg2.isPrefixOf(okg1)) {
+                        // okg2 ∈ okg1
+                        okg1.absorb(okg2);
+                        orderKeyGroupList.remove(okg2);
+                        merged = true;
+                        break;
+                    }
+                }
+                if (merged) {
+                    break;
+                }
+            }

Review Comment:
   yes, I have fixed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to