This is an automated email from the ASF dual-hosted git repository. englefly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ee8d45fe3a1 [opt](nereids) support topn-filter for external table (#34674) ee8d45fe3a1 is described below commit ee8d45fe3a16dd1e812b13b026f7553cf8a1cae3 Author: minghong <engle...@gmail.com> AuthorDate: Wed May 15 10:36:01 2024 +0800 [opt](nereids) support topn-filter for external table (#34674) * support topn-filter for external table --- .../apache/doris/datasource/ExternalScanNode.java | 5 ++ .../org/apache/doris/datasource/FileScanNode.java | 9 +++- .../doris/datasource/es/source/EsScanNode.java | 8 ++++ .../doris/datasource/jdbc/source/JdbcScanNode.java | 8 ++++ .../doris/datasource/odbc/source/OdbcScanNode.java | 8 ++++ .../glue/translator/PhysicalPlanTranslator.java | 34 ++++++++++---- .../doris/nereids/processor/post/TopNScanOpt.java | 54 ++++++++++++---------- .../nereids/processor/post/TopnFilterContext.java | 46 ++++++++++++------ .../org/apache/doris/planner/DataGenScanNode.java | 12 +++-- .../org/apache/doris/planner/MysqlScanNode.java | 8 ++++ .../org/apache/doris/planner/OlapScanNode.java | 37 +-------------- .../org/apache/doris/planner/OriginalPlanner.java | 2 +- .../java/org/apache/doris/planner/ScanNode.java | 29 ++++++++++++ .../doris/planner/TestExternalTableScanNode.java | 9 ++++ .../main/java/org/apache/doris/qe/Coordinator.java | 6 +-- .../external_table_p0/jdbc/test_jdbc_query_pg.out | 20 ++++++++ .../jdbc/test_jdbc_query_pg.groovy | 4 +- 17 files changed, 203 insertions(+), 96 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java index d41fab5916c..e85fed8b62a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java @@ -24,6 +24,7 @@ import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import 
org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.logging.log4j.LogManager; @@ -96,4 +97,8 @@ public abstract class ExternalScanNode extends ScanNode { public int getNumInstances() { return scanRangeLocations.size(); } + + protected void toThrift(TPlanNode msg) { + super.toThrift(msg); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 227f636e67b..fba97bc5959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -55,6 +55,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Base class for External File Scan, including external query and load. 
@@ -93,6 +94,7 @@ public abstract class FileScanNode extends ExternalScanNode { fileScanNode.setTableName(desc.getTable().getName()); } planNode.setFileScanNode(fileScanNode); + super.toThrift(planNode); } @Override @@ -167,7 +169,12 @@ public abstract class FileScanNode extends ExternalScanNode { } output.append(String.format("numNodes=%s", numNodes)).append("\n"); output.append(prefix).append(String.format("pushdown agg=%s", pushDownAggNoGroupingOp)).append("\n"); - + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java index 7bc4f9ea1c0..663657b5d95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java @@ -67,6 +67,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * ScanNode for Elasticsearch. 
@@ -183,6 +184,7 @@ public class EsScanNode extends ExternalScanNode { } esScanNode.setProperties(properties); msg.es_scan_node = esScanNode; + super.toThrift(msg); } // only do partition(es index level) prune @@ -316,6 +318,12 @@ public class EsScanNode extends ExternalScanNode { String indexName = table.getIndexName(); String typeName = table.getMappingType(); output.append(prefix).append(String.format("ES index/type: %s/%s", indexName, typeName)).append("\n"); + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java index 58ab0f9d226..cd06a9a0d20 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java @@ -60,6 +60,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; public class JdbcScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class); @@ -257,6 +258,12 @@ public class JdbcScanNode extends ExternalScanNode { output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); } } + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } @@ -308,6 +315,7 @@ public class JdbcScanNode extends ExternalScanNode { 
msg.jdbc_scan_node.setQueryString(getJdbcQueryStr()); } msg.jdbc_scan_node.setTableType(jdbcType); + super.toThrift(msg); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java index 9b597dddb54..9b964e41fa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Full scan of an ODBC table. @@ -141,6 +142,12 @@ public class OdbcScanNode extends ExternalScanNode { Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); } + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } @@ -233,6 +240,7 @@ public class OdbcScanNode extends ExternalScanNode { odbcScanNode.setQueryString(getOdbcQueryStr()); msg.odbc_scan_node = odbcScanNode; + super.toThrift(msg); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index b8cae519296..fba5558c462 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -593,6 +593,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); + if (context.getTopnFilterContext().isTopnFilterTarget(fileScan)) { + context.getTopnFilterContext().addLegacyTarget(fileScan, scanNode); + } + Utils.execWithUncheckedException(scanNode::finalizeForNereids); // Create PlanFragment DataPartition dataPartition = DataPartition.RANDOM; @@ -637,6 +641,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context) ) ); + if (context.getTopnFilterContext().isTopnFilterTarget(esScan)) { + context.getTopnFilterContext().addLegacyTarget(esScan, esScanNode); + } Utils.execWithUncheckedException(esScanNode::finalizeForNereids); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition); @@ -661,6 +668,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context) ) ); + if (context.getTopnFilterContext().isTopnFilterTarget(jdbcScan)) { + context.getTopnFilterContext().addLegacyTarget(jdbcScan, jdbcScanNode); + } Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition); @@ -685,6 +695,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context) ) ); + if (context.getTopnFilterContext().isTopnFilterTarget(odbcScan)) { + context.getTopnFilterContext().addLegacyTarget(odbcScan, odbcScanNode); + } Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new 
PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition); @@ -763,7 +776,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla ); olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId())); if (context.getTopnFilterContext().isTopnFilterTarget(olapScan)) { - olapScanNode.setUseTopnOpt(true); context.getTopnFilterContext().addLegacyTarget(olapScan, olapScanNode); } // TODO: we need to remove all finalizeForNereids @@ -790,7 +802,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment planFragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context); OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot(); if (context.getTopnFilterContext().isTopnFilterTarget(deferMaterializeOlapScan)) { - olapScanNode.setUseTopnOpt(true); context.getTopnFilterContext().addLegacyTarget(deferMaterializeOlapScan, olapScanNode); } TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId()); @@ -2113,11 +2124,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla if (context.getTopnFilterContext().isTopnFilterSource(topN)) { sortNode.setUseTopnOpt(true); context.getTopnFilterContext().getTargets(topN).forEach( - olapScan -> { - Optional<OlapScanNode> legacyScan = - context.getTopnFilterContext().getLegacyScanNode(olapScan); + relation -> { + Optional<ScanNode> legacyScan = + context.getTopnFilterContext().getLegacyScanNode(relation); Preconditions.checkState(legacyScan.isPresent(), - "cannot find OlapScanNode for topn filter"); + "cannot find ScanNode for topn filter:\n" + + "relation: %s \n%s", + relation.toString(), + context.getTopnFilterContext().toString()); legacyScan.get().addTopnFilterSortNode(sortNode); } ); @@ -2173,11 +2187,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla if 
(context.getTopnFilterContext().isTopnFilterSource(topN)) { sortNode.setUseTopnOpt(true); context.getTopnFilterContext().getTargets(topN).forEach( - olapScan -> { - Optional<OlapScanNode> legacyScan = - context.getTopnFilterContext().getLegacyScanNode(olapScan); + relation -> { + Optional<ScanNode> legacyScan = + context.getTopnFilterContext().getLegacyScanNode(relation); Preconditions.checkState(legacyScan.isPresent(), - "cannot find OlapScanNode for topn filter"); + "cannot find ScanNode for topn filter"); legacyScan.get().addTopnFilterSortNode(sortNode); } ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java index a8129639f5e..ec1e52d6426 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java @@ -23,9 +23,14 @@ import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; import org.apache.doris.nereids.trees.plans.algebra.Join; -import org.apache.doris.nereids.trees.plans.algebra.OlapScan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN; +import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import 
org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow; import org.apache.doris.qe.ConnectContext; @@ -41,10 +46,9 @@ import java.util.Optional; */ public class TopNScanOpt extends PlanPostProcessor { - @Override public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) { - Optional<OlapScan> scanOpt = findScanForTopnFilter(topN); + Optional<PhysicalRelation> scanOpt = findScanForTopnFilter(topN); scanOpt.ifPresent(scan -> ctx.getTopnFilterContext().addTopnFilter(topN, scan)); topN.child().accept(this, ctx); return topN; @@ -53,13 +57,13 @@ public class TopNScanOpt extends PlanPostProcessor { @Override public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN, CascadesContext context) { - Optional<OlapScan> scanOpt = findScanForTopnFilter(topN.getPhysicalTopN()); + Optional<PhysicalRelation> scanOpt = findScanForTopnFilter(topN.getPhysicalTopN()); scanOpt.ifPresent(scan -> context.getTopnFilterContext().addTopnFilter(topN, scan)); topN.child().accept(this, context); return topN; } - private Optional<OlapScan> findScanForTopnFilter(PhysicalTopN<? extends Plan> topN) { + private Optional<PhysicalRelation> findScanForTopnFilter(PhysicalTopN<? 
extends Plan> topN) { if (topN.getSortPhase() != SortPhase.LOCAL_SORT) { return Optional.empty(); } @@ -92,30 +96,23 @@ public class TopNScanOpt extends PlanPostProcessor { } boolean nullsFirst = topN.getOrderKeys().get(0).isNullFirst(); - OlapScan olapScan = findScanNodeBySlotReference(topN, (SlotReference) firstKey, nullsFirst); - if (olapScan != null - && olapScan.getTable().isDupKeysOrMergeOnWrite() - && olapScan instanceof PhysicalCatalogRelation) { - return Optional.of(olapScan); - } - - return Optional.empty(); + return findScanNodeBySlotReference(topN, (SlotReference) firstKey, nullsFirst); } - private OlapScan findScanNodeBySlotReference(Plan root, SlotReference slot, boolean nullsFirst) { + private Optional<PhysicalRelation> findScanNodeBySlotReference(Plan root, SlotReference slot, boolean nullsFirst) { if (root instanceof PhysicalWindow) { - return null; + return Optional.empty(); } - if (root instanceof OlapScan) { - if (root.getOutputSet().contains(slot)) { - return (OlapScan) root; + if (root instanceof PhysicalRelation) { + if (root.getOutputSet().contains(slot) && supportPhysicalRelations((PhysicalRelation) root)) { + return Optional.of((PhysicalRelation) root); } else { - return null; + return Optional.empty(); } } - OlapScan target = null; + Optional<PhysicalRelation> target; if (root instanceof Join) { Join join = (Join) root; if (nullsFirst && join.getJoinType().isOuterJoin()) { @@ -123,7 +120,7 @@ public class TopNScanOpt extends PlanPostProcessor { // and to the right child of rightOuterJoin. // but we have rule to push topn down to the left/right side. and topn-filter // will be generated according to the inferred topn node. 
- return null; + return Optional.empty(); } // try to push to both left and right child if (root.child(0).getOutputSet().contains(slot)) { @@ -139,12 +136,12 @@ public class TopNScanOpt extends PlanPostProcessor { Plan child = root.child(0); if (child.getOutputSet().contains(slot)) { target = findScanNodeBySlotReference(child, slot, nullsFirst); - if (target != null) { + if (target.isPresent()) { return target; } } } - return target; + return Optional.empty(); } private long getTopNOptLimitThreshold() { @@ -153,4 +150,13 @@ public class TopNScanOpt extends PlanPostProcessor { } return -1; } + + private boolean supportPhysicalRelations(PhysicalRelation relation) { + return relation instanceof PhysicalOlapScan + || relation instanceof PhysicalOdbcScan + || relation instanceof PhysicalEsScan + || relation instanceof PhysicalFileScan + || relation instanceof PhysicalJdbcScan + || relation instanceof PhysicalDeferMaterializeOlapScan; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java index b5f79defef4..6a4fe3123df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java @@ -17,9 +17,9 @@ package org.apache.doris.nereids.processor.post; -import org.apache.doris.nereids.trees.plans.algebra.OlapScan; import org.apache.doris.nereids.trees.plans.algebra.TopN; -import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; +import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SortNode; import com.google.common.collect.Lists; @@ -35,20 +35,20 @@ import java.util.Set; * topN runtime filter context */ public class TopnFilterContext { - private final Map<TopN, List<OlapScan>> filters = Maps.newHashMap(); + private final 
Map<TopN, List<PhysicalRelation>> filters = Maps.newHashMap(); private final Set<TopN> sources = Sets.newHashSet(); - private final Set<OlapScan> targets = Sets.newHashSet(); - private final Map<OlapScan, OlapScanNode> legacyTargetsMap = Maps.newHashMap(); + private final Set<PhysicalRelation> targets = Sets.newHashSet(); + private final Map<PhysicalRelation, ScanNode> legacyTargetsMap = Maps.newHashMap(); private final Map<TopN, SortNode> legacySourceMap = Maps.newHashMap(); /** * add topN filter */ - public void addTopnFilter(TopN topn, OlapScan scan) { + public void addTopnFilter(TopN topn, PhysicalRelation scan) { targets.add(scan); sources.add(topn); - List<OlapScan> targets = filters.get(topn); + List<PhysicalRelation> targets = filters.get(topn); if (targets == null) { filters.put(topn, Lists.newArrayList(scan)); } else { @@ -59,14 +59,14 @@ public class TopnFilterContext { /** * find the corresponding sortNode for topn filter */ - public Optional<OlapScanNode> getLegacyScanNode(OlapScan scan) { - return legacyTargetsMap.keySet().contains(scan) + public Optional<ScanNode> getLegacyScanNode(PhysicalRelation scan) { + return legacyTargetsMap.containsKey(scan) ? Optional.of(legacyTargetsMap.get(scan)) : Optional.empty(); } public Optional<SortNode> getLegacySortNode(TopN topn) { - return legacyTargetsMap.keySet().contains(topn) + return legacyTargetsMap.containsKey(topn) ? 
Optional.of(legacySourceMap.get(topn)) : Optional.empty(); } @@ -75,19 +75,35 @@ public class TopnFilterContext { return sources.contains(topn); } - public boolean isTopnFilterTarget(OlapScan scan) { - return targets.contains(scan); + public boolean isTopnFilterTarget(PhysicalRelation relation) { + return targets.contains(relation); } public void addLegacySource(TopN topn, SortNode sort) { legacySourceMap.put(topn, sort); } - public void addLegacyTarget(OlapScan olapScan, OlapScanNode legacy) { - legacyTargetsMap.put(olapScan, legacy); + public void addLegacyTarget(PhysicalRelation relation, ScanNode legacy) { + legacyTargetsMap.put(relation, legacy); } - public List<OlapScan> getTargets(TopN topn) { + public List<PhysicalRelation> getTargets(TopN topn) { return filters.get(topn); } + + /** + * toString + */ + public String toString() { + StringBuilder builder = new StringBuilder("TopnFilterContext\n"); + String indent = " "; + String arrow = " -> "; + builder.append("filters:\n"); + for (TopN topn : filters.keySet()) { + builder.append(indent).append(topn.toString()).append("\n"); + builder.append(indent).append(arrow).append(filters.get(topn)).append("\n"); + } + return builder.toString(); + + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index 14a50160d63..1c760adb94a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -38,6 +38,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.stream.Collectors; /** * This scan node is used for data source generated from memory. 
@@ -79,6 +80,7 @@ public class DataGenScanNode extends ExternalScanNode { dataGenScanNode.setTupleId(desc.getId().asInt()); dataGenScanNode.setFuncName(tvf.getDataGenFunctionName()); msg.data_gen_scan_node = dataGenScanNode; + super.toThrift(msg); } @Override @@ -130,11 +132,13 @@ public class DataGenScanNode extends ExternalScanNode { if (!conjuncts.isEmpty()) { output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n"); } - output.append(prefix).append("table value function: ").append(tvf.getDataGenFunctionName()).append("\n"); - - - + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java index 703566bbc0b..fe75530dae9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java @@ -43,6 +43,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * Full scan of an MySQL table. 
@@ -93,6 +94,12 @@ public class MysqlScanNode extends ExternalScanNode { Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); } + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } @@ -153,6 +160,7 @@ public class MysqlScanNode extends ExternalScanNode { protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.MYSQL_SCAN_NODE; msg.mysql_scan_node = new TMySQLScanNode(desc.getId().asInt(), tblName, columns, filters); + super.toThrift(msg); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 6b5128ba5eb..3e32853119b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -178,12 +178,6 @@ public class OlapScanNode extends ScanNode { // It's limit for scanner instead of scanNode so we add a new limit. private long sortLimit = -1; - // useTopnOpt is equivalent to !topnFilterSortNodes.isEmpty(). - // keep this flag for compatibility. 
- private boolean useTopnOpt = false; - // support multi topn filter - private final List<SortNode> topnFilterSortNodes = Lists.newArrayList(); - // List of tablets will be scanned by current olap_scan_node private ArrayList<Long> scanTabletIds = Lists.newArrayList(); @@ -319,14 +313,6 @@ public class OlapScanNode extends ScanNode { this.sortLimit = sortLimit; } - public boolean getUseTopnOpt() { - return useTopnOpt; - } - - public void setUseTopnOpt(boolean useTopnOpt) { - this.useTopnOpt = useTopnOpt; - } - public Collection<Long> getSelectedPartitionIds() { return selectedPartitionIds; } @@ -1347,7 +1333,7 @@ public class OlapScanNode extends ScanNode { if (sortLimit != -1) { output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n"); } - if (useTopnOpt) { + if (useTopnFilter()) { String topnFilterSources = String.join(",", topnFilterSortNodes.stream() .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); @@ -1524,18 +1510,6 @@ public class OlapScanNode extends ScanNode { if (sortLimit != -1) { msg.olap_scan_node.setSortLimit(sortLimit); } - msg.olap_scan_node.setUseTopnOpt(useTopnOpt); - List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes() - .stream() - .map(sortNode -> sortNode.getId().asInt()) - .collect(Collectors.toList()); - if (!topnFilterSourceNodeIds.isEmpty()) { - if (SessionVariable.enablePipelineEngineX()) { - msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds); - } else { - msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds); - } - } msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift()); String tableName = olapTable.getName(); if (selectedIndexId != -1) { @@ -1556,6 +1530,7 @@ public class OlapScanNode extends ScanNode { if (shouldColoScan || SessionVariable.enablePipelineEngineX()) { msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds)); } + super.toThrift(msg); } public void collectColumns(Analyzer analyzer, Set<String> equivalenceColumns, 
Set<String> unequivalenceColumns) { @@ -1813,14 +1788,6 @@ public class OlapScanNode extends ScanNode { return getScanTabletIds().size(); } - public void addTopnFilterSortNode(SortNode sortNode) { - topnFilterSortNodes.add(sortNode); - } - - public List<SortNode> getTopnFilterSortNodes() { - return topnFilterSortNodes; - } - @Override public int numScanBackends() { return scanBackendIds.size(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index cfce6060542..06644a7ab9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -550,7 +550,7 @@ public class OriginalPlanner extends Planner { OlapScanNode scanNode = (OlapScanNode) child; if (scanNode.isDupKeysOrMergeOnWrite()) { sortNode.setUseTopnOpt(true); - scanNode.setUseTopnOpt(true); + // scanNode.setUseTopnOpt(true); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index b88a2438d9a..c9febfa8047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -49,11 +49,13 @@ import org.apache.doris.datasource.SplitGenerator; import org.apache.doris.datasource.SplitSource; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.query.StatsDelta; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import 
org.apache.doris.thrift.TScanRangeLocations; @@ -99,6 +101,9 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { // create a mapping between output slot's id and project expr Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>(); + // support multi topn filter + protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList(); + public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) { super(id, desc.getId().asList(), planNodeName, statisticalType); this.desc = desc; @@ -812,4 +817,28 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { scanNode.updateScanRangeVersions(visibleVersionMap); } } + + protected void toThrift(TPlanNode msg) { + // topn filter + if (useTopnFilter() && SessionVariable.enablePipelineEngineX()) { + List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes() + .stream() + .map(sortNode -> sortNode.getId().asInt()) + .collect(Collectors.toList()); + msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds); + } + } + + public void addTopnFilterSortNode(SortNode sortNode) { + topnFilterSortNodes.add(sortNode); + } + + public List<SortNode> getTopnFilterSortNodes() { + return topnFilterSortNodes; + } + + public boolean useTopnFilter() { + return !topnFilterSortNodes.isEmpty(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java index 3136444725f..3d6461b923c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java @@ -33,6 +33,8 @@ import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.stream.Collectors; + public class TestExternalTableScanNode extends ExternalScanNode { private static 
final Logger LOG = LogManager.getLogger(TestExternalTableScanNode.class); private String tableName; @@ -46,6 +48,12 @@ public class TestExternalTableScanNode extends ExternalScanNode { public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); output.append(prefix).append("TABLE: ").append(tableName).append("\n"); + if (useTopnFilter()) { + String topnFilterSources = String.join(",", + topnFilterSortNodes.stream() + .map(node -> node.getId().asInt() + "").collect(Collectors.toList())); + output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n"); + } return output.toString(); } @@ -74,6 +82,7 @@ public class TestExternalTableScanNode extends ExternalScanNode { msg.test_external_scan_node = new TTestExternalScanNode(); msg.test_external_scan_node.setTupleId(desc.getId().asInt()); msg.test_external_scan_node.setTableName(tableName); + super.toThrift(msg); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 431e30896eb..a9bcc69c4a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3837,10 +3837,8 @@ public class Coordinator implements CoordInterface { } Set<Integer> topnFilterSources = Sets.newLinkedHashSet(); for (ScanNode scanNode : scanNodes) { - if (scanNode instanceof OlapScanNode) { - for (SortNode sortNode : ((OlapScanNode) scanNode).getTopnFilterSortNodes()) { - topnFilterSources.add(sortNode.getId().asInt()); - } + for (SortNode sortNode : scanNode.getTopnFilterSortNodes()) { + topnFilterSources.add(sortNode.getId().asInt()); } } diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out index 06b0247814a..ec2058ca46e 100644 --- 
a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out +++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out @@ -1263,9 +1263,29 @@ true 1 -- !sql67 -- +true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 2 
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 0 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 1 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 2 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 +true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234 1 2 3 2022-10-22T10:59:59 34.123 diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy index ae2566c445c..492fdeb349b 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy @@ -537,9 +537,9 @@ suite("test_jdbc_query_pg", "p0,external,pg,external_docker,external_docker_pg") order_qt_sql63 """ SELECT * FROM (SELECT 1 a) x CROSS JOIN (SELECT 2 b) y """ order_qt_sql65 """ SELECT t.c FROM (SELECT 1) as t1 CROSS JOIN (SELECT 0 AS c UNION ALL SELECT 1) t """ 
order_qt_sql66 """ SELECT t.c FROM (SELECT 1) as a CROSS JOIN (SELECT 0 AS c UNION ALL SELECT 1) t """ - order_qt_sql67 """ SELECT * FROM (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) a + order_qt_sql67 """ SELECT a.*, b.* FROM (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) a JOIN (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) b ON 123 = 123 - order by a.k8 desc limit 5""" + order by a.k8, b.k8 desc limit 25""" order_qt_sql68 """ SELECT id, count(1) as c FROM $dorisExTable1 GROUP BY id HAVING c IN (select k8 from $jdbcPg14Table1 where k8 = 2) """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org