This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee8d45fe3a1 [opt](nereids) support topn-filter for external table 
(#34674)
ee8d45fe3a1 is described below

commit ee8d45fe3a16dd1e812b13b026f7553cf8a1cae3
Author: minghong <engle...@gmail.com>
AuthorDate: Wed May 15 10:36:01 2024 +0800

    [opt](nereids) support topn-filter for external table (#34674)
    
    * support topn-filter for external table
---
 .../apache/doris/datasource/ExternalScanNode.java  |  5 ++
 .../org/apache/doris/datasource/FileScanNode.java  |  9 +++-
 .../doris/datasource/es/source/EsScanNode.java     |  8 ++++
 .../doris/datasource/jdbc/source/JdbcScanNode.java |  8 ++++
 .../doris/datasource/odbc/source/OdbcScanNode.java |  8 ++++
 .../glue/translator/PhysicalPlanTranslator.java    | 34 ++++++++++----
 .../doris/nereids/processor/post/TopNScanOpt.java  | 54 ++++++++++++----------
 .../nereids/processor/post/TopnFilterContext.java  | 46 ++++++++++++------
 .../org/apache/doris/planner/DataGenScanNode.java  | 12 +++--
 .../org/apache/doris/planner/MysqlScanNode.java    |  8 ++++
 .../org/apache/doris/planner/OlapScanNode.java     | 37 +--------------
 .../org/apache/doris/planner/OriginalPlanner.java  |  2 +-
 .../java/org/apache/doris/planner/ScanNode.java    | 29 ++++++++++++
 .../doris/planner/TestExternalTableScanNode.java   |  9 ++++
 .../main/java/org/apache/doris/qe/Coordinator.java |  6 +--
 .../external_table_p0/jdbc/test_jdbc_query_pg.out  | 20 ++++++++
 .../jdbc/test_jdbc_query_pg.groovy                 |  4 +-
 17 files changed, 203 insertions(+), 96 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index d41fab5916c..e85fed8b62a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -24,6 +24,7 @@ import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import org.apache.logging.log4j.LogManager;
@@ -96,4 +97,8 @@ public abstract class ExternalScanNode extends ScanNode {
     public int getNumInstances() {
         return scanRangeLocations.size();
     }
+
+    protected void toThrift(TPlanNode msg) {
+        super.toThrift(msg);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 227f636e67b..fba97bc5959 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -55,6 +55,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Base class for External File Scan, including external query and load.
@@ -93,6 +94,7 @@ public abstract class FileScanNode extends ExternalScanNode {
             fileScanNode.setTableName(desc.getTable().getName());
         }
         planNode.setFileScanNode(fileScanNode);
+        super.toThrift(planNode);
     }
 
     @Override
@@ -167,7 +169,12 @@ public abstract class FileScanNode extends 
ExternalScanNode {
         }
         output.append(String.format("numNodes=%s", numNodes)).append("\n");
         output.append(prefix).append(String.format("pushdown agg=%s", 
pushDownAggNoGroupingOp)).append("\n");
-
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
index 7bc4f9ea1c0..663657b5d95 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
@@ -67,6 +67,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * ScanNode for Elasticsearch.
@@ -183,6 +184,7 @@ public class EsScanNode extends ExternalScanNode {
         }
         esScanNode.setProperties(properties);
         msg.es_scan_node = esScanNode;
+        super.toThrift(msg);
     }
 
     // only do partition(es index level) prune
@@ -316,6 +318,12 @@ public class EsScanNode extends ExternalScanNode {
         String indexName = table.getIndexName();
         String typeName = table.getMappingType();
         output.append(prefix).append(String.format("ES index/type: %s/%s", 
indexName, typeName)).append("\n");
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 58ab0f9d226..cd06a9a0d20 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -60,6 +60,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class JdbcScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);
@@ -257,6 +258,12 @@ public class JdbcScanNode extends ExternalScanNode {
                 output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
             }
         }
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 
@@ -308,6 +315,7 @@ public class JdbcScanNode extends ExternalScanNode {
             msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
         }
         msg.jdbc_scan_node.setTableType(jdbcType);
+        super.toThrift(msg);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index 9b597dddb54..9b964e41fa9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Full scan of an ODBC table.
@@ -141,6 +142,12 @@ public class OdbcScanNode extends ExternalScanNode {
             Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
             output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
         }
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 
@@ -233,6 +240,7 @@ public class OdbcScanNode extends ExternalScanNode {
         odbcScanNode.setQueryString(getOdbcQueryStr());
 
         msg.odbc_scan_node = odbcScanNode;
+        super.toThrift(msg);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b8cae519296..fba5558c462 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -593,6 +593,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, 
context)
                 )
         );
+        if (context.getTopnFilterContext().isTopnFilterTarget(fileScan)) {
+            context.getTopnFilterContext().addLegacyTarget(fileScan, scanNode);
+        }
+
         Utils.execWithUncheckedException(scanNode::finalizeForNereids);
         // Create PlanFragment
         DataPartition dataPartition = DataPartition.RANDOM;
@@ -637,6 +641,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
                 )
         );
+        if (context.getTopnFilterContext().isTopnFilterTarget(esScan)) {
+            context.getTopnFilterContext().addLegacyTarget(esScan, esScanNode);
+        }
         Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
esScanNode, dataPartition);
@@ -661,6 +668,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
                 )
         );
+        if (context.getTopnFilterContext().isTopnFilterTarget(jdbcScan)) {
+            context.getTopnFilterContext().addLegacyTarget(jdbcScan, 
jdbcScanNode);
+        }
         Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
jdbcScanNode, dataPartition);
@@ -685,6 +695,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
                 )
         );
+        if (context.getTopnFilterContext().isTopnFilterTarget(odbcScan)) {
+            context.getTopnFilterContext().addLegacyTarget(odbcScan, 
odbcScanNode);
+        }
         Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
odbcScanNode, dataPartition);
@@ -763,7 +776,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         );
         
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
         if (context.getTopnFilterContext().isTopnFilterTarget(olapScan)) {
-            olapScanNode.setUseTopnOpt(true);
             context.getTopnFilterContext().addLegacyTarget(olapScan, 
olapScanNode);
         }
         // TODO: we need to remove all finalizeForNereids
@@ -790,7 +802,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment planFragment = 
visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
         OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
         if 
(context.getTopnFilterContext().isTopnFilterTarget(deferMaterializeOlapScan)) {
-            olapScanNode.setUseTopnOpt(true);
             
context.getTopnFilterContext().addLegacyTarget(deferMaterializeOlapScan, 
olapScanNode);
         }
         TupleDescriptor tupleDescriptor = 
context.getTupleDesc(olapScanNode.getTupleId());
@@ -2113,11 +2124,14 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
                 sortNode.setUseTopnOpt(true);
                 context.getTopnFilterContext().getTargets(topN).forEach(
-                        olapScan -> {
-                            Optional<OlapScanNode> legacyScan =
-                                    
context.getTopnFilterContext().getLegacyScanNode(olapScan);
+                        relation -> {
+                            Optional<ScanNode> legacyScan =
+                                    
context.getTopnFilterContext().getLegacyScanNode(relation);
                             Preconditions.checkState(legacyScan.isPresent(),
-                                    "cannot find OlapScanNode for topn 
filter");
+                                    "cannot find ScanNode for topn filter:\n"
+                                            + "relation: %s \n%s",
+                                    relation.toString(),
+                                    context.getTopnFilterContext().toString());
                             legacyScan.get().addTopnFilterSortNode(sortNode);
                         }
                 );
@@ -2173,11 +2187,11 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
                 sortNode.setUseTopnOpt(true);
                 context.getTopnFilterContext().getTargets(topN).forEach(
-                        olapScan -> {
-                            Optional<OlapScanNode> legacyScan =
-                                    
context.getTopnFilterContext().getLegacyScanNode(olapScan);
+                        relation -> {
+                            Optional<ScanNode> legacyScan =
+                                    
context.getTopnFilterContext().getLegacyScanNode(relation);
                             Preconditions.checkState(legacyScan.isPresent(),
-                                    "cannot find OlapScanNode for topn 
filter");
+                                    "cannot find ScanNode for topn filter");
                             legacyScan.get().addTopnFilterSortNode(sortNode);
                         }
                 );
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index a8129639f5e..ec1e52d6426 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -23,9 +23,14 @@ import 
org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
 import org.apache.doris.nereids.trees.plans.algebra.Join;
-import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
 import org.apache.doris.qe.ConnectContext;
@@ -41,10 +46,9 @@ import java.util.Optional;
  */
 
 public class TopNScanOpt extends PlanPostProcessor {
-
     @Override
     public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? 
extends Plan> topN, CascadesContext ctx) {
-        Optional<OlapScan> scanOpt = findScanForTopnFilter(topN);
+        Optional<PhysicalRelation> scanOpt = findScanForTopnFilter(topN);
         scanOpt.ifPresent(scan -> 
ctx.getTopnFilterContext().addTopnFilter(topN, scan));
         topN.child().accept(this, ctx);
         return topN;
@@ -53,13 +57,13 @@ public class TopNScanOpt extends PlanPostProcessor {
     @Override
     public Plan 
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> 
topN,
             CascadesContext context) {
-        Optional<OlapScan> scanOpt = 
findScanForTopnFilter(topN.getPhysicalTopN());
+        Optional<PhysicalRelation> scanOpt = 
findScanForTopnFilter(topN.getPhysicalTopN());
         scanOpt.ifPresent(scan -> 
context.getTopnFilterContext().addTopnFilter(topN, scan));
         topN.child().accept(this, context);
         return topN;
     }
 
-    private Optional<OlapScan> findScanForTopnFilter(PhysicalTopN<? extends 
Plan> topN) {
+    private Optional<PhysicalRelation> findScanForTopnFilter(PhysicalTopN<? 
extends Plan> topN) {
         if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
             return Optional.empty();
         }
@@ -92,30 +96,23 @@ public class TopNScanOpt extends PlanPostProcessor {
         }
 
         boolean nullsFirst = topN.getOrderKeys().get(0).isNullFirst();
-        OlapScan olapScan = findScanNodeBySlotReference(topN, (SlotReference) 
firstKey, nullsFirst);
-        if (olapScan != null
-                && olapScan.getTable().isDupKeysOrMergeOnWrite()
-                && olapScan instanceof PhysicalCatalogRelation) {
-            return Optional.of(olapScan);
-        }
-
-        return Optional.empty();
+        return findScanNodeBySlotReference(topN, (SlotReference) firstKey, 
nullsFirst);
     }
 
-    private OlapScan findScanNodeBySlotReference(Plan root, SlotReference 
slot, boolean nullsFirst) {
+    private Optional<PhysicalRelation> findScanNodeBySlotReference(Plan root, 
SlotReference slot, boolean nullsFirst) {
         if (root instanceof PhysicalWindow) {
-            return null;
+            return Optional.empty();
         }
 
-        if (root instanceof OlapScan) {
-            if (root.getOutputSet().contains(slot)) {
-                return (OlapScan) root;
+        if (root instanceof PhysicalRelation) {
+            if (root.getOutputSet().contains(slot) && 
supportPhysicalRelations((PhysicalRelation) root)) {
+                return Optional.of((PhysicalRelation) root);
             } else {
-                return null;
+                return Optional.empty();
             }
         }
 
-        OlapScan target = null;
+        Optional<PhysicalRelation> target;
         if (root instanceof Join) {
             Join join = (Join) root;
             if (nullsFirst && join.getJoinType().isOuterJoin()) {
@@ -123,7 +120,7 @@ public class TopNScanOpt extends PlanPostProcessor {
                 // and to the right child of rightOuterJoin.
                 // but we have rule to push topn down to the left/right side. 
and topn-filter
                 // will be generated according to the inferred topn node.
-                return null;
+                return Optional.empty();
             }
             // try to push to both left and right child
             if (root.child(0).getOutputSet().contains(slot)) {
@@ -139,12 +136,12 @@ public class TopNScanOpt extends PlanPostProcessor {
             Plan child = root.child(0);
             if (child.getOutputSet().contains(slot)) {
                 target = findScanNodeBySlotReference(child, slot, nullsFirst);
-                if (target != null) {
+                if (target.isPresent()) {
                     return target;
                 }
             }
         }
-        return target;
+        return Optional.empty();
     }
 
     private long getTopNOptLimitThreshold() {
@@ -153,4 +150,13 @@ public class TopNScanOpt extends PlanPostProcessor {
         }
         return -1;
     }
+
+    private boolean supportPhysicalRelations(PhysicalRelation relation) {
+        return relation instanceof PhysicalOlapScan
+                || relation instanceof PhysicalOdbcScan
+                || relation instanceof PhysicalEsScan
+                || relation instanceof PhysicalFileScan
+                || relation instanceof PhysicalJdbcScan
+                || relation instanceof PhysicalDeferMaterializeOlapScan;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
index b5f79defef4..6a4fe3123df 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
@@ -17,9 +17,9 @@
 
 package org.apache.doris.nereids.processor.post;
 
-import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
 import org.apache.doris.nereids.trees.plans.algebra.TopN;
-import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SortNode;
 
 import com.google.common.collect.Lists;
@@ -35,20 +35,20 @@ import java.util.Set;
  * topN runtime filter context
  */
 public class TopnFilterContext {
-    private final Map<TopN, List<OlapScan>> filters = Maps.newHashMap();
+    private final Map<TopN, List<PhysicalRelation>> filters = 
Maps.newHashMap();
     private final Set<TopN> sources = Sets.newHashSet();
-    private final Set<OlapScan> targets = Sets.newHashSet();
-    private final Map<OlapScan, OlapScanNode> legacyTargetsMap = 
Maps.newHashMap();
+    private final Set<PhysicalRelation> targets = Sets.newHashSet();
+    private final Map<PhysicalRelation, ScanNode> legacyTargetsMap = 
Maps.newHashMap();
     private final Map<TopN, SortNode> legacySourceMap = Maps.newHashMap();
 
     /**
      * add topN filter
      */
-    public void addTopnFilter(TopN topn, OlapScan scan) {
+    public void addTopnFilter(TopN topn, PhysicalRelation scan) {
         targets.add(scan);
         sources.add(topn);
 
-        List<OlapScan> targets = filters.get(topn);
+        List<PhysicalRelation> targets = filters.get(topn);
         if (targets == null) {
             filters.put(topn, Lists.newArrayList(scan));
         } else {
@@ -59,14 +59,14 @@ public class TopnFilterContext {
     /**
      * find the corresponding sortNode for topn filter
      */
-    public Optional<OlapScanNode> getLegacyScanNode(OlapScan scan) {
-        return legacyTargetsMap.keySet().contains(scan)
+    public Optional<ScanNode> getLegacyScanNode(PhysicalRelation scan) {
+        return legacyTargetsMap.containsKey(scan)
                 ? Optional.of(legacyTargetsMap.get(scan))
                 : Optional.empty();
     }
 
     public Optional<SortNode> getLegacySortNode(TopN topn) {
-        return legacyTargetsMap.keySet().contains(topn)
+        return legacySourceMap.containsKey(topn)
                 ? Optional.of(legacySourceMap.get(topn))
                 : Optional.empty();
     }
@@ -75,19 +75,35 @@ public class TopnFilterContext {
         return sources.contains(topn);
     }
 
-    public boolean isTopnFilterTarget(OlapScan scan) {
-        return targets.contains(scan);
+    public boolean isTopnFilterTarget(PhysicalRelation relation) {
+        return targets.contains(relation);
     }
 
     public void addLegacySource(TopN topn, SortNode sort) {
         legacySourceMap.put(topn, sort);
     }
 
-    public void addLegacyTarget(OlapScan olapScan, OlapScanNode legacy) {
-        legacyTargetsMap.put(olapScan, legacy);
+    public void addLegacyTarget(PhysicalRelation relation, ScanNode legacy) {
+        legacyTargetsMap.put(relation, legacy);
     }
 
-    public List<OlapScan> getTargets(TopN topn) {
+    public List<PhysicalRelation> getTargets(TopN topn) {
         return filters.get(topn);
     }
+
+    /**
+     * toString
+     */
+    public String toString() {
+        StringBuilder builder = new StringBuilder("TopnFilterContext\n");
+        String indent = "   ";
+        String arrow = " -> ";
+        builder.append("filters:\n");
+        for (TopN topn : filters.keySet()) {
+            builder.append(indent).append(topn.toString()).append("\n");
+            
builder.append(indent).append(arrow).append(filters.get(topn)).append("\n");
+        }
+        return builder.toString();
+
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 14a50160d63..1c760adb94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -38,6 +38,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * This scan node is used for data source generated from memory.
@@ -79,6 +80,7 @@ public class DataGenScanNode extends ExternalScanNode {
         dataGenScanNode.setTupleId(desc.getId().asInt());
         dataGenScanNode.setFuncName(tvf.getDataGenFunctionName());
         msg.data_gen_scan_node = dataGenScanNode;
+        super.toThrift(msg);
     }
 
     @Override
@@ -130,11 +132,13 @@ public class DataGenScanNode extends ExternalScanNode {
         if (!conjuncts.isEmpty()) {
             output.append(prefix).append("predicates: 
").append(getExplainString(conjuncts)).append("\n");
         }
-
         output.append(prefix).append("table value function: 
").append(tvf.getDataGenFunctionName()).append("\n");
-
-
-
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
index 703566bbc0b..fe75530dae9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
@@ -43,6 +43,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Full scan of an MySQL table.
@@ -93,6 +94,12 @@ public class MysqlScanNode extends ExternalScanNode {
             Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
             output.append(prefix).append("PREDICATES: 
").append(expr.toSql()).append("\n");
         }
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN 
OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 
@@ -153,6 +160,7 @@ public class MysqlScanNode extends ExternalScanNode {
     protected void toThrift(TPlanNode msg) {
         msg.node_type = TPlanNodeType.MYSQL_SCAN_NODE;
         msg.mysql_scan_node = new TMySQLScanNode(desc.getId().asInt(), 
tblName, columns, filters);
+        super.toThrift(msg);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 6b5128ba5eb..3e32853119b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -178,12 +178,6 @@ public class OlapScanNode extends ScanNode {
     // It's limit for scanner instead of scanNode so we add a new limit.
     private long sortLimit = -1;
 
-    // useTopnOpt is equivalent to !topnFilterSortNodes.isEmpty().
-    // keep this flag for compatibility.
-    private boolean useTopnOpt = false;
-    // support multi topn filter
-    private final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
-
     // List of tablets will be scanned by current olap_scan_node
     private ArrayList<Long> scanTabletIds = Lists.newArrayList();
 
@@ -319,14 +313,6 @@ public class OlapScanNode extends ScanNode {
         this.sortLimit = sortLimit;
     }
 
-    public boolean getUseTopnOpt() {
-        return useTopnOpt;
-    }
-
-    public void setUseTopnOpt(boolean useTopnOpt) {
-        this.useTopnOpt = useTopnOpt;
-    }
-
     public Collection<Long> getSelectedPartitionIds() {
         return selectedPartitionIds;
     }
@@ -1347,7 +1333,7 @@ public class OlapScanNode extends ScanNode {
         if (sortLimit != -1) {
             output.append(prefix).append("SORT LIMIT: 
").append(sortLimit).append("\n");
         }
-        if (useTopnOpt) {
+        if (useTopnFilter()) {
             String topnFilterSources = String.join(",",
                     topnFilterSortNodes.stream()
                             .map(node -> node.getId().asInt() + 
"").collect(Collectors.toList()));
@@ -1524,18 +1510,6 @@ public class OlapScanNode extends ScanNode {
         if (sortLimit != -1) {
             msg.olap_scan_node.setSortLimit(sortLimit);
         }
-        msg.olap_scan_node.setUseTopnOpt(useTopnOpt);
-        List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes()
-                .stream()
-                .map(sortNode -> sortNode.getId().asInt())
-                .collect(Collectors.toList());
-        if (!topnFilterSourceNodeIds.isEmpty()) {
-            if (SessionVariable.enablePipelineEngineX()) {
-                msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
-            } else {
-                
msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
-            }
-        }
         msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
         String tableName = olapTable.getName();
         if (selectedIndexId != -1) {
@@ -1556,6 +1530,7 @@ public class OlapScanNode extends ScanNode {
         if (shouldColoScan || SessionVariable.enablePipelineEngineX()) {
             msg.olap_scan_node.setDistributeColumnIds(new 
ArrayList<>(distributionColumnIds));
         }
+        super.toThrift(msg);
     }
 
     public void collectColumns(Analyzer analyzer, Set<String> 
equivalenceColumns, Set<String> unequivalenceColumns) {
@@ -1813,14 +1788,6 @@ public class OlapScanNode extends ScanNode {
         return getScanTabletIds().size();
     }
 
-    public void addTopnFilterSortNode(SortNode sortNode) {
-        topnFilterSortNodes.add(sortNode);
-    }
-
-    public List<SortNode> getTopnFilterSortNodes() {
-        return topnFilterSortNodes;
-    }
-
     @Override
     public int numScanBackends() {
         return scanBackendIds.size();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index cfce6060542..06644a7ab9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -550,7 +550,7 @@ public class OriginalPlanner extends Planner {
                     OlapScanNode scanNode = (OlapScanNode) child;
                     if (scanNode.isDupKeysOrMergeOnWrite()) {
                         sortNode.setUseTopnOpt(true);
-                        scanNode.setUseTopnOpt(true);
+                        // scanNode.setUseTopnOpt(true);
                     }
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index b88a2438d9a..c9febfa8047 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -49,11 +49,13 @@ import org.apache.doris.datasource.SplitGenerator;
 import org.apache.doris.datasource.SplitSource;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.query.StatsDelta;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -99,6 +101,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     // create a mapping between output slot's id and project expr
     Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
 
+    // support multi topn filter
+    protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
+
     public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) {
         super(id, desc.getId().asList(), planNodeName, statisticalType);
         this.desc = desc;
@@ -812,4 +817,28 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
             scanNode.updateScanRangeVersions(visibleVersionMap);
         }
     }
+
+    protected void toThrift(TPlanNode msg) {
+        // topn filter
+        if (useTopnFilter() && SessionVariable.enablePipelineEngineX()) {
+            List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes()
+                    .stream()
+                    .map(sortNode -> sortNode.getId().asInt())
+                    .collect(Collectors.toList());
+            msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
+        }
+    }
+
+    public void addTopnFilterSortNode(SortNode sortNode) {
+        topnFilterSortNodes.add(sortNode);
+    }
+
+    public List<SortNode> getTopnFilterSortNodes() {
+        return topnFilterSortNodes;
+    }
+
+    public boolean useTopnFilter() {
+        return !topnFilterSortNodes.isEmpty();
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
index 3136444725f..3d6461b923c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
@@ -33,6 +33,8 @@ import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.stream.Collectors;
+
 public class TestExternalTableScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(TestExternalTableScanNode.class);
     private String tableName;
@@ -46,6 +48,12 @@ public class TestExternalTableScanNode extends ExternalScanNode {
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         StringBuilder output = new StringBuilder();
         output.append(prefix).append("TABLE: ").append(tableName).append("\n");
+        if (useTopnFilter()) {
+            String topnFilterSources = String.join(",",
+                    topnFilterSortNodes.stream()
+                            .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+            output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+        }
         return output.toString();
     }
 
@@ -74,6 +82,7 @@ public class TestExternalTableScanNode extends ExternalScanNode {
         msg.test_external_scan_node = new TTestExternalScanNode();
         msg.test_external_scan_node.setTupleId(desc.getId().asInt());
         msg.test_external_scan_node.setTableName(tableName);
+        super.toThrift(msg);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 431e30896eb..a9bcc69c4a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3837,10 +3837,8 @@ public class Coordinator implements CoordInterface {
             }
             Set<Integer> topnFilterSources = Sets.newLinkedHashSet();
             for (ScanNode scanNode : scanNodes) {
-                if (scanNode instanceof OlapScanNode) {
-                    for (SortNode sortNode : ((OlapScanNode) scanNode).getTopnFilterSortNodes()) {
-                        topnFilterSources.add(sortNode.getId().asInt());
-                    }
+                for (SortNode sortNode : scanNode.getTopnFilterSortNodes()) {
+                    topnFilterSources.add(sortNode.getId().asInt());
                 }
             }
 
diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
index 06b0247814a..ec2058ca46e 100644
--- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
@@ -1263,9 +1263,29 @@ true
 1
 
 -- !sql67 --
+true   abc     def     2022-10-11      1.234   1       2       0       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       0       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       0       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       1       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       0       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       2       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       0       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       0       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       1       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       0       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       1       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       1       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       1       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       2       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       1       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       1       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       2       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       0       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       2       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       1       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       2       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       2       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       2       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       2       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
 true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       0       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       0       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       1       2022-10-22T10:59:59     34.123
 true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       1       2022-10-22T10:59:59     34.123
 true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       2       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       2       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
+true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
 true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
 true   abc     def     2022-10-11      1.234   1       2       3       
2022-10-22T10:59:59     34.123  true    abc     def     2022-10-11      1.234   
1       2       3       2022-10-22T10:59:59     34.123
 
diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
index ae2566c445c..492fdeb349b 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
@@ -537,9 +537,9 @@ suite("test_jdbc_query_pg", "p0,external,pg,external_docker,external_docker_pg")
         order_qt_sql63 """ SELECT * FROM (SELECT 1 a) x CROSS JOIN (SELECT 2 b) y """
         order_qt_sql65 """ SELECT t.c FROM (SELECT 1) as t1 CROSS JOIN (SELECT 0 AS c UNION ALL SELECT 1) t """
         order_qt_sql66 """ SELECT t.c FROM (SELECT 1) as a CROSS JOIN (SELECT 0 AS c UNION ALL SELECT 1) t """
-        order_qt_sql67 """ SELECT * FROM (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) a
+        order_qt_sql67 """ SELECT a.*, b.* FROM (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) a
                             JOIN (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) b ON 123 = 123
-                            order by a.k8 desc limit 5"""
+                            order by a.k8, b.k8 desc limit 25"""
         order_qt_sql68 """ SELECT id, count(1) as c FROM $dorisExTable1 GROUP BY id
                             HAVING c IN (select k8 from $jdbcPg14Table1 where k8 = 2) """
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to