github-actions[bot] commented on code in PR #63690:
URL: https://github.com/apache/doris/pull/63690#discussion_r3433249522


##########
regression-test/suites/query_p0/eager_agg/bilateral_eager_agg.groovy:
##########
@@ -0,0 +1,843 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("bilateral_eager_agg") {
+    sql """
+        drop table if exists t_pdajos_1;
+        CREATE TABLE `t_pdajos_1` (
+          `k` int NOT NULL COMMENT "join key",
+          `v` int NOT NULL COMMENT "agg column on left"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+        drop table if exists t_pdajos_2;
+        CREATE TABLE `t_pdajos_2` (
+          `k` int NOT NULL COMMENT "join key",
+          `v` int NOT NULL COMMENT "agg column on right"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k`)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        );
+        insert into t_pdajos_1 values(1,10),(1,20),(2,30);
+        insert into t_pdajos_2 values(1,100),(1,200),(2,300);
+        
+        drop table if exists pdagg_proj_t1;
+        drop table if exists pdagg_proj_t2;
+        create table pdagg_proj_t1 (
+            id1 int not null,
+            x   int,
+            y   int,
+            z   int,
+            g1  int,
+            g2  int,
+            flag int
+        )
+        duplicate key(id1)
+        distributed by hash(id1) buckets 1
+        properties ("replication_num" = "1");
+        
+        create table pdagg_proj_t2 (
+            id2 int not null,
+            k   int,
+            v   int
+        )
+        duplicate key(id2)
+        distributed by hash(id2) buckets 1
+        properties ("replication_num" = "1");
+        
+        insert into pdagg_proj_t1 values
+            (1, 10, 1, 100, 7, 3, 1),
+            (2, 20, 2, 100, 7, 4, 0),
+            (3, 30, 3, 200, 8, 5, 1),
+            (4, 40, 4, 200, 8, 6, 0);
+        
+        insert into pdagg_proj_t2 values
+            (1, 10, 1000),
+            (2, 10, 2000),
+            (3, 20, 3000),
+            (5, 30, 5000);
+    """
+
+//    sql "set disable_nereids_rules='PUSH_DOWN_AGG_THROUGH_JOIN';"
+//    sql "SET eager_aggregation_mode = -1;"
+    order_qt_2_join """
+        SELECT
+          t1.k,
+          count(t1.v) AS lcount,
+          sum(t2.v) AS rsum,
+          sum(t3.v) as 3sum
+        FROM t_pdajos_1 t1
+        INNER JOIN t_pdajos_2 t2 ON t1.k = t2.k
+        inner join t_pdajos_2 t3 on t2.k=t3.k
+        GROUP BY t1.k;
+    """
+
+    order_qt_2_join_count_star"""
+        SELECT
+          t1.k,
+          count(t1.v) AS lcount,
+          sum(t2.v) AS rsum,
+          sum(t3.v) as 3sum,
+          count(*) as cntstar
+        FROM t_pdajos_1 t1
+        INNER JOIN t_pdajos_2 t2 ON t1.k = t2.k
+        inner join t_pdajos_2 t3 on t2.k=t3.k
+        GROUP BY t1.k;
+    """
+
+    order_qt_push_one_side"""
+        SELECT
+        t1.k,
+        count(t1.v) AS lcount,
+        sum(t1.v) as lsum,
+        min(t1.v) as lmin
+        FROM t_pdajos_1 t1
+        INNER JOIN t_pdajos_2 t2 ON t1.k = t2.k
+        GROUP BY t1.k;
+    """
+
+
+    order_qt_sum_to_2_side """
+        select t2.k, sum(if(t1.x==0,t1.y,t2.v)),sum(t1.y)
+        from pdagg_proj_t1 t1
+        inner join pdagg_proj_t2 t2 
+        group by t2.k;
+        select * from pdagg_proj_t2;

Review Comment:
   This `order_qt_sum_to_2_side` block contains two `SELECT` statements, and the
   generated `!sum_to_2_side` output contains only the rows from
   `select * from pdagg_proj_t2`. As written, the result of the preceding
   aggregate query is never asserted. Please either split the two statements
   into separate checks or remove the debug `select *`, then regenerate the
   `.out` file so that the eager-agg result is actually validated.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/eageraggregation/EagerAggRewriter.java:
##########
@@ -70,77 +84,200 @@
  *         ->T2(D)
  */
 public class EagerAggRewriter extends DefaultPlanRewriter<PushDownAggContext> {
+    public static final int BIG_JOIN_BUILD_SIZE = 400_000;
     private static final double LOWER_AGGREGATE_EFFECT_COEFFICIENT = 10000;
     private static final double LOW_AGGREGATE_EFFECT_COEFFICIENT = 1000;
     private static final double MEDIUM_AGGREGATE_EFFECT_COEFFICIENT = 100;
+    private static final String JOIN_CNT = "joinCnt";
     private final StatsDerive derive = new StatsDerive(false);
 
     @Override
     public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> 
join, PushDownAggContext context) {
-        boolean toLeft = false;
-        boolean toRight = false;
-        boolean pushHere = false;
-        if (join.getJoinType().isAsofJoin()) {
-            // do nothing for asof join
-            return join;
+        Pair<Boolean, Boolean> pushSide = decideJoinPushSide(join, context);
+        boolean toLeft = pushSide.first;
+        boolean toRight = pushSide.second;
+        if (!toLeft && !toRight) {
+            if (SessionVariable.isEagerAggregationOnJoin()) {
+                return genAggregate(join, context);
+            } else {
+                return join;
+            }
         }
-        if (context.getAggFunctions().isEmpty()) {
-            // select t1.v from t1 join t2 on t1.id = t2.id group by t1.v, t2.v
-            // if no agg function, try to push agg to the child which contains 
all group keys
-            // TODO: consider t1.rows/(t1.id, t1.v).ndv and t2.rows/(t2.id, 
t2.v).ndv to determine push target
-            if 
(join.left().getOutputSet().containsAll(context.getGroupKeys())) {
-                toLeft = true;
-            } else if 
(join.right().getOutputSet().containsAll(context.getGroupKeys())) {
-                toRight = true;
+
+        // construct left and right group by keys
+        List<SlotReference> leftChildGroupByKeys = new ArrayList<>();
+        List<SlotReference> rightChildGroupByKeys = new ArrayList<>();
+        if (toLeft) {
+            fillGroupByKeys(join, join.left(), context, leftChildGroupByKeys);
+        }
+        if (toRight) {
+            fillGroupByKeys(join, join.right(), context, 
rightChildGroupByKeys);
+        }
+        // construct left and right aggFuncs and aliasMap
+        List<AggregateFunction> leftFuncs = new ArrayList<>();
+        List<AggregateFunction> rightFuncs = new ArrayList<>();
+        Map<AggregateFunction, Alias> leftAliasMap = new IdentityHashMap<>();
+        Map<AggregateFunction, Alias> rightAliasMap = new IdentityHashMap<>();
+        for (AggregateFunction f : context.getAggFunctions()) {
+            Set<Slot> inputs = f.getInputSlots();
+            Alias a = context.getAliasMap().get(f);
+            if (inputs.isEmpty()) {
+                if (join.getJoinType().isRightSemiOrAntiJoin()) {
+                    rightFuncs.add(f);
+                    rightAliasMap.put(f, a);
+                } else {
+                    leftFuncs.add(f);
+                    leftAliasMap.put(f, a);
+                }
+                continue;
+            }
+            if (join.left().getOutputSet().containsAll(inputs)) {
+                leftFuncs.add(f);
+                leftAliasMap.put(f, a);
+            } else if (join.right().getOutputSet().containsAll(inputs)) {
+                rightFuncs.add(f);
+                rightAliasMap.put(f, a);
             } else {
-                pushHere = true;
+                return join;
             }
+        }
+
+        boolean passThroughBigJoin = isPassThroughBigJoin(join, context);
+        boolean leftNeedOutputCount = needOutputCountForJoinChild(join, 
toLeft, toRight,
+                context.needOutputCount(), rightFuncs);
+        boolean rightNeedOutputCount = needOutputCountForJoinChild(join, 
toRight, toLeft,
+                context.needOutputCount(), leftFuncs);
+        Optional<PushDownAggContext> leftChildContext = toLeft ? 
Optional.of(context.forOneBranch(leftFuncs,
+                leftAliasMap, leftChildGroupByKeys, passThroughBigJoin, 
leftNeedOutputCount)) : Optional.empty();
+        Optional<PushDownAggContext> rightChildContext = toRight ? 
Optional.of(context.forOneBranch(rightFuncs,
+                rightAliasMap, rightChildGroupByKeys, passThroughBigJoin, 
rightNeedOutputCount)) : Optional.empty();
+
+        Plan newLeft = join.left();
+        Plan newRight = join.right();
+        if (leftChildContext.isPresent() && 
!leftChildContext.get().noGroupKeyAndNoAggFunc()) {
+            newLeft = join.left().accept(this, leftChildContext.get());
+        }
+        if (rightChildContext.isPresent() && 
!rightChildContext.get().noGroupKeyAndNoAggFunc()) {
+            newRight = join.right().accept(this, rightChildContext.get());
+        }
+
+        if (newLeft == join.left() && newRight == join.right()) {
+            context.getBilateralState().registerNoCountSlot(join);
+            return join;
+        }
+        Optional<Slot> leftChildCountSlot = 
context.getBilateralState().getCountSlot(newLeft);
+        Optional<Slot> rightChildCountSlot = 
context.getBilateralState().getCountSlot(newRight);
+        LogicalJoin<? extends Plan, ? extends Plan> newJoin = (LogicalJoin<? 
extends Plan, ? extends Plan>)
+                join.withChildren(newLeft, newRight);
+
+        if (leftChildCountSlot.isPresent() || rightChildCountSlot.isPresent()) 
{
+            return buildCanonicalJoinProject(newJoin, context, 
leftChildContext, rightChildContext,
+                    leftChildCountSlot, rightChildCountSlot);
+        }
+        context.getBilateralState().registerNoCountSlot(newJoin);
+        return newJoin;
+    }
+
+    private Pair<Boolean, Boolean> decideJoinPushSide(
+            LogicalJoin<? extends Plan, ? extends Plan> join, 
PushDownAggContext context) {
+        if (join.getJoinType().isAsofJoin() || join.isMarkJoin()) {
+            // do nothing for asof join and mark join
+            return Pair.of(false, false);

Review Comment:
   This `deduplicateOnly` branch is now the replacement path for the deleted
   `PushDownDistinctThroughJoin` rule, but this PR also removes both the old
   unit test and the `push_down_distinct_through_join` regression suite, and I
   do not see a new `SELECT DISTINCT` case in the eager-agg tests. That leaves
   this no-aggregate path untested, including the old `project(join)` shape.
   Please carry over equivalent distinct-through-join coverage under the new
   `PushDownAggregation` path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to