morrySnow commented on code in PR #10659: URL: https://github.com/apache/doris/pull/10659#discussion_r915560690
########## fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java: ########## @@ -114,60 +134,96 @@ public PlanFragment visit(Plan plan, PlanTranslatorContext context) { * Translate Agg. */ @Override - public PlanFragment visitPhysicalAggregation( - PhysicalUnaryPlan<PhysicalAggregation, Plan> agg, PlanTranslatorContext context) { - + public PlanFragment visitPhysicalAggregate( + PhysicalUnaryPlan<PhysicalAggregate, Plan> agg, PlanTranslatorContext context) { PlanFragment inputPlanFragment = visit(agg.child(0), context); - - AggregationNode aggregationNode; - List<Slot> slotList = new ArrayList<>(); - PhysicalAggregation physicalAggregation = agg.getOperator(); - AggregateInfo.AggPhase phase = physicalAggregation.getAggPhase().toExec(); - - List<Expression> groupByExpressionList = physicalAggregation.getGroupByExprList(); + PhysicalAggregate physicalAggregate = agg.getOperator(); + + // TODO: stale planner generate aggregate tuple in a special way. tuple include 2 parts: + // 1. group by expressions: removing duplicate expressions add to tuple + // 2. agg functions: only removing duplicate agg functions in output expression should appear in tuple. + // e.g. select sum(v1) + 1, sum(v1) + 2 from t1 should only generate one sum(v1) in tuple + // We need: + // 1. add a project after agg, if output expressions include agg function as a expression tree leaf. + // 2. introduce canonicalized, semanticEquals and deterministic in Expression + // for removing duplicate. + List<Expression> groupByExpressionList = physicalAggregate.getGroupByExprList(); + List<NamedExpression> outputExpressionList = physicalAggregate.getOutputExpressionList(); + + // 1. generate slot reference for each group expression + List<SlotReference> groupSlotList = Lists.newArrayList(); + for (Expression e : groupByExpressionList) { + if (e instanceof SlotReference && outputExpressionList.stream().anyMatch(o -> o.contains(e::equals))) { + groupSlotList.add((SlotReference) e); + } else { + groupSlotList.add(new SlotReference(e.sql(), e.getDataType(), e.nullable(), Collections.emptyList())); + } + } ArrayList<Expr> execGroupingExpressions = groupByExpressionList.stream() - // Since output of plan doesn't contain the slots of groupBy, which is actually needed by - // the BE execution, so we have to collect them and add to the slotList to generate corresponding - // TupleDesc. - .peek(x -> slotList.addAll(x.collect(SlotReference.class::isInstance))) .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toCollection(ArrayList::new)); - slotList.addAll(agg.getOutput()); - TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context, null); - - List<NamedExpression> outputExpressionList = physicalAggregation.getOutputExpressionList(); - ArrayList<FunctionCallExpr> execAggExpressions = outputExpressionList.stream() - .map(e -> e.<List<AggregateFunction>>collect(AggregateFunction.class::isInstance)) + // 2. collect agg functions and generate agg function to slot reference map + List<Slot> aggFunctionOutput = Lists.newArrayList(); + List<AggregateFunction> aggregateFunctionList = outputExpressionList.stream() + .filter(o -> o.contains(AggregateFunction.class::isInstance)) + .peek(o -> aggFunctionOutput.add(o.toSlot())) + .map(o -> (List<AggregateFunction>) o.collect(AggregateFunction.class::isInstance)) .flatMap(List::stream) + .collect(Collectors.toList()); + ArrayList<FunctionCallExpr> execAggExpressions = aggregateFunctionList.stream() .map(x -> (FunctionCallExpr) ExpressionTranslator.translate(x, context)) .collect(Collectors.toCollection(ArrayList::new)); - List<Expression> partitionExpressionList = physicalAggregation.getPartitionExprList(); + // 3. generate output tuple + // TODO: currently, we only support sum(a), if we want to support sum(a) + 1, we need to + // split merge agg to project(agg) and generate tuple like what first phase agg do. + List<Slot> slotList = Lists.newArrayList(); + TupleDescriptor outputTupleDesc; + if (agg.getOperator().getAggPhase() == AggPhase.FIRST_MERGE) { + slotList.addAll(groupSlotList); + slotList.addAll(aggFunctionOutput); + outputTupleDesc = generateTupleDesc(slotList, null, context); + } else { + outputTupleDesc = generateTupleDesc(agg.getOutput(), null, context); + } + + // process partition list + List<Expression> partitionExpressionList = physicalAggregate.getPartitionExprList(); List<Expr> execPartitionExpressions = partitionExpressionList.stream() - .map(e -> (FunctionCallExpr) ExpressionTranslator.translate(e, context)).collect(Collectors.toList()); + .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList()); + DataPartition mergePartition = DataPartition.UNPARTITIONED; + if (CollectionUtils.isNotEmpty(execPartitionExpressions)) { Review Comment: yes, u r right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org