YangShaw commented on code in PR #14397: URL: https://github.com/apache/doris/pull/14397#discussion_r1099843860
########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalWindowToPhysicalWindow.java: ########## @@ -0,0 +1,539 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.annotation.DependsRules; +import org.apache.doris.nereids.properties.DistributionSpecHash; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.properties.OrderSpec; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.properties.RequireProperties; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.rewrite.logical.CheckAndStandardizeWindowFunctionAndFrame; +import org.apache.doris.nereids.rules.rewrite.logical.ExtractWindowExpression; +import org.apache.doris.nereids.rules.rewrite.logical.NormalizeWindow; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Window; +import org.apache.doris.nereids.trees.expressions.WindowFrame; +import org.apache.doris.nereids.trees.plans.GroupPlan; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; +import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; + +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Implementation rule that convert logical window to physical window, and add RequiredProperties + * + * step 1: compute three kinds of group: + * WindowFrameGroup: maintain windows with same PartitionKey, OrderKey and WindowFrame + * OrderKeyGroup: maintain windows with same PartitionKey and OrderKey + * PartitionKeyGroup: maintain windows with same PartitionKey + * step 2: sort PartitionKeyGroup with increasing order of tupleSize + * step 3: for every WindowFrameGroup of each SortGroup, generate one PhysicalWindow node, with common PartitionKeys, + * OrderKeys, unique WindowFrame and a function list. + * step 4: for each PhysicalWindow, generate RequiredProperties, including PartitionKey for DistributionSpec, + * and (PartitionKey + OrderKey) for OrderSpec. + */ +@DependsRules({ + NormalizeWindow.class, + CheckAndStandardizeWindowFunctionAndFrame.class, + ExtractWindowExpression.class +}) +public class LogicalWindowToPhysicalWindow extends OneImplementationRuleFactory { + + @Override + public Rule build() { + + return RuleType.LOGICAL_WINDOW_TO_PHYSICAL_WINDOW_RULE.build( + logicalWindow().when(LogicalWindow::isNormalized) + .when(LogicalWindow::isChecked) + .thenApply(ctx -> resolveWindow(ctx.cascadesContext, ctx.root)) + ); + } + + /** + * main procedure + */ + private PhysicalWindow resolveWindow(CascadesContext ctx, LogicalWindow<GroupPlan> logicalWindow) { + // todo: remove windowExpressions from LogicalWindow and rule ExtractWindowExpressions. + // Only add this variable in PhysicalWindow + List<NamedExpression> windowList = logicalWindow.getWindowExpressions(); + + /////////// create three kinds of groups and compute tupleSize of each + // windowFrameGroup + List<WindowFrameGroup> windowFrameGroupList = createCommonWindowFrameGroups(windowList); + + // orderKeyGroup + List<OrderKeyGroup> orderKeyGroupList = createCommonOrderKeyGroups(windowFrameGroupList); + mergeOrderKeyGroups(orderKeyGroupList); + + // partitionKeyGroup + List<PartitionKeyGroup> partitionKeyGroupList = createCommonPartitionKeyGroups(orderKeyGroupList); + mergePartitionKeyGroups(partitionKeyGroupList); + + // sort groups + sortPartitionKeyGroups(partitionKeyGroupList); + + // eliminate LogicalWindow, and replaced it with new LogicalWindow and LogicalSort + Plan newRoot = logicalWindow.child(); + for (PartitionKeyGroup partitionKeyGroup : partitionKeyGroupList) { + for (OrderKeyGroup orderKeyGroup : partitionKeyGroup.groupList) { + // create LogicalSort for each OrderKeyGroup; + // in OrderKeyGroup, create LogicalWindow for each WindowFrameGroup + newRoot = createLogicalPlanNodeForWindowFunctions(newRoot, orderKeyGroup, logicalWindow, ctx); + } + } + return (PhysicalWindow) newRoot; + } + + /* ******************************************************************************************** + * create PhysicalWindow and PhysicalSort + * ******************************************************************************************** */ + + private Plan createLogicalPlanNodeForWindowFunctions(Plan root, OrderKeyGroup orderKeyGroup, + LogicalWindow logicalWindow, CascadesContext ctx) { + // PhysicalSort node for orderKeys; if there exists no orderKey, newRoot = root + // Plan newRoot = createPhysicalSortNode(root, orderKeyGroup, ctx); + Plan newRoot = root; + + // we will not add PhysicalSort in this step, but generate it if necessary with the ability of enforcer by + // setting RequiredProperties + List<OrderKey> requiredOrderKeys = generateKeysNeedToBeSorted(orderKeyGroup); + + // PhysicalWindow nodes for each different window frame, so at least one PhysicalWindow node will be added + for (WindowFrameGroup windowFrameGroup : orderKeyGroup.groupList) { + newRoot = createPhysicalWindow(newRoot, windowFrameGroup, logicalWindow, requiredOrderKeys); + } + + return newRoot; + } + + private List<OrderKey> generateKeysNeedToBeSorted(OrderKeyGroup orderKeyGroup) { + // all keys that need to be sorted, which includes BOTH partitionKeys and orderKeys from this group + List<OrderKey> keysNeedToBeSortedList = Lists.newArrayList(); + + // used as SortNode.isAnalyticSort, but it is unnecessary to add it in LogicalSort + if (!orderKeyGroup.partitionKeyList.isEmpty()) { + keysNeedToBeSortedList.addAll(orderKeyGroup.partitionKeyList.stream().map(partitionKey -> { + // todo: haven't support isNullFirst, and its default value is false(see AnalyticPlanner#line403, + // but in LogicalPlanBuilder, its default value is true) + return new OrderKey(partitionKey, true, false); + }).collect(Collectors.toList())); + } + + if (!orderKeyGroup.orderKeyList.isEmpty()) { + keysNeedToBeSortedList.addAll(orderKeyGroup.orderKeyList); + } + return keysNeedToBeSortedList; + } + + private PhysicalWindow createPhysicalWindow(Plan root, WindowFrameGroup windowFrameGroup, + LogicalWindow logicalWindow, List<OrderKey> requiredOrderKeys) { + // todo: partitionByEq and orderByEq? + // requiredProperties: + // Distribution: partitionKeys + // Order: requiredOrderKeys + PhysicalWindow physicalWindow = new PhysicalWindow<>( + logicalWindow.getOutputExpressions(), + logicalWindow.getWindowExpressions(), + windowFrameGroup, + logicalWindow.getLogicalProperties(), + root); + // todo: add isAnalyticSort to physicalWindow + if (windowFrameGroup.partitionKeyList.isEmpty() && requiredOrderKeys.isEmpty()) { + return physicalWindow.withRequirePropertiesAndChild(RequireProperties.followParent(), root); + } + + PhysicalProperties properties; + if (windowFrameGroup.partitionKeyList.isEmpty()) { + properties = new PhysicalProperties(new OrderSpec(requiredOrderKeys)); + } else { + // todo: add new ShuffleType for window, like ShuffleType.WINDOW + properties = PhysicalProperties.createHash( + windowFrameGroup.partitionKeyList, DistributionSpecHash.ShuffleType.ENFORCED); + // requiredOrderKeys contain partitionKeys, so there is no need to check if requiredOrderKeys.isEmpty() + properties = properties.withOrderSpec(new OrderSpec(requiredOrderKeys)); + } + + RequireProperties requireProperties = RequireProperties.of(properties); + return physicalWindow.withRequirePropertiesAndChild(requireProperties, root); + } + + /* ******************************************************************************************** + * WindowFunctionRelatedGroups + * ******************************************************************************************** */ + + // todo: can we simplify the following three algorithms? + private List<WindowFrameGroup> createCommonWindowFrameGroups(List<NamedExpression> windowList) { + List<WindowFrameGroup> windowFrameGroupList = Lists.newArrayList(); + for (int i = 0; i < windowList.size(); i++) { + NamedExpression windowAlias = windowList.get(i); + + boolean matched = false; + for (WindowFrameGroup windowFrameGroup : windowFrameGroupList) { + if (windowFrameGroup.isCompatible(windowAlias)) { + windowFrameGroup.addGroupMember(windowAlias); + matched = true; + break; + } + } + if (!matched) { + windowFrameGroupList.add(new WindowFrameGroup(windowAlias)); + } + } + + for (WindowFrameGroup wfg : windowFrameGroupList) { + wfg.setTupleSize(wfg.groupList.stream().mapToInt(window -> window.child(0).getDataType().width()).sum()); + } + + return windowFrameGroupList; + } + + private List<OrderKeyGroup> createCommonOrderKeyGroups(List<WindowFrameGroup> windowFrameGroupList) { + List<OrderKeyGroup> orderKeyGroupList = Lists.newArrayList(); + + for (WindowFrameGroup windowFrameGroup : windowFrameGroupList) { + boolean matched = false; + for (OrderKeyGroup orderKeyGroup : orderKeyGroupList) { + if (orderKeyGroup.isCompatible(windowFrameGroup)) { + orderKeyGroup.addGroupMember(windowFrameGroup); + matched = true; + break; + } + } + if (!matched) { + orderKeyGroupList.add(new OrderKeyGroup(windowFrameGroup)); + } + } + + for (OrderKeyGroup okg : orderKeyGroupList) { + okg.setTupleSize(okg.getGroupList().stream().mapToInt(WindowFrameGroup::getTupleSize).sum()); + } + + return orderKeyGroupList; + } + + private List<PartitionKeyGroup> createCommonPartitionKeyGroups(List<OrderKeyGroup> orderKeyGroupList) { + List<PartitionKeyGroup> partitionKeyGroupList = Lists.newArrayList(); + + for (OrderKeyGroup orderKeyGroup : orderKeyGroupList) { + boolean matched = false; + for (PartitionKeyGroup partitionKeyGroup : partitionKeyGroupList) { + if (partitionKeyGroup.isCompatible(orderKeyGroup)) { + partitionKeyGroup.addGroupMember(orderKeyGroup); + matched = true; + break; + } + } + if (!matched) { + partitionKeyGroupList.add(new PartitionKeyGroup(orderKeyGroup)); + } + } + + for (PartitionKeyGroup pkg : partitionKeyGroupList) { + pkg.setTupleSize(pkg.getGroupList().stream().mapToInt(OrderKeyGroup::getTupleSize).sum()); + } + + return partitionKeyGroupList; + } + + private void mergeOrderKeyGroups(List<OrderKeyGroup> orderKeyGroupList) { + boolean merged = true; + + while (merged) { + merged = false; + for (OrderKeyGroup okg1 : orderKeyGroupList) { + for (OrderKeyGroup okg2 : orderKeyGroupList) { + if (okg1 != okg2 && okg2.isPrefixOf(okg1)) { + // okg2 ∈ okg1 + okg1.absorb(okg2); + orderKeyGroupList.remove(okg2); + merged = true; + break; + } + } + if (merged) { + break; + } + } Review Comment: yes, I have fixed this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org