This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d619f45f7 [fix](merge-cloud) Set visible version for OlapScanNode at 
the plan phase (#32473)
d4d619f45f7 is described below

commit d4d619f45f7614179fbd4f76a3febd0a57820f68
Author: walter <w41te...@gmail.com>
AuthorDate: Thu Mar 21 19:23:48 2024 +0800

    [fix](merge-cloud) Set visible version for OlapScanNode at the plan phase 
(#32473)
    
    This PR fixes Spark reads that use an incorrect partition version in cloud mode.
---
 .../apache/doris/cloud/qe/CloudCoordinator.java    | 69 ----------------------
 .../org/apache/doris/nereids/NereidsPlanner.java   |  6 +-
 .../org/apache/doris/planner/OriginalPlanner.java  |  3 +
 .../java/org/apache/doris/planner/Planner.java     |  1 -
 .../java/org/apache/doris/planner/ScanNode.java    | 69 ++++++++++++++++++++++
 5 files changed, 77 insertions(+), 71 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
index 2ce8950c12f..503c517ca2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/CloudCoordinator.java
@@ -20,32 +20,22 @@ package org.apache.doris.cloud.qe;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.cloud.catalog.CloudEnv;
-import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.UserException;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
-import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
-import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Strings;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class CloudCoordinator extends Coordinator {
     private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -100,63 +90,4 @@ public class CloudCoordinator extends Coordinator {
                 + clusterName);
         }
     }
-
-    @Override
-    protected void computeScanRangeAssignment() throws Exception {
-        setVisibleVersionForOlapScanNode();
-        super.computeScanRangeAssignment();
-    }
-
-    // In cloud mode, meta read lock is not enough to keep a snapshot of the 
partition versions.
-    // After all scan node are collected, it is possible to gain a snapshot of 
the partition version.
-    private void setVisibleVersionForOlapScanNode() throws RpcException, 
UserException {
-        List<CloudPartition> partitions = new ArrayList<>();
-        Set<Long> partitionSet = new HashSet<>();
-        for (ScanNode node : scanNodes) {
-            if (!(node instanceof OlapScanNode)) {
-                continue;
-            }
-
-            OlapScanNode scanNode = (OlapScanNode) node;
-            OlapTable table = scanNode.getOlapTable();
-            for (Long id : scanNode.getSelectedPartitionIds()) {
-                if (!partitionSet.contains(id)) {
-                    partitionSet.add(id);
-                    partitions.add((CloudPartition) table.getPartition(id));
-                }
-            }
-        }
-
-        if (partitions.isEmpty()) {
-            return;
-        }
-
-        List<Long> versions = 
CloudPartition.getSnapshotVisibleVersion(partitions);
-        assert versions.size() == partitions.size() : "the got num versions is 
not equals to acquired num versions";
-        if (versions.stream().anyMatch(x -> x <= 0)) {
-            int size = versions.size();
-            for (int i = 0; i < size; ++i) {
-                if (versions.get(i) <= 0) {
-                    LOG.warn("partition {} getVisibleVersion error, the 
visibleVersion is {}",
-                            partitions.get(i).getId(), versions.get(i));
-                    throw new UserException("partition " + 
partitions.get(i).getId()
-                        + " getVisibleVersion error, the visibleVersion is " + 
versions.get(i));
-                }
-            }
-        }
-
-        // ATTN: the table ids are ignored here because the both id are 
allocated from a same id generator.
-        Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
-                .boxed()
-                .collect(Collectors.toMap(i -> partitions.get(i).getId(), 
versions::get));
-
-        for (ScanNode node : scanNodes) {
-            if (!(node instanceof OlapScanNode)) {
-                continue;
-            }
-
-            OlapScanNode scanNode = (OlapScanNode) node;
-            scanNode.updateScanRangeVersions(visibleVersionMap);
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index eedc77e9df7..424cdc5dd58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.common.NereidsException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.nereids.CascadesContext.Lock;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -101,7 +102,7 @@ public class NereidsPlanner extends Planner {
     }
 
     @Override
-    public void plan(StatementBase queryStmt, 
org.apache.doris.thrift.TQueryOptions queryOptions) {
+    public void plan(StatementBase queryStmt, 
org.apache.doris.thrift.TQueryOptions queryOptions) throws UserException {
         if 
(statementContext.getConnectContext().getSessionVariable().isEnableNereidsTrace())
 {
             NereidsTracer.init();
         } else {
@@ -154,6 +155,9 @@ public class NereidsPlanner extends Planner {
                 .collect(Collectors.toCollection(ArrayList::new));
         logicalPlanAdapter.setColLabels(columnLabelList);
         logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
+
+        // update scan nodes visible version at the end of plan phase.
+        ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
     }
 
     @VisibleForTesting
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index db327bddcb9..a72a76b6fca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -98,6 +98,9 @@ public class OriginalPlanner extends Planner {
     public void plan(StatementBase queryStmt, TQueryOptions queryOptions)
             throws UserException {
         createPlanFragments(queryStmt, analyzer, queryOptions);
+
+        // update scan nodes visible version at the end of plan phase.
+        ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 22495e792ff..44befc75df6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -125,5 +125,4 @@ public abstract class Planner {
     public abstract Optional<ResultSet> handleQueryInFe(StatementBase 
parsedStmt);
 
     public abstract void addHook(PlannerHook hook);
-
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 21f6bb07bd7..490a72f895b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -36,15 +36,19 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.FederationBackendPolicy;
 import org.apache.doris.datasource.FileScanNode;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.query.StatsDelta;
@@ -67,10 +71,12 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Representation of the common elements of all scan nodes.
@@ -738,4 +744,67 @@ public abstract class ScanNode extends PlanNode {
     public boolean shouldUseOneInstance() {
         return hasLimit() && conjuncts.isEmpty();
     }
+
+    // In cloud mode, meta read lock is not enough to keep a snapshot of the 
partition versions.
+    // After all scan node are collected, it is possible to gain a snapshot of 
the partition version.
+    public static void setVisibleVersionForOlapScanNodes(List<ScanNode> 
scanNodes) throws UserException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+
+        List<CloudPartition> partitions = new ArrayList<>();
+        Set<Long> partitionSet = new HashSet<>();
+        for (ScanNode node : scanNodes) {
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            OlapTable table = scanNode.getOlapTable();
+            for (Long id : scanNode.getSelectedPartitionIds()) {
+                if (!partitionSet.contains(id)) {
+                    partitionSet.add(id);
+                    partitions.add((CloudPartition) table.getPartition(id));
+                }
+            }
+        }
+
+        if (partitions.isEmpty()) {
+            return;
+        }
+
+        List<Long> versions;
+        try {
+            versions = CloudPartition.getSnapshotVisibleVersion(partitions);
+        } catch (RpcException e) {
+            throw new UserException("get visible version for OlapScanNode 
failed", e);
+        }
+
+        assert versions.size() == partitions.size() : "the got num versions is 
not equals to acquired num versions";
+        if (versions.stream().anyMatch(x -> x <= 0)) {
+            int size = versions.size();
+            for (int i = 0; i < size; ++i) {
+                if (versions.get(i) <= 0) {
+                    LOG.warn("partition {} getVisibleVersion error, the 
visibleVersion is {}",
+                            partitions.get(i).getId(), versions.get(i));
+                    throw new UserException("partition " + 
partitions.get(i).getId()
+                        + " getVisibleVersion error, the visibleVersion is " + 
versions.get(i));
+                }
+            }
+        }
+
+        // ATTN: the table ids are ignored here because the both id are 
allocated from a same id generator.
+        Map<Long, Long> visibleVersionMap = IntStream.range(0, versions.size())
+                .boxed()
+                .collect(Collectors.toMap(i -> partitions.get(i).getId(), 
versions::get));
+
+        for (ScanNode node : scanNodes) {
+            if (!(node instanceof OlapScanNode)) {
+                continue;
+            }
+
+            OlapScanNode scanNode = (OlapScanNode) node;
+            scanNode.updateScanRangeVersions(visibleVersionMap);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to