This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3ffc6f08354 [fix](Nereids) should do read lock on table being insert when analyze (#25619) 3ffc6f08354 is described below commit 3ffc6f08354ca10a796d880aa762dee7a2f3be00 Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Fri Oct 20 21:09:19 2023 +0800 [fix](Nereids) should do read lock on table being insert when analyze (#25619) --- .../org/apache/doris/nereids/CascadesContext.java | 121 ++++++++++++++++----- .../doris/nereids/rules/analysis/BindRelation.java | 2 +- .../apache/doris/nereids/util/ReadLockTest.java | 41 +++++-- 3 files changed, 123 insertions(+), 41 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 4cd248cb8a3..19d38873f9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.nereids.analyzer.Scope; +import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; +import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.jobs.Job; @@ -52,7 +54,9 @@ import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalHaving; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import 
org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; @@ -61,6 +65,8 @@ import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import org.apache.hadoop.util.Lists; import java.util.ArrayList; import java.util.HashMap; @@ -74,6 +80,7 @@ import java.util.Set; import java.util.Stack; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -96,7 +103,7 @@ public class CascadesContext implements ScheduleContext { private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed; private final RuntimeFilterContext runtimeFilterContext; private Optional<Scope> outerScope = Optional.empty(); - private List<TableIf> tables = null; + private Map<Long, TableIf> tables = null; private boolean isRewriteRoot; private volatile boolean isTimeout = false; @@ -213,7 +220,7 @@ public class CascadesContext implements ScheduleContext { } public void setTables(List<TableIf> tables) { - this.tables = tables; + this.tables = tables.stream().collect(Collectors.toMap(TableIf::getId, t -> t, (t1, t2) -> t1)); } public ConnectContext getConnectContext() { @@ -330,21 +337,23 @@ public class CascadesContext implements ScheduleContext { * Extract tables. 
*/ public void extractTables(LogicalPlan logicalPlan) { - Set<UnboundRelation> relations = getTables(logicalPlan); - tables = new ArrayList<>(); - for (UnboundRelation r : relations) { + Set<List<String>> tableNames = getTables(logicalPlan); + tables = Maps.newHashMap(); + for (List<String> tableName : tableNames) { try { - tables.add(getTable(r)); + TableIf table = getTable(tableName); + tables.put(table.getId(), table); } catch (Throwable e) { // IGNORE } } + } /** get table by table name, try to get from information from dumpfile first */ public TableIf getTableInMinidumpCache(String tableName) { - Preconditions.checkState(tables != null); - for (TableIf table : tables) { + Preconditions.checkState(tables != null, "tables should not be null"); + for (TableIf table : tables.values()) { if (table.getName().equals(tableName)) { return table; } @@ -356,45 +365,101 @@ public class CascadesContext implements ScheduleContext { } public List<TableIf> getTables() { - return tables; + if (tables == null) { + return null; + } else { + return Lists.newArrayList(tables.values()); + } } - private Set<UnboundRelation> getTables(LogicalPlan logicalPlan) { - Set<UnboundRelation> unboundRelations = new HashSet<>(); + private Set<List<String>> getTables(LogicalPlan logicalPlan) { + final Set<List<String>> tableNames = new HashSet<>(); logicalPlan.foreach(p -> { if (p instanceof LogicalFilter) { - unboundRelations.addAll(extractUnboundRelationFromFilter((LogicalFilter<?>) p)); + tableNames.addAll(extractTableNamesFromFilter((LogicalFilter<?>) p)); } else if (p instanceof LogicalCTE) { - unboundRelations.addAll(extractUnboundRelationFromCTE((LogicalCTE<?>) p)); + tableNames.addAll(extractTableNamesFromCTE((LogicalCTE<?>) p)); + } else if (p instanceof LogicalProject) { + tableNames.addAll(extractTableNamesFromProject((LogicalProject<?>) p)); + } else if (p instanceof LogicalHaving) { + tableNames.addAll(extractTableNamesFromHaving((LogicalHaving<?>) p)); + } else if (p instanceof 
UnboundOneRowRelation) { + tableNames.addAll(extractTableNamesFromOneRowRelation((UnboundOneRowRelation) p)); } else { - unboundRelations.addAll(p.collect(UnboundRelation.class::isInstance)); + Set<LogicalPlan> logicalPlans = p.collect( + n -> (n instanceof UnboundRelation || n instanceof UnboundOlapTableSink)); + for (LogicalPlan plan : logicalPlans) { + if (plan instanceof UnboundRelation) { + tableNames.add(((UnboundRelation) plan).getNameParts()); + } else if (plan instanceof UnboundOlapTableSink) { + tableNames.add(((UnboundOlapTableSink<?>) plan).getNameParts()); + } else { + throw new AnalysisException("get tables from plan failed. meet unknown type node " + plan); + } + } } }); - return unboundRelations; + return tableNames; + } + + private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?> having) { + Set<SubqueryExpr> subqueryExprs = having.getPredicate() + .collect(SubqueryExpr.class::isInstance); + Set<List<String>> tableNames = new HashSet<>(); + for (SubqueryExpr expr : subqueryExprs) { + LogicalPlan plan = expr.getQueryPlan(); + tableNames.addAll(getTables(plan)); + } + return tableNames; + } + + private Set<List<String>> extractTableNamesFromOneRowRelation(UnboundOneRowRelation oneRowRelation) { + Set<SubqueryExpr> subqueryExprs = oneRowRelation.getProjects().stream() + .<Set<SubqueryExpr>>map(p -> p.collect(SubqueryExpr.class::isInstance)) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + Set<List<String>> tableNames = new HashSet<>(); + for (SubqueryExpr expr : subqueryExprs) { + LogicalPlan plan = expr.getQueryPlan(); + tableNames.addAll(getTables(plan)); + } + return tableNames; + } + + private Set<List<String>> extractTableNamesFromProject(LogicalProject<?> project) { + Set<SubqueryExpr> subqueryExprs = project.getProjects().stream() + .<Set<SubqueryExpr>>map(p -> p.collect(SubqueryExpr.class::isInstance)) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + Set<List<String>> tableNames = new HashSet<>(); + for 
(SubqueryExpr expr : subqueryExprs) { + LogicalPlan plan = expr.getQueryPlan(); + tableNames.addAll(getTables(plan)); + } + return tableNames; } - private Set<UnboundRelation> extractUnboundRelationFromFilter(LogicalFilter<?> filter) { + private Set<List<String>> extractTableNamesFromFilter(LogicalFilter<?> filter) { Set<SubqueryExpr> subqueryExprs = filter.getPredicate() .collect(SubqueryExpr.class::isInstance); - Set<UnboundRelation> relations = new HashSet<>(); + Set<List<String>> tableNames = new HashSet<>(); for (SubqueryExpr expr : subqueryExprs) { LogicalPlan plan = expr.getQueryPlan(); - relations.addAll(getTables(plan)); + tableNames.addAll(getTables(plan)); } - return relations; + return tableNames; } - private Set<UnboundRelation> extractUnboundRelationFromCTE(LogicalCTE<?> cte) { + private Set<List<String>> extractTableNamesFromCTE(LogicalCTE<?> cte) { List<LogicalSubQueryAlias<Plan>> subQueryAliases = cte.getAliasQueries(); - Set<UnboundRelation> relations = new HashSet<>(); + Set<List<String>> tableNames = new HashSet<>(); for (LogicalSubQueryAlias<Plan> subQueryAlias : subQueryAliases) { - relations.addAll(getTables(subQueryAlias)); + tableNames.addAll(getTables(subQueryAlias)); } - return relations; + return tableNames; } - private TableIf getTable(UnboundRelation unboundRelation) { - List<String> nameParts = unboundRelation.getNameParts(); + private TableIf getTable(List<String> nameParts) { switch (nameParts.size()) { case 1: { // table String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName(); @@ -413,7 +478,7 @@ public class CascadesContext implements ScheduleContext { return getTable(nameParts.get(0), nameParts.get(1), nameParts.get(2), getConnectContext().getEnv()); } default: - throw new IllegalStateException("Table name [" + unboundRelation.getTableName() + "] is invalid."); + throw new IllegalStateException("Table name [" + String.join(".", nameParts) + "] is invalid."); } } @@ -455,10 +520,10 @@ public class 
CascadesContext implements ScheduleContext { public Lock(LogicalPlan plan, CascadesContext cascadesContext) { this.cascadesContext = cascadesContext; // tables can also be load from dump file - if (cascadesContext.getTables() == null) { + if (cascadesContext.tables == null) { cascadesContext.extractTables(plan); } - for (TableIf table : cascadesContext.tables) { + for (TableIf table : cascadesContext.tables.values()) { if (!table.tryReadLock(1, TimeUnit.MINUTES)) { close(); throw new RuntimeException(String.format("Failed to get read lock on table: %s", table.getName())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index f97057e105d..689763b6f79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -140,7 +140,7 @@ public class BindRelation extends OneAnalysisRuleFactory { List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), unboundRelation.getNameParts()); TableIf table = null; - if (cascadesContext.getTables() != null) { + if (!CollectionUtils.isEmpty(cascadesContext.getTables())) { table = cascadesContext.getTableInMinidumpCache(tableName); } if (table == null) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java index 0db39e9e11d..58212c2d3ba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java @@ -17,14 +17,15 @@ package org.apache.doris.nereids.util; -import org.apache.doris.catalog.Table; -import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.catalog.TableIf; import 
org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.datasets.ssb.SSBTestBase; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -47,10 +48,10 @@ public class ReadLockTest extends SSBTestBase { PhysicalProperties.ANY ); CascadesContext cascadesContext = planner.getCascadesContext(); - List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables"); - Assertions.assertEquals(1, f.size()); - Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get()); + List<TableIf> f = cascadesContext.getTables(); + Assertions.assertEquals(1, f.size()); + Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get()); } @Test @@ -69,9 +70,9 @@ public class ReadLockTest extends SSBTestBase { PhysicalProperties.ANY ); CascadesContext cascadesContext = planner.getCascadesContext(); - List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables"); + List<TableIf> f = cascadesContext.getTables(); Assertions.assertEquals(1, f.size()); - Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get()); + Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get()); } @Test @@ -84,10 +85,9 @@ public class ReadLockTest extends SSBTestBase { PhysicalProperties.ANY ); CascadesContext cascadesContext = planner.getCascadesContext(); - List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables"); - + List<TableIf> f = cascadesContext.getTables(); Assertions.assertEquals(1, f.size()); - Assertions.assertEquals("supplier", 
f.stream().map(Table::getName).findFirst().get()); + Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get()); } @Test @@ -100,11 +100,28 @@ public class ReadLockTest extends SSBTestBase { PhysicalProperties.ANY ); CascadesContext cascadesContext = planner.getCascadesContext(); - List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables"); + List<TableIf> f = cascadesContext.getTables(); Assertions.assertEquals(2, f.size()); - Set<String> tableNames = f.stream().map(Table::getName).collect(Collectors.toSet()); + Set<String> tableNames = f.stream().map(TableIf::getName).collect(Collectors.toSet()); Assertions.assertTrue(tableNames.contains("supplier")); Assertions.assertTrue(tableNames.contains("lineorder")); + } + @Test + public void testInsertInto() { + String sql = "INSERT INTO supplier(s_suppkey) SELECT lo_orderkey FROM lineorder"; + StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql); + InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) parser.parseSingle(sql); + NereidsPlanner planner = new NereidsPlanner(statementContext); + planner.plan( + (LogicalPlan) insertIntoTableCommand.getExplainPlan(connectContext), + PhysicalProperties.ANY + ); + CascadesContext cascadesContext = planner.getCascadesContext(); + List<TableIf> f = cascadesContext.getTables(); + Assertions.assertEquals(2, f.size()); + Set<String> tableNames = f.stream().map(TableIf::getName).collect(Collectors.toSet()); + Assertions.assertTrue(tableNames.contains("supplier")); + Assertions.assertTrue(tableNames.contains("lineorder")); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org