This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ffc6f08354 [fix](Nereids) should do read lock on table being insert when analyze (#25619)
3ffc6f08354 is described below

commit 3ffc6f08354ca10a796d880aa762dee7a2f3be00
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Fri Oct 20 21:09:19 2023 +0800

    [fix](Nereids) should do read lock on table being insert when analyze (#25619)
---
 .../org/apache/doris/nereids/CascadesContext.java  | 121 ++++++++++++++++-----
 .../doris/nereids/rules/analysis/BindRelation.java |   2 +-
 .../apache/doris/nereids/util/ReadLockTest.java    |  41 +++++--
 3 files changed, 123 insertions(+), 41 deletions(-)
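
Gist of the change, for context: before analysis the planner collects every table the statement touches, which now also covers the target table of an INSERT (UnboundOlapTableSink) and tables referenced from subqueries in projections, HAVING clauses, and one-row relations, and CascadesContext.Lock then takes a read lock on each of them, releasing whatever it already holds if any lock cannot be acquired within the timeout. Below is a minimal, self-contained Java sketch of that try-lock-all-or-unwind pattern; TableLike, LockAll, and the deque bookkeeping are illustrative stand-ins, not Doris APIs.

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Deque;
    import java.util.concurrent.TimeUnit;

    interface TableLike {
        String getName();
        boolean tryReadLock(long timeout, TimeUnit unit);
        void readUnlock();
    }

    final class LockAll implements AutoCloseable {
        private final Deque<TableLike> locked = new ArrayDeque<>();

        // Try to read-lock every table referenced by the statement, including the
        // INSERT target; if any attempt times out, release what was already acquired.
        LockAll(Collection<? extends TableLike> tables) {
            for (TableLike table : tables) {
                if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
                    close();
                    throw new RuntimeException("Failed to get read lock on table: " + table.getName());
                }
                locked.push(table);
            }
        }

        @Override
        public void close() {
            // Release in reverse order of acquisition.
            while (!locked.isEmpty()) {
                locked.pop().readUnlock();
            }
        }
    }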

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index 4cd248cb8a3..19d38873f9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.nereids.analyzer.Scope;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.jobs.Job;
@@ -52,7 +54,9 @@ import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
@@ -61,6 +65,8 @@ import org.apache.doris.statistics.Statistics;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.util.Lists;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -74,6 +80,7 @@ import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 /**
@@ -96,7 +103,7 @@ public class CascadesContext implements ScheduleContext {
     private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
     private final RuntimeFilterContext runtimeFilterContext;
     private Optional<Scope> outerScope = Optional.empty();
-    private List<TableIf> tables = null;
+    private Map<Long, TableIf> tables = null;
 
     private boolean isRewriteRoot;
     private volatile boolean isTimeout = false;
@@ -213,7 +220,7 @@ public class CascadesContext implements ScheduleContext {
     }
 
     public void setTables(List<TableIf> tables) {
-        this.tables = tables;
+        this.tables = tables.stream().collect(Collectors.toMap(TableIf::getId, t -> t, (t1, t2) -> t1));
     }
 
     public ConnectContext getConnectContext() {
@@ -330,21 +337,23 @@ public class CascadesContext implements ScheduleContext {
      * Extract tables.
      */
     public void extractTables(LogicalPlan logicalPlan) {
-        Set<UnboundRelation> relations = getTables(logicalPlan);
-        tables = new ArrayList<>();
-        for (UnboundRelation r : relations) {
+        Set<List<String>> tableNames = getTables(logicalPlan);
+        tables = Maps.newHashMap();
+        for (List<String> tableName : tableNames) {
             try {
-                tables.add(getTable(r));
+                TableIf table = getTable(tableName);
+                tables.put(table.getId(), table);
             } catch (Throwable e) {
                 // IGNORE
             }
         }
+
     }
 
     /** get table by table name, try to get from information from dumpfile first */
     public TableIf getTableInMinidumpCache(String tableName) {
-        Preconditions.checkState(tables != null);
-        for (TableIf table : tables) {
+        Preconditions.checkState(tables != null, "tables should not be null");
+        for (TableIf table : tables.values()) {
             if (table.getName().equals(tableName)) {
                 return table;
             }
@@ -356,45 +365,101 @@ public class CascadesContext implements ScheduleContext {
     }
 
     public List<TableIf> getTables() {
-        return tables;
+        if (tables == null) {
+            return null;
+        } else {
+            return Lists.newArrayList(tables.values());
+        }
     }
 
-    private Set<UnboundRelation> getTables(LogicalPlan logicalPlan) {
-        Set<UnboundRelation> unboundRelations = new HashSet<>();
+    private Set<List<String>> getTables(LogicalPlan logicalPlan) {
+        final Set<List<String>> tableNames = new HashSet<>();
         logicalPlan.foreach(p -> {
             if (p instanceof LogicalFilter) {
-                unboundRelations.addAll(extractUnboundRelationFromFilter((LogicalFilter<?>) p));
+                tableNames.addAll(extractTableNamesFromFilter((LogicalFilter<?>) p));
             } else if (p instanceof LogicalCTE) {
-                unboundRelations.addAll(extractUnboundRelationFromCTE((LogicalCTE<?>) p));
+                tableNames.addAll(extractTableNamesFromCTE((LogicalCTE<?>) p));
+            } else if (p instanceof LogicalProject) {
+                tableNames.addAll(extractTableNamesFromProject((LogicalProject<?>) p));
+            } else if (p instanceof LogicalHaving) {
+                tableNames.addAll(extractTableNamesFromHaving((LogicalHaving<?>) p));
+            } else if (p instanceof UnboundOneRowRelation) {
+                tableNames.addAll(extractTableNamesFromOneRowRelation((UnboundOneRowRelation) p));
             } else {
-                unboundRelations.addAll(p.collect(UnboundRelation.class::isInstance));
+                Set<LogicalPlan> logicalPlans = p.collect(
+                        n -> (n instanceof UnboundRelation || n instanceof UnboundOlapTableSink));
+                for (LogicalPlan plan : logicalPlans) {
+                    if (plan instanceof UnboundRelation) {
+                        tableNames.add(((UnboundRelation) plan).getNameParts());
+                    } else if (plan instanceof UnboundOlapTableSink) {
+                        tableNames.add(((UnboundOlapTableSink<?>) plan).getNameParts());
+                    } else {
+                        throw new AnalysisException("get tables from plan failed. meet unknown type node " + plan);
+                    }
+                }
             }
         });
-        return unboundRelations;
+        return tableNames;
+    }
+
+    private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?> having) {
+        Set<SubqueryExpr> subqueryExprs = having.getPredicate()
+                .collect(SubqueryExpr.class::isInstance);
+        Set<List<String>> tableNames = new HashSet<>();
+        for (SubqueryExpr expr : subqueryExprs) {
+            LogicalPlan plan = expr.getQueryPlan();
+            tableNames.addAll(getTables(plan));
+        }
+        return tableNames;
+    }
+
+    private Set<List<String>> extractTableNamesFromOneRowRelation(UnboundOneRowRelation oneRowRelation) {
+        Set<SubqueryExpr> subqueryExprs = oneRowRelation.getProjects().stream()
+                .<Set<SubqueryExpr>>map(p -> p.collect(SubqueryExpr.class::isInstance))
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+        Set<List<String>> tableNames = new HashSet<>();
+        for (SubqueryExpr expr : subqueryExprs) {
+            LogicalPlan plan = expr.getQueryPlan();
+            tableNames.addAll(getTables(plan));
+        }
+        return tableNames;
+    }
+
+    private Set<List<String>> extractTableNamesFromProject(LogicalProject<?> project) {
+        Set<SubqueryExpr> subqueryExprs = project.getProjects().stream()
+                .<Set<SubqueryExpr>>map(p -> p.collect(SubqueryExpr.class::isInstance))
+                .flatMap(Set::stream)
+                .collect(Collectors.toSet());
+        Set<List<String>> tableNames = new HashSet<>();
+        for (SubqueryExpr expr : subqueryExprs) {
+            LogicalPlan plan = expr.getQueryPlan();
+            tableNames.addAll(getTables(plan));
+        }
+        return tableNames;
     }
 
-    private Set<UnboundRelation> extractUnboundRelationFromFilter(LogicalFilter<?> filter) {
+    private Set<List<String>> extractTableNamesFromFilter(LogicalFilter<?> filter) {
         Set<SubqueryExpr> subqueryExprs = filter.getPredicate()
                 .collect(SubqueryExpr.class::isInstance);
-        Set<UnboundRelation> relations = new HashSet<>();
+        Set<List<String>> tableNames = new HashSet<>();
         for (SubqueryExpr expr : subqueryExprs) {
             LogicalPlan plan = expr.getQueryPlan();
-            relations.addAll(getTables(plan));
+            tableNames.addAll(getTables(plan));
         }
-        return relations;
+        return tableNames;
     }
 
-    private Set<UnboundRelation> extractUnboundRelationFromCTE(LogicalCTE<?> cte) {
+    private Set<List<String>> extractTableNamesFromCTE(LogicalCTE<?> cte) {
         List<LogicalSubQueryAlias<Plan>> subQueryAliases = cte.getAliasQueries();
-        Set<UnboundRelation> relations = new HashSet<>();
+        Set<List<String>> tableNames = new HashSet<>();
         for (LogicalSubQueryAlias<Plan> subQueryAlias : subQueryAliases) {
-            relations.addAll(getTables(subQueryAlias));
+            tableNames.addAll(getTables(subQueryAlias));
         }
-        return relations;
+        return tableNames;
     }
 
-    private TableIf getTable(UnboundRelation unboundRelation) {
-        List<String> nameParts = unboundRelation.getNameParts();
+    private TableIf getTable(List<String> nameParts) {
         switch (nameParts.size()) {
             case 1: { // table
                 String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName();
@@ -413,7 +478,7 @@ public class CascadesContext implements ScheduleContext {
                 return getTable(nameParts.get(0), nameParts.get(1), nameParts.get(2), getConnectContext().getEnv());
             }
             default:
-                throw new IllegalStateException("Table name [" + unboundRelation.getTableName() + "] is invalid.");
+                throw new IllegalStateException("Table name [" + String.join(".", nameParts) + "] is invalid.");
         }
     }
 
@@ -455,10 +520,10 @@ public class CascadesContext implements ScheduleContext {
         public Lock(LogicalPlan plan, CascadesContext cascadesContext) {
             this.cascadesContext = cascadesContext;
             // tables can also be load from dump file
-            if (cascadesContext.getTables() == null) {
+            if (cascadesContext.tables == null) {
                 cascadesContext.extractTables(plan);
             }
-            for (TableIf table : cascadesContext.tables) {
+            for (TableIf table : cascadesContext.tables.values()) {
                 if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
                     close();
                     throw new RuntimeException(String.format("Failed to get read lock on table: %s", table.getName()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index f97057e105d..689763b6f79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -140,7 +140,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
         List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
                 unboundRelation.getNameParts());
         TableIf table = null;
-        if (cascadesContext.getTables() != null) {
+        if (!CollectionUtils.isEmpty(cascadesContext.getTables())) {
             table = cascadesContext.getTableInMinidumpCache(tableName);
         }
         if (table == null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
index 0db39e9e11d..58212c2d3ba 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
@@ -17,14 +17,15 @@
 
 package org.apache.doris.nereids.util;
 
-import org.apache.doris.catalog.Table;
-import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -47,10 +48,10 @@ public class ReadLockTest extends SSBTestBase {
                 PhysicalProperties.ANY
         );
         CascadesContext cascadesContext = planner.getCascadesContext();
-        List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
-        Assertions.assertEquals(1, f.size());
-        Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get());
 
+        List<TableIf> f = cascadesContext.getTables();
+        Assertions.assertEquals(1, f.size());
+        Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get());
     }
 
     @Test
@@ -69,9 +70,9 @@ public class ReadLockTest extends SSBTestBase {
                 PhysicalProperties.ANY
         );
         CascadesContext cascadesContext = planner.getCascadesContext();
-        List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
+        List<TableIf> f = cascadesContext.getTables();
         Assertions.assertEquals(1, f.size());
-        Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get());
+        Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get());
     }
 
     @Test
@@ -84,10 +85,9 @@ public class ReadLockTest extends SSBTestBase {
                 PhysicalProperties.ANY
         );
         CascadesContext cascadesContext = planner.getCascadesContext();
-        List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
-
+        List<TableIf> f = cascadesContext.getTables();
         Assertions.assertEquals(1, f.size());
-        Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get());
+        Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get());
     }
 
     @Test
@@ -100,11 +100,28 @@ public class ReadLockTest extends SSBTestBase {
                 PhysicalProperties.ANY
         );
         CascadesContext cascadesContext = planner.getCascadesContext();
-        List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
+        List<TableIf> f = cascadesContext.getTables();
         Assertions.assertEquals(2, f.size());
-        Set<String> tableNames = f.stream().map(Table::getName).collect(Collectors.toSet());
+        Set<String> tableNames = f.stream().map(TableIf::getName).collect(Collectors.toSet());
         Assertions.assertTrue(tableNames.contains("supplier"));
         Assertions.assertTrue(tableNames.contains("lineorder"));
+    }
 
+    @Test
+    public void testInserInto() {
+        String sql = "INSERT INTO supplier(s_suppkey) SELECT lo_orderkey FROM lineorder";
+        StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
+        InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) parser.parseSingle(sql);
+        NereidsPlanner planner = new NereidsPlanner(statementContext);
+        planner.plan(
+                (LogicalPlan) insertIntoTableCommand.getExplainPlan(connectContext),
+                PhysicalProperties.ANY
+        );
+        CascadesContext cascadesContext = planner.getCascadesContext();
+        List<TableIf> f = cascadesContext.getTables();
+        Assertions.assertEquals(2, f.size());
+        Set<String> tableNames = f.stream().map(TableIf::getName).collect(Collectors.toSet());
+        Assertions.assertTrue(tableNames.contains("supplier"));
+        Assertions.assertTrue(tableNames.contains("lineorder"));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
