This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d4d619f45f7 [fix](merge-cloud) Set visible version for OlapScanNode at the plan phase (#32473) d4d619f45f7 is described below commit d4d619f45f7614179fbd4f76a3febd0a57820f68 Author: walter <w41te...@gmail.com> AuthorDate: Thu Mar 21 19:23:48 2024 +0800 [fix](merge-cloud) Set visible version for OlapScanNode at the plan phase (#32473) This PR fixes Spark reads using an incorrect partition version in cloud mode --- .../apache/doris/cloud/qe/CloudCoordinator.java | 69 ---------------------- .../org/apache/doris/nereids/NereidsPlanner.java | 6 +- .../org/apache/doris/planner/OriginalPlanner.java | 3 + .../java/org/apache/doris/planner/Planner.java | 1 - .../java/org/apache/doris/planner/ScanNode.java | 69 ++++++++++++++++++++++ 5 files changed, 77 insertions(+), 71 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java index 2ce8950c12f..503c517ca2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java @@ -20,32 +20,22 @@ package org.apache.doris.cloud.qe; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; import org.apache.doris.cloud.catalog.CloudEnv; -import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.UserException; import org.apache.doris.nereids.stats.StatsErrorEstimator; -import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; -import 
org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; public class CloudCoordinator extends Coordinator { private static final Logger LOG = LogManager.getLogger(Coordinator.class); @@ -100,63 +90,4 @@ public class CloudCoordinator extends Coordinator { + clusterName); } } - - @Override - protected void computeScanRangeAssignment() throws Exception { - setVisibleVersionForOlapScanNode(); - super.computeScanRangeAssignment(); - } - - // In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions. - // After all scan node are collected, it is possible to gain a snapshot of the partition version. - private void setVisibleVersionForOlapScanNode() throws RpcException, UserException { - List<CloudPartition> partitions = new ArrayList<>(); - Set<Long> partitionSet = new HashSet<>(); - for (ScanNode node : scanNodes) { - if (!(node instanceof OlapScanNode)) { - continue; - } - - OlapScanNode scanNode = (OlapScanNode) node; - OlapTable table = scanNode.getOlapTable(); - for (Long id : scanNode.getSelectedPartitionIds()) { - if (!partitionSet.contains(id)) { - partitionSet.add(id); - partitions.add((CloudPartition) table.getPartition(id)); - } - } - } - - if (partitions.isEmpty()) { - return; - } - - List<Long> versions = CloudPartition.getSnapshotVisibleVersion(partitions); - assert versions.size() == partitions.size() : "the got num versions is not equals to acquired num versions"; - if (versions.stream().anyMatch(x -> x <= 0)) { - int size = versions.size(); - for (int i = 0; i < size; ++i) { - if (versions.get(i) <= 0) { - LOG.warn("partition {} getVisibleVersion error, the visibleVersion is {}", - 
partitions.get(i).getId(), versions.get(i)); - throw new UserException("partition " + partitions.get(i).getId() - + " getVisibleVersion error, the visibleVersion is " + versions.get(i)); - } - } - } - - // ATTN: the table ids are ignored here because the both id are allocated from a same id generator. - Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size()) - .boxed() - .collect(Collectors.toMap(i -> partitions.get(i).getId(), versions::get)); - - for (ScanNode node : scanNodes) { - if (!(node instanceof OlapScanNode)) { - continue; - } - - OlapScanNode scanNode = (OlapScanNode) node; - scanNode.updateScanRangeVersions(visibleVersionMap); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index eedc77e9df7..424cdc5dd58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MTMV; import org.apache.doris.common.NereidsException; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.nereids.CascadesContext.Lock; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -101,7 +102,7 @@ public class NereidsPlanner extends Planner { } @Override - public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions queryOptions) { + public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions queryOptions) throws UserException { if (statementContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) { NereidsTracer.init(); } else { @@ -154,6 +155,9 @@ public class NereidsPlanner extends Planner { .collect(Collectors.toCollection(ArrayList::new)); logicalPlanAdapter.setColLabels(columnLabelList); 
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls()); + + // update scan nodes visible version at the end of plan phase. + ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes()); } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index db327bddcb9..a72a76b6fca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -98,6 +98,9 @@ public class OriginalPlanner extends Planner { public void plan(StatementBase queryStmt, TQueryOptions queryOptions) throws UserException { createPlanFragments(queryStmt, analyzer, queryOptions); + + // update scan nodes visible version at the end of plan phase. + ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 22495e792ff..44befc75df6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -125,5 +125,4 @@ public abstract class Planner { public abstract Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt); public abstract void addHook(PlannerHook hook); - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 21f6bb07bd7..490a72f895b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -36,15 +36,19 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; import 
org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.FederationBackendPolicy; import org.apache.doris.datasource.FileScanNode; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.rpc.RpcException; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.query.StatsDelta; @@ -67,10 +71,12 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Representation of the common elements of all scan nodes. @@ -738,4 +744,67 @@ public abstract class ScanNode extends PlanNode { public boolean shouldUseOneInstance() { return hasLimit() && conjuncts.isEmpty(); } + + // In cloud mode, meta read lock is not enough to keep a snapshot of the partition versions. + // After all scan node are collected, it is possible to gain a snapshot of the partition version. 
+ public static void setVisibleVersionForOlapScanNodes(List<ScanNode> scanNodes) throws UserException { + if (Config.isNotCloudMode()) { + return; + } + + List<CloudPartition> partitions = new ArrayList<>(); + Set<Long> partitionSet = new HashSet<>(); + for (ScanNode node : scanNodes) { + if (!(node instanceof OlapScanNode)) { + continue; + } + + OlapScanNode scanNode = (OlapScanNode) node; + OlapTable table = scanNode.getOlapTable(); + for (Long id : scanNode.getSelectedPartitionIds()) { + if (!partitionSet.contains(id)) { + partitionSet.add(id); + partitions.add((CloudPartition) table.getPartition(id)); + } + } + } + + if (partitions.isEmpty()) { + return; + } + + List<Long> versions; + try { + versions = CloudPartition.getSnapshotVisibleVersion(partitions); + } catch (RpcException e) { + throw new UserException("get visible version for OlapScanNode failed", e); + } + + assert versions.size() == partitions.size() : "the got num versions is not equals to acquired num versions"; + if (versions.stream().anyMatch(x -> x <= 0)) { + int size = versions.size(); + for (int i = 0; i < size; ++i) { + if (versions.get(i) <= 0) { + LOG.warn("partition {} getVisibleVersion error, the visibleVersion is {}", + partitions.get(i).getId(), versions.get(i)); + throw new UserException("partition " + partitions.get(i).getId() + + " getVisibleVersion error, the visibleVersion is " + versions.get(i)); + } + } + } + + // ATTN: the table ids are ignored here because the both id are allocated from a same id generator. 
+ Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size()) + .boxed() + .collect(Collectors.toMap(i -> partitions.get(i).getId(), versions::get)); + + for (ScanNode node : scanNodes) { + if (!(node instanceof OlapScanNode)) { + continue; + } + + OlapScanNode scanNode = (OlapScanNode) node; + scanNode.updateScanRangeVersions(visibleVersionMap); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org