This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4732aae628f [Refactor](insert) refactor insert command to support other type of table (#31610) (#32345) 4732aae628f is described below commit 4732aae628fb9f9ebf60e63830bee512eb40cd2c Author: Mingyu Chen <morning...@163.com> AuthorDate: Sun Mar 17 20:46:07 2024 +0800 [Refactor](insert) refactor insert command to support other type of table (#31610) (#32345) bp #31610 --- .../main/java/org/apache/doris/common/Config.java | 8 - .../java/org/apache/doris/catalog/TableIf.java | 54 +++- .../org/apache/doris/datasource/ExternalTable.java | 104 ------- .../doris/job/extensions/insert/InsertJob.java | 2 +- .../doris/job/extensions/insert/InsertTask.java | 2 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 8 +- .../trees/plans/commands/CreateTableCommand.java | 3 +- .../plans/commands/DeleteFromUsingCommand.java | 4 +- .../plans/commands/InsertIntoTableCommand.java | 291 ------------------ .../nereids/trees/plans/commands/LoadCommand.java | 3 +- .../trees/plans/commands/UpdateCommand.java | 4 +- .../plans/commands/UpdateMvByPartitionCommand.java | 1 + .../commands/insert/AbstractInsertExecutor.java | 180 ++++++++++++ .../{ => insert}/BatchInsertIntoTableCommand.java | 22 +- .../plans/commands/insert/GroupCommitInserter.java | 142 +++++++++ .../commands/insert/InsertCommandContext.java | 26 ++ .../commands/insert/InsertIntoTableCommand.java | 190 ++++++++++++ .../{ => insert}/InsertOverwriteTableCommand.java | 25 +- .../InsertUtils.java} | 327 +-------------------- .../commands/insert/OlapInsertCommandContext.java | 33 +++ .../plans/commands/insert/OlapInsertExecutor.java | 257 ++++++++++++++++ .../trees/plans/visitor/CommandVisitor.java | 6 +- .../java/org/apache/doris/qe/StmtExecutor.java | 26 +- .../apache/doris/nereids/util/ReadLockTest.java | 2 +- .../org/apache/doris/qe/HmsQueryCacheTest.java | 6 - 25 files changed, 930 insertions(+), 796 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 2c5a36e9f85..d4b83ccb51e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1167,14 +1167,6 @@ public class Config extends ConfigBase { @ConfField public static String small_file_dir = System.getenv("DORIS_HOME") + "/small_files"; - /** - * If set to true, the insert stmt with processing error will still return a label to user. - * And user can use this label to check the load job's status. - * The default value is false, which means if insert operation encounter errors, - * exception will be thrown to user client directly without load label. - */ - @ConfField(mutable = true, masterOnly = true) public static boolean using_old_load_usage_pattern = false; - /** * This will limit the max recursion depth of hash distribution pruner. * eg: where a in (5 elements) and b in (4 elements) and c in (3 elements) and d in (2 elements). diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index fd7f5d53880..1ef0e9c2f3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -55,35 +55,59 @@ import java.util.stream.Collectors; public interface TableIf { Logger LOG = LogManager.getLogger(TableIf.class); - void readLock(); + default void readLock() { + } + + default boolean tryReadLock(long timeout, TimeUnit unit) { + return true; + } - boolean tryReadLock(long timeout, TimeUnit unit); + default void readUnlock() { + } - void readUnlock(); + ; - void writeLock(); + default void writeLock() { + } - boolean writeLockIfExist(); + default boolean writeLockIfExist() { + return true; + } - boolean tryWriteLock(long timeout, TimeUnit unit); + default boolean tryWriteLock(long timeout, TimeUnit unit) { + return true; + } - void writeUnlock(); + default void writeUnlock() { + } - boolean isWriteLockHeldByCurrentThread(); + default boolean isWriteLockHeldByCurrentThread() { + return true; + } - <E extends Exception> void writeLockOrException(E e) throws E; + default <E extends Exception> void writeLockOrException(E e) throws E { + } - void writeLockOrDdlException() throws DdlException; + default void writeLockOrDdlException() throws DdlException { + } - void writeLockOrMetaException() throws MetaNotFoundException; + default void writeLockOrMetaException() throws MetaNotFoundException { + } - void writeLockOrAlterCancelException() throws AlterCancelException; + default void writeLockOrAlterCancelException() throws AlterCancelException { + } - boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException; + default boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException { + return true; + } - <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E; + default <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E { + return true; + } - boolean tryWriteLockIfExist(long timeout, TimeUnit unit); + default boolean tryWriteLockIfExist(long timeout, TimeUnit unit) { + return true; + } long getId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index f39852fd215..7f82d0d3876 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -17,7 +17,6 @@ package org.apache.doris.datasource; -import org.apache.doris.alter.AlterCancelException; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -25,9 +24,6 @@ import org.apache.doris.catalog.TableAttributes; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.constraint.Constraint; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; @@ -55,8 +51,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; /** @@ -86,7 +80,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { protected long dbId; protected boolean objectCreated; protected ExternalCatalog catalog; - protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); /** * No args constructor for persist. @@ -132,102 +125,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { } } - @Override - public void readLock() { - this.rwLock.readLock().lock(); - } - - @Override - public boolean tryReadLock(long timeout, TimeUnit unit) { - try { - return this.rwLock.readLock().tryLock(timeout, unit); - } catch (InterruptedException e) { - LOG.warn("failed to try read lock at table[" + name + "]", e); - return false; - } - } - - @Override - public void readUnlock() { - this.rwLock.readLock().unlock(); - } - - @Override - public void writeLock() { - this.rwLock.writeLock().lock(); - } - - @Override - public boolean writeLockIfExist() { - writeLock(); - return true; - } - - @Override - public boolean tryWriteLock(long timeout, TimeUnit unit) { - try { - return this.rwLock.writeLock().tryLock(timeout, unit); - } catch (InterruptedException e) { - LOG.warn("failed to try write lock at table[" + name + "]", e); - return false; - } - } - - @Override - public void writeUnlock() { - this.rwLock.writeLock().unlock(); - } - - @Override - public boolean isWriteLockHeldByCurrentThread() { - return this.rwLock.writeLock().isHeldByCurrentThread(); - } - - @Override - public <E extends Exception> void writeLockOrException(E e) throws E { - writeLock(); - } - - @Override - public void writeLockOrDdlException() throws DdlException { - writeLockOrException(new DdlException("unknown table, tableName=" + name, - ErrorCode.ERR_BAD_TABLE_ERROR)); - } - - @Override - public void writeLockOrMetaException() throws MetaNotFoundException { - writeLockOrException(new MetaNotFoundException("unknown table, tableName=" + name, - ErrorCode.ERR_BAD_TABLE_ERROR)); - } - - @Override - public void writeLockOrAlterCancelException() throws AlterCancelException { - writeLockOrException(new AlterCancelException("unknown table, tableName=" + name, - ErrorCode.ERR_BAD_TABLE_ERROR)); - } - - @Override - public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException { - return tryWriteLockOrException(timeout, unit, new MetaNotFoundException("unknown table, tableName=" + name, - ErrorCode.ERR_BAD_TABLE_ERROR)); - } - - @Override - public <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E { - if (tryWriteLock(timeout, unit)) { - return true; - } - return false; - } - - @Override - public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) { - if (tryWriteLock(timeout, unit)) { - return true; - } - return false; - } - @Override public long getId() { return id; @@ -417,7 +314,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { @Override public void gsonPostProcess() throws IOException { - rwLock = new ReentrantReadWriteLock(true); objectCreated = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java index 15e0c37987f..a5851892ec1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java @@ -47,7 +47,7 @@ import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.mysql.privilege.Privilege; -import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 781eb92a714..8fe786555ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -28,7 +28,7 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadStatistic; import org.apache.doris.nereids.parser.NereidsParser; -import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.thrift.TCell; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 37292b25588..a5f16fc87ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -351,7 +351,6 @@ import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; -import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; @@ -368,8 +367,6 @@ import org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.commands.ExportCommand; -import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; -import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand; @@ -406,6 +403,9 @@ import org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition; import org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.StepPartition; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; +import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; @@ -542,7 +542,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()) { command = new BatchInsertIntoTableCommand(sink); } else { - command = new InsertIntoTableCommand(sink, labelName); + command = new InsertIntoTableCommand(sink, labelName, Optional.empty()); } } if (ctx.explain() != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java index 7606097411b..10dfdd2c2b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.types.CharType; @@ -155,7 +156,7 @@ public class CreateTableCommand extends Command implements ForwardWithSync { query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query); try { - new InsertIntoTableCommand(query, Optional.empty()).run(ctx, executor); + new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { handleFallbackFailedCtas(ctx); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java index 6fe57d6361d..3791b47f140 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -76,7 +77,8 @@ public class DeleteFromUsingCommand extends Command implements ForwardWithSync, + " Please check the following session variables: " + String.join(", ", SessionVariable.DEBUG_VARIABLES)); } - new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor); + new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx, + executor); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java deleted file mode 100644 index 166219dfae1..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ /dev/null @@ -1,291 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.plans.commands; - -import org.apache.doris.analysis.Expr; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.ProfileManager.ProfileType; -import org.apache.doris.load.loadv2.LoadStatistic; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.nereids.NereidsPlanner; -import org.apache.doris.nereids.exceptions.AnalysisException; -import org.apache.doris.nereids.glue.LogicalPlanAdapter; -import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.plans.Explainable; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; -import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; -import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; -import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.planner.DataSink; -import org.apache.doris.planner.GroupCommitPlanner; -import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.planner.UnionNode; -import org.apache.doris.proto.InternalService; -import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.SqlModeHelper; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.rpc.RpcException; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.transaction.TransactionStatus; - -import com.google.common.base.Preconditions; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.thrift.TException; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutionException; - -/** - * insert into select command implementation - * insert into select command support the grammar: explain? insert into table columns? partitions? hints? query - * InsertIntoTableCommand is a command to represent insert the answer of a query into a table. - * class structure's: - * InsertIntoTableCommand(Query()) - * ExplainCommand(Query()) - */ -public class InsertIntoTableCommand extends Command implements ForwardWithSync, Explainable { - - public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class); - - private LogicalPlan logicalQuery; - private Optional<String> labelName; - /** - * When source it's from job scheduler,it will be set. - */ - private long jobId; - private boolean allowAutoPartition; - - /** - * constructor - */ - public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName) { - super(PlanType.INSERT_INTO_TABLE_COMMAND); - this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null"); - this.labelName = Objects.requireNonNull(labelName, "labelName should not be null"); - // only insert overwrite will disable it. - this.allowAutoPartition = true; - } - - public Optional<String> getLabelName() { - return labelName; - } - - public void setLabelName(Optional<String> labelName) { - this.labelName = labelName; - } - - public void setJobId(long jobId) { - this.jobId = jobId; - } - - public void setAllowAutoPartition(boolean allowAutoPartition) { - this.allowAutoPartition = allowAutoPartition; - } - - @Override - public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { - runInternal(ctx, executor); - } - - public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor, - LoadStatistic loadStatistic) throws Exception { - // TODO: add coordinator statistic - runInternal(ctx, executor); - } - - private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception { - if (!ctx.getSessionVariable().isEnableNereidsDML()) { - try { - ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } catch (Exception e) { - throw new AnalysisException("failed to set fallback to original planner to true", e); - } - throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); - } - - PhysicalOlapTableSink<?> physicalOlapTableSink; - DataSink sink; - InsertExecutor insertExecutor; - Table targetTable; - TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx); - // should lock target table until we begin transaction. - targetTableIf.readLock(); - try { - // 1. process inline table (default values, empty values) - this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf); - - LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); - NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); - planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); - executor.setPlanner(planner); - executor.checkBlockRules(); - if (ctx.getMysqlChannel() != null) { - ctx.getMysqlChannel().reset(); - } - - // TODO: support other type table insert into - Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan() - .<Set<PhysicalOlapTableSink<?>>>collect(PhysicalOlapTableSink.class::isInstance)).stream() - .findAny(); - Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode"); - physicalOlapTableSink = plan.get(); - - targetTable = physicalOlapTableSink.getTargetTable(); - // check auth - if (!Env.getCurrentEnv().getAccessManager() - .checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(), - PrivPredicate.LOAD)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), - targetTable.getQualifiedDbName() + ": " + targetTable.getName()); - } - sink = planner.getFragments().get(0).getSink(); - // group commit - if (analyzeGroupCommit(ctx, sink, physicalOlapTableSink)) { - // handleGroupCommit(ctx, sink, physicalOlapTableSink); - // return; - throw new AnalysisException("group commit is not supported in nereids now"); - } - - String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo)); - insertExecutor = new InsertExecutor(ctx, - physicalOlapTableSink.getDatabase(), - physicalOlapTableSink.getTargetTable(), label, planner); - insertExecutor.beginTransaction(); - insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalOlapTableSink.isPartialUpdate(), - physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT, this.allowAutoPartition); - } finally { - targetTableIf.readUnlock(); - } - - boolean isEnableMemtableOnSinkNode = - ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange() - ? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode() : false; - insertExecutor.getCoordinator().getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); - executor.setProfileType(ProfileType.LOAD); - // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption - // so we need to set this here - executor.setCoord(insertExecutor.getCoordinator()); - insertExecutor.executeSingleInsertTransaction(executor, jobId); - } - - private void handleGroupCommit(ConnectContext ctx, DataSink sink, - PhysicalOlapTableSink<?> physicalOlapTableSink) - throws UserException, RpcException, TException, ExecutionException, InterruptedException { - // TODO we should refactor this to remove rely on UnionNode - List<InternalService.PDataRow> rows = new ArrayList<>(); - List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment() - .getPlanRoot()).getMaterializedConstExprLists(); - int filterSize = 0; - for (Slot slot : physicalOlapTableSink.getOutput()) { - if (slot.getName().contains(Column.DELETE_SIGN) - || slot.getName().contains(Column.VERSION_COL)) { - filterSize += 1; - } - } - for (List<Expr> list : materializedConstExprLists) { - rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize)); - } - GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(), - physicalOlapTableSink.getTargetTable(), null, ctx.queryId(), - ConnectContext.get().getSessionVariable().getGroupCommit()); - PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); - TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); - if (code == TStatusCode.DATA_QUALITY_ERROR) { - LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, " - + "schema version: {}", ctx.queryId(), - groupCommitPlanner.getBackend(), response.getStatus(), - physicalOlapTableSink.getTargetTable().getBaseSchemaVersion()); - } else if (code != TStatusCode.OK) { - String errMsg = "group commit insert failed. backend id: " - + groupCommitPlanner.getBackend().getId() + ", status: " - + response.getStatus(); - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } - TransactionStatus txnStatus = TransactionStatus.PREPARE; - String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name() - + "', 'txnId':'" + response.getTxnId() + "'" - + "', 'optimizer':'" + "nereids" + "'" - + "}"; - ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb); - ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(), - physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(), - txnStatus, response.getLoadedRows(), (int) response.getFilteredRows()); - // update it, so that user can get loaded rows in fe.audit.log - ctx.updateReturnRows((int) response.getLoadedRows()); - } - - private boolean analyzeGroupCommit(ConnectContext ctx, DataSink sink, - PhysicalOlapTableSink<?> physicalOlapTableSink) { - if (!(sink instanceof OlapTableSink) || !ctx.getSessionVariable().isEnableInsertGroupCommit() - || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) { - return false; - } - OlapTable targetTable = physicalOlapTableSink.getTargetTable(); - return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES - && !ctx.isTxnModel() && isGroupCommitAvailablePlan(physicalOlapTableSink) - && physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty() - .getUseSchemaLightChange() && !targetTable.getQualifiedDbName() - .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME); - } - - private boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends Plan> sink) { - Plan child = sink.child(); - if (child instanceof PhysicalDistribute) { - child = child.child(0); - } - return child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0); - } - - @Override - public Plan getExplainPlan(ConnectContext ctx) { - if (!ctx.getSessionVariable().isEnableNereidsDML()) { - try { - ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } catch (Exception e) { - throw new AnalysisException("failed to set fallback to original planner to true", e); - } - throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); - } - return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx)); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitInsertIntoTableCommand(this, context); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index ce61a3bbb3f..01f2d0f8e85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -49,6 +49,7 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -130,7 +131,7 @@ public class LoadCommand extends Command implements ForwardWithSync { profile.getSummaryProfile().setQueryBeginTime(); if (sourceInfos.size() == 1) { plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)), - Optional.of(labelName))); + Optional.of(labelName), Optional.empty())); } else { throw new AnalysisException("Multi insert into statements are unsupported."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index cde07c957a3..29d3b8d96f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -93,7 +94,8 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { - new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor); + new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx, + executor); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index f6bd39ac838..8b714b64edf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java new file mode 100644 index 00000000000..1a9ce2d744d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.Coordinator; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.task.LoadEtlTask; +import org.apache.doris.thrift.TQueryType; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Abstract insert executor. + * The derived class should implement the abstract method for certain type of target table + */ +public abstract class AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class); + protected long jobId; + protected final ConnectContext ctx; + protected final Coordinator coordinator; + protected final String labelName; + protected final DatabaseIf database; + protected final TableIf table; + protected final long createTime = System.currentTimeMillis(); + protected long loadedRows = 0; + protected int filteredRows = 0; + protected String errMsg = ""; + protected Optional<InsertCommandContext> insertCtx; + + /** + * Constructor + */ + public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner, + Optional<InsertCommandContext> insertCtx) { + this.ctx = ctx; + this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator()); + this.labelName = labelName; + this.table = table; + this.database = table.getDatabase(); + this.insertCtx = insertCtx; + } + + public Coordinator getCoordinator() { + return coordinator; + } + + /** + * begin transaction if necessary + */ + public abstract void beginTransaction(); + + /** + * finalize sink to complete enough info for sink execution + */ + protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink); + + /** + * Do something before exec + */ + protected abstract void beforeExec(); + + /** + * Do something after exec finished + */ + protected abstract void onComplete() throws UserException; + + /** + * Do something when exec throw exception + */ + protected abstract void onFail(Throwable t); + + /** + * Do something after exec + */ + protected abstract void afterExec(StmtExecutor executor); + + protected final void execImpl(StmtExecutor executor, long jobId) throws Exception { + String queryId = DebugUtil.printId(ctx.queryId()); + coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); + coordinator.setQueryType(TQueryType.LOAD); + executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); + QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); + QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); + coordinator.exec(); + int execTimeout = ctx.getExecTimeout(); + if (LOG.isDebugEnabled()) { + LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout); + } + boolean notTimeout = coordinator.join(execTimeout); + if (!coordinator.isDone()) { + coordinator.cancel(); + if (notTimeout) { + errMsg = coordinator.getExecStatus().getErrorMsg(); + ErrorReport.reportDdlException("there exists unhealthy backend. " + + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); + } + } + if (!coordinator.getExecStatus().ok()) { + errMsg = coordinator.getExecStatus().getErrorMsg(); + LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg); + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } + if (LOG.isDebugEnabled()) { + LOG.debug("insert [{}] with query id {} delta files is {}", + labelName, queryId, coordinator.getDeltaUrls()); + } + if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { + loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + } + if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { + filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + } + } + + private boolean checkStrictMode() { + // if in strict mode, insert will fail if there are filtered rows + if (ctx.getSessionVariable().getEnableInsertStrict()) { + if (filteredRows > 0) { + ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT, + "Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl()); + return false; + } + } + return true; + } + + /** + * execute insert txn for insert into select command. + */ + public void executeSingleInsert(StmtExecutor executor, long jobId) { + beforeExec(); + try { + execImpl(executor, jobId); + if (!checkStrictMode()) { + return; + } + onComplete(); + } catch (Throwable t) { + onFail(t); + return; + } finally { + executor.updateProfile(true); + QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId()); + } + afterExec(executor); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java similarity index 87% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/BatchInsertIntoTableCommand.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java index 87ab7f73426..82b81473948 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/BatchInsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.trees.plans.commands; +package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; @@ -32,6 +32,8 @@ import org.apache.doris.nereids.trees.TreeNode; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; @@ -69,15 +71,7 @@ public class BatchInsertIntoTableCommand extends Command implements ForwardWithS @Override public Plan getExplainPlan(ConnectContext ctx) throws Exception { - if (!ctx.getSessionVariable().isEnableNereidsDML()) { - try { - ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } catch (Exception e) { - throw new AnalysisException("failed to set fallback to original planner to true", e); - } - throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); - } - return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx)); + return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); } @Override @@ -103,10 +97,10 @@ public class BatchInsertIntoTableCommand extends Command implements ForwardWithS } PhysicalOlapTableSink<?> sink; - TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx); + TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); targetTableIf.readLock(); try { - this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf); + this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf); LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); @@ -147,14 +141,14 @@ public class BatchInsertIntoTableCommand extends Command implements ForwardWithS Optional<PhysicalUnion> union = planner.getPhysicalPlan() .<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance).stream().findAny(); if (union.isPresent()) { - InsertExecutor.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(), + InsertUtils.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(), targetTable.getName(), targetSchema, union.get().getConstantExprsList()); return; } Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan() .<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny(); if (oneRowRelation.isPresent()) { - InsertExecutor.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(), + InsertUtils.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(), targetTable.getName(), targetSchema, ImmutableList.of(oneRowRelation.get().getProjects())); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java new file mode 100644 index 00000000000..4926d5486dc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.GroupCommitPlanner; +import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.UnionNode; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SqlModeHelper; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.transaction.TransactionStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** + * Handle group commit + */ +public class GroupCommitInserter { + public static final Logger LOG = LogManager.getLogger(GroupCommitInserter.class); + + /** + * Handle group commit + */ + public static boolean groupCommit(ConnectContext ctx, DataSink sink, PhysicalSink physicalSink) { + PhysicalOlapTableSink<?> olapSink = (PhysicalOlapTableSink<?>) physicalSink; + // TODO: implement group commit + if (canGroupCommit(ctx, sink, olapSink)) { + // handleGroupCommit(ctx, sink, physicalOlapTableSink); + // return; + throw new AnalysisException("group commit is not supported in Nereids now"); + } + return false; + } + + private static boolean canGroupCommit(ConnectContext ctx, DataSink sink, + PhysicalOlapTableSink<?> physicalOlapTableSink) { + if (!(sink instanceof OlapTableSink) || !ctx.getSessionVariable().isEnableInsertGroupCommit() + || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) { + return false; + } + OlapTable targetTable = physicalOlapTableSink.getTargetTable(); + return ctx.getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES + && !ctx.isTxnModel() && isGroupCommitAvailablePlan(physicalOlapTableSink) + && physicalOlapTableSink.getPartitionIds().isEmpty() && targetTable.getTableProperty() + .getUseSchemaLightChange() && !targetTable.getQualifiedDbName() + .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME); + } + + private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends Plan> sink) { + Plan child = sink.child(); + if (child instanceof PhysicalDistribute) { + child = child.child(0); + } + return child instanceof OneRowRelation || (child instanceof PhysicalUnion && child.arity() == 0); + } + + private void handleGroupCommit(ConnectContext ctx, DataSink sink, + PhysicalOlapTableSink<?> physicalOlapTableSink) + throws UserException, RpcException, TException, ExecutionException, InterruptedException { + // TODO we should refactor this to remove rely on UnionNode + List<InternalService.PDataRow> rows = new ArrayList<>(); + List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment() + .getPlanRoot()).getMaterializedConstExprLists(); + int filterSize = 0; + for (Slot slot : physicalOlapTableSink.getOutput()) { + if (slot.getName().contains(Column.DELETE_SIGN) + || slot.getName().contains(Column.VERSION_COL)) { + filterSize += 1; + } + } + for (List<Expr> list : materializedConstExprLists) { + rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize)); + } + GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(), + physicalOlapTableSink.getTargetTable(), null, ctx.queryId(), + ConnectContext.get().getSessionVariable().getGroupCommit()); + PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows); + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code == TStatusCode.DATA_QUALITY_ERROR) { + LOG.info("group commit insert failed. query id: {}, backend id: {}, status: {}, " + + "schema version: {}", ctx.queryId(), + groupCommitPlanner.getBackend(), response.getStatus(), + physicalOlapTableSink.getTargetTable().getBaseSchemaVersion()); + } else if (code != TStatusCode.OK) { + String errMsg = "group commit insert failed. backend id: " + + groupCommitPlanner.getBackend().getId() + ", status: " + + response.getStatus(); + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } + TransactionStatus txnStatus = TransactionStatus.PREPARE; + String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name() + + "', 'txnId':'" + response.getTxnId() + "'" + + "', 'optimizer':'" + "nereids" + "'" + + "}"; + ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb); + ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(), + physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(), + txnStatus, response.getLoadedRows(), (int) response.getFilteredRows()); + // update it, so that user can get loaded rows in fe.audit.log + ctx.updateReturnRows((int) response.getLoadedRows()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertCommandContext.java new file mode 100644 index 00000000000..1b4e28117bc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertCommandContext.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * The context of insert command. + * You can add some fields or methods here if you need in derived classed + */ +public abstract class InsertCommandContext { + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java new file mode 100644 index 00000000000..06cd4275682 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.util.ProfileManager.ProfileType; +import org.apache.doris.load.loadv2.LoadStatistic; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.plans.Explainable; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.planner.DataSink; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * insert into select command implementation + * insert into select command support the grammar: explain? insert into table columns? partitions? hints? query + * InsertIntoTableCommand is a command to represent insert the answer of a query into a table. + * class structure's: + * InsertIntoTableCommand(Query()) + * ExplainCommand(Query()) + */ +public class InsertIntoTableCommand extends Command implements ForwardWithSync, Explainable { + + public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class); + + private LogicalPlan logicalQuery; + private Optional<String> labelName; + /** + * When source it's from job scheduler,it will be set. + */ + private long jobId; + private Optional<InsertCommandContext> insertCtx; + + /** + * constructor + */ + public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName, + Optional<InsertCommandContext> insertCtx) { + super(PlanType.INSERT_INTO_TABLE_COMMAND); + this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null"); + this.labelName = Objects.requireNonNull(labelName, "labelName should not be null"); + this.insertCtx = insertCtx; + } + + public Optional<String> getLabelName() { + return labelName; + } + + public void setLabelName(Optional<String> labelName) { + this.labelName = labelName; + } + + public void setJobId(long jobId) { + this.jobId = jobId; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + runInternal(ctx, executor); + } + + public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor, + LoadStatistic loadStatistic) throws Exception { + // TODO: add coordinator statistic + runInternal(ctx, executor); + } + + private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception { + if (!ctx.getSessionVariable().isEnableNereidsDML()) { + try { + ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); + } catch (Exception e) { + throw new AnalysisException("failed to set fallback to original planner to true", e); + } + throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); + } + + TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); + // check auth + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), targetTableIf.getDatabase().getCatalog().getName(), + targetTableIf.getDatabase().getFullName(), targetTableIf.getName(), + PrivPredicate.LOAD)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", + ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), + targetTableIf.getDatabase().getFullName() + "." + targetTableIf.getName()); + } + + AbstractInsertExecutor insertExecutor; + // should lock target table until we begin transaction. + targetTableIf.readLock(); + try { + // 1. process inline table (default values, empty values) + this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf); + + LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); + executor.setPlanner(planner); + executor.checkBlockRules(); + if (ctx.getMysqlChannel() != null) { + ctx.getMysqlChannel().reset(); + } + + Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan() + .<Set<PhysicalOlapTableSink<?>>>collect(PhysicalSink.class::isInstance)).stream() + .findAny(); + Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table"); + PhysicalSink physicalSink = plan.get(); + DataSink sink = planner.getFragments().get(0).getSink(); + String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo)); + + if (physicalSink instanceof PhysicalOlapTableSink) { + if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) { + return; + } + OlapTable olapTable = (OlapTable) targetTableIf; + insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx); + boolean isEnableMemtableOnSinkNode = + olapTable.getTableProperty().getUseSchemaLightChange() + ? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode() + : false; + insertExecutor.getCoordinator().getQueryOptions() + .setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + } else { + // TODO: support other table types + throw new AnalysisException("insert into command only support olap table"); + } + + insertExecutor.beginTransaction(); + insertExecutor.finalizeSink(planner.getFragments().get(0), sink, physicalSink); + } finally { + targetTableIf.readUnlock(); + } + + executor.setProfileType(ProfileType.LOAD); + // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption, + // so we need to set this here + executor.setCoord(insertExecutor.getCoordinator()); + insertExecutor.executeSingleInsert(executor, jobId); + } + + @Override + public Plan getExplainPlan(ConnectContext ctx) { + return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitInsertIntoTableCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java similarity index 90% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index c7a4ed4f9ce..fffdf06b54e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.trees.plans.commands; +package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -34,6 +34,8 @@ import org.apache.doris.nereids.trees.TreeNode; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -91,13 +93,13 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS } throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); } - // insert overwrite only support - TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx); + + TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); if (!(targetTableIf instanceof OlapTable)) { throw new AnalysisException("insert into overwrite only support OLAP table." + " But current table type is " + targetTableIf.getType()); } - this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf); + this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf); LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); @@ -165,8 +167,9 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); // for overwrite situation, we disable auto create partition. - InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName); - insertCommand.setAllowAutoPartition(false); + OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(); + insertCtx.setAllowAutoPartition(false); + InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx)); insertCommand.run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); @@ -177,15 +180,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS @Override public Plan getExplainPlan(ConnectContext ctx) { - if (!ctx.getSessionVariable().isEnableNereidsDML()) { - try { - ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); - } catch (Exception e) { - throw new AnalysisException("failed to set fallback to original planner to true", e); - } - throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); - } - return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx)); + return InsertUtils.getPlanForExplain(ctx, this.logicalQuery); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java similarity index 54% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 295478349cf..ab65b065e35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -15,11 +15,8 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.nereids.trees.plans.commands; +package org.apache.doris.nereids.trees.plans.commands.insert; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.StatementBase; -import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -27,15 +24,7 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; -import org.apache.doris.catalog.TableIf.TableType; -import org.apache.doris.common.Config; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.load.EtlJobType; -import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; @@ -59,328 +48,33 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.nereids.util.TypeCoercionUtils; -import org.apache.doris.planner.DataSink; -import org.apache.doris.planner.DataStreamSink; -import org.apache.doris.planner.ExchangeNode; -import org.apache.doris.planner.OlapTableSink; -import org.apache.doris.planner.PlanFragment; import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.InsertStreamTxnExecutor; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.qe.QeProcessorImpl.QueryInfo; -import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.service.FrontendOptions; -import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TMergeType; -import org.apache.doris.thrift.TOlapTableLocationParam; -import org.apache.doris.thrift.TPartitionType; -import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TTxnParams; -import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionEntry; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.LoadJobSourceType; -import org.apache.doris.transaction.TransactionState.TxnCoordinator; -import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionStatus; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; /** - * transaction wrapper for Nereids + * The helper class for insert operation. */ -public class InsertExecutor { - - private static final Logger LOG = LogManager.getLogger(InsertExecutor.class); - private static final long INVALID_TXN_ID = -1L; - - private final ConnectContext ctx; - private final Coordinator coordinator; - private final String labelName; - private final Database database; - private final Table table; - private final long createAt = System.currentTimeMillis(); - private long loadedRows = 0; - private int filteredRows = 0; - private long txnId = INVALID_TXN_ID; - private TransactionStatus txnStatus = TransactionStatus.ABORTED; - private String errMsg = ""; - - /** - * constructor - */ - public InsertExecutor(ConnectContext ctx, Database database, Table table, - String labelName, NereidsPlanner planner) { - this.ctx = ctx; - this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator()); - this.labelName = labelName; - this.database = database; - this.table = table; - } - - public long getTxnId() { - return txnId; - } - - /** - * begin transaction if necessary - */ - public void beginTransaction() { - if (!(table instanceof OlapTable)) { - return; - } - try { - this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( - database.getId(), ImmutableList.of(table.getId()), labelName, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), - LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); - } catch (Exception e) { - throw new AnalysisException("begin transaction failed. " + e.getMessage(), e); - } - } - - /** - * finalize sink to complete enough info for sink execution - */ - public void finalizeSink(PlanFragment fragment, DataSink sink, - boolean isPartialUpdate, boolean isFromInsert, boolean allowAutoPartition) { - if (!(sink instanceof OlapTableSink)) { - return; - } - Preconditions.checkState(table instanceof OlapTable, - "sink is OlapTableSink, but table type is " + table.getType()); - OlapTableSink olapTableSink = (OlapTableSink) sink; - boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict() - && isPartialUpdate - && isFromInsert; - try { - // TODO refactor this to avoid call legacy planner's function - olapTableSink.init(ctx.queryId(), txnId, database.getId(), - ctx.getExecTimeout(), - ctx.getSessionVariable().getSendBatchParallelism(), - false, - isStrictMode); - olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); - if (!allowAutoPartition) { - olapTableSink.setAutoPartition(false); - } - // update - - // set schema and partition info for tablet id shuffle exchange - if (fragment.getPlanRoot() instanceof ExchangeNode - && fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) { - DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink()); - Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get()); - dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema( - database.getId(), olapTableSink.getDstTable(), analyzer)); - dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition( - database.getId(), olapTableSink.getDstTable(), analyzer)); - dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor()); - List<TOlapTableLocationParam> locationParams = olapTableSink - .createLocation(olapTableSink.getDstTable()); - dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); - dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); - } - } catch (Exception e) { - throw new AnalysisException(e.getMessage(), e); - } - TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); - if (state == null) { - throw new AnalysisException("txn does not exist: " + txnId); - } - state.addTableIndexes((OlapTable) table); - if (isPartialUpdate) { - state.setSchemaForPartialUpdate((OlapTable) table); - } - } - - /** - * execute insert txn for insert into select command. - */ - public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) { - String queryId = DebugUtil.printId(ctx.queryId()); - LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId); - Throwable throwable = null; - - try { - coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); - coordinator.setQueryType(TQueryType.LOAD); - executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); - QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); - QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); - coordinator.exec(); - int execTimeout = ctx.getExecTimeout(); - if (LOG.isDebugEnabled()) { - LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout); - } - boolean notTimeout = coordinator.join(execTimeout); - if (!coordinator.isDone()) { - coordinator.cancel(); - if (notTimeout) { - errMsg = coordinator.getExecStatus().getErrorMsg(); - ErrorReport.reportDdlException("there exists unhealthy backend. " - + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } else { - ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); - } - } - if (!coordinator.getExecStatus().ok()) { - errMsg = coordinator.getExecStatus().getErrorMsg(); - LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg); - ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); - } - if (LOG.isDebugEnabled()) { - LOG.debug("insert [{}] with query id {} delta files is {}", - labelName, queryId, coordinator.getDeltaUrls()); - } - if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { - loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); - } - if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { - filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); - } - - // if in strict mode, insert will fail if there are filtered rows - if (ctx.getSessionVariable().getEnableInsertStrict()) { - if (filteredRows > 0) { - ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT, - "Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl()); - return; - } - } - - if (table.getType() != TableType.OLAP && table.getType() != TableType.MATERIALIZED_VIEW) { - // no need to add load job. - // MySQL table is already being inserted. - ctx.getState().setOk(loadedRows, filteredRows, null); - return; - } - - if (ctx.getState().getStateType() == MysqlStateType.ERR) { - try { - String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); - Env.getCurrentGlobalTransactionMgr().abortTransaction( - database.getId(), txnId, - (errMsg == null ? "unknown reason" : errMsg)); - } catch (Exception abortTxnException) { - LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException); - } - } else if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( - database, Lists.newArrayList(table), - txnId, - TabletCommitInfo.fromThrift(coordinator.getCommitInfos()), - ctx.getSessionVariable().getInsertVisibleTimeoutMs())) { - txnStatus = TransactionStatus.VISIBLE; - } else { - txnStatus = TransactionStatus.COMMITTED; - } - - } catch (Throwable t) { - // if any throwable being thrown during insert operation, first we should abort this txn - LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); - if (txnId != INVALID_TXN_ID) { - try { - Env.getCurrentGlobalTransactionMgr().abortTransaction( - database.getId(), txnId, - t.getMessage() == null ? "unknown reason" : t.getMessage()); - } catch (Exception abortTxnException) { - // just print a log if abort txn failed. This failure do not need to pass to user. - // user only concern abort how txn failed. - LOG.warn("insert [{}] with query id {} abort txn {} failed", - labelName, queryId, txnId, abortTxnException); - } - } - - if (!Config.using_old_load_usage_pattern) { - // if not using old load usage pattern, error will be returned directly to user - StringBuilder sb = new StringBuilder(t.getMessage()); - if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { - sb.append(". url: ").append(coordinator.getTrackingUrl()); - } - ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); - return; - } - - /* - * If config 'using_old_load_usage_pattern' is true. - * Doris will return a label to user, and user can use this label to check load job's status, - * which exactly like the old insert stmt usage pattern. - */ - throwable = t; - } finally { - executor.updateProfile(true); - QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId()); - } - - // Go here, which means: - // 1. transaction is finished successfully (COMMITTED or VISIBLE), or - // 2. transaction failed but Config.using_old_load_usage_pattern is true. - // we will record the load job info for these 2 cases - try { - // the statement parsed by Nereids is saved at executor::parsedStmt. - StatementBase statement = executor.getParsedStmt(); - UserIdentity userIdentity; - //if we use job scheduler, parse statement will not set user identity,so we need to get it from context - if (null == statement) { - userIdentity = ctx.getCurrentUserIdentity(); - } else { - userIdentity = statement.getUserInfo(); - } - EtlJobType etlJobType = EtlJobType.INSERT; - if (0 != jobId) { - etlJobType = EtlJobType.INSERT_JOB; - } - if (!Config.enable_nereids_load) { - // just record for loadv2 here - ctx.getEnv().getLoadManager() - .recordFinishedLoadJob(labelName, txnId, database.getFullName(), - table.getId(), - etlJobType, createAt, throwable == null ? "" : throwable.getMessage(), - coordinator.getTrackingUrl(), userIdentity, jobId); - } - } catch (MetaNotFoundException e) { - LOG.warn("Record info of insert load with error {}", e.getMessage(), e); - errMsg = "Record info of insert load with error " + e.getMessage(); - } - - // {'label':'my_label1', 'status':'visible', 'txnId':'123'} - // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'} - StringBuilder sb = new StringBuilder(); - sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name()); - sb.append("', 'txnId':'").append(txnId).append("'"); - if (table.getType() == TableType.MATERIALIZED_VIEW) { - sb.append("', 'rows':'").append(loadedRows).append("'"); - } - if (!Strings.isNullOrEmpty(errMsg)) { - sb.append(", 'err':'").append(errMsg).append("'"); - } - sb.append("}"); - - ctx.getState().setOk(loadedRows, filteredRows, sb.toString()); - // set insert result in connection context, - // so that user can use `show insert result` to get info of the last insert operation. - ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(), - txnStatus, loadedRows, filteredRows); - // update it, so that user can get loaded rows in fe.audit.log - ctx.updateReturnRows((int) loadedRows); - } +public class InsertUtils { /** * execute insert values in transaction. @@ -670,7 +364,18 @@ public class InsertExecutor { } } - public Coordinator getCoordinator() { - return coordinator; + /** + * get plan for explain. + */ + public static Plan getPlanForExplain(ConnectContext ctx, LogicalPlan logicalQuery) { + if (!ctx.getSessionVariable().isEnableNereidsDML()) { + try { + ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); + } catch (Exception e) { + throw new AnalysisException("failed to set fallback to original planner to true", e); + } + throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner"); + } + return InsertUtils.normalizePlan(logicalQuery, InsertUtils.getTargetTable(logicalQuery, ctx)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java new file mode 100644 index 00000000000..23dd8d13d91 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For Olap Table + */ +public class OlapInsertCommandContext extends InsertCommandContext { + private boolean allowAutoPartition = true; + + public boolean isAllowAutoPartition() { + return allowAutoPartition; + } + + public void setAllowAutoPartition(boolean allowAutoPartition) { + this.allowAutoPartition = allowAutoPartition; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java new file mode 100644 index 00000000000..b24426b1e87 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.Config; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.load.EtlJobType; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.DataStreamSink; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TOlapTableLocationParam; +import org.apache.doris.thrift.TPartitionType; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Optional; + +/** + * Insert executor for olap table + */ +public class OlapInsertExecutor extends AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(OlapInsertExecutor.class); + private static final long INVALID_TXN_ID = -1L; + private long txnId = INVALID_TXN_ID; + private TransactionStatus txnStatus = TransactionStatus.ABORTED; + + /** + * constructor + */ + public OlapInsertExecutor(ConnectContext ctx, Table table, + String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) { + super(ctx, table, labelName, planner, insertCtx); + } + + public long getTxnId() { + return txnId; + } + + @Override + public void beginTransaction() { + try { + this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( + database.getId(), ImmutableList.of(table.getId()), labelName, + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout()); + } catch (Exception e) { + throw new AnalysisException("begin transaction failed. " + e.getMessage(), e); + } + } + + @Override + public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { + OlapTableSink olapTableSink = (OlapTableSink) sink; + PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) physicalSink; + OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) insertCtx.orElse( + new OlapInsertCommandContext()); + + boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict() + && physicalOlapTableSink.isPartialUpdate() + && physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT; + try { + // TODO refactor this to avoid call legacy planner's function + int timeout = ctx.getExecTimeout(); + olapTableSink.init(ctx.queryId(), txnId, database.getId(), + timeout, + ctx.getSessionVariable().getSendBatchParallelism(), + false, + isStrictMode); + olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); + if (!olapInsertCtx.isAllowAutoPartition()) { + olapTableSink.setAutoPartition(false); + } + // update + + // set schema and partition info for tablet id shuffle exchange + if (fragment.getPlanRoot() instanceof ExchangeNode + && fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) { + DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink()); + Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get()); + dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema( + database.getId(), olapTableSink.getDstTable(), analyzer)); + dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition( + database.getId(), olapTableSink.getDstTable(), analyzer)); + dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor()); + List<TOlapTableLocationParam> locationParams = olapTableSink + .createLocation(olapTableSink.getDstTable()); + dataStreamSink.setTabletSinkLocationParam(locationParams.get(0)); + dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId()); + } + } catch (Exception e) { + throw new AnalysisException(e.getMessage(), e); + } + TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); + if (state == null) { + throw new AnalysisException("txn does not exist: " + txnId); + } + state.addTableIndexes((OlapTable) table); + if (physicalOlapTableSink.isPartialUpdate()) { + state.setSchemaForPartialUpdate((OlapTable) table); + } + } + + @Override + protected void beforeExec() { + String queryId = DebugUtil.printId(ctx.queryId()); + LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId); + } + + @Override + protected void onComplete() throws UserException { + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + try { + String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); + Env.getCurrentGlobalTransactionMgr().abortTransaction( + database.getId(), txnId, + (errMsg == null ? "unknown reason" : errMsg)); + } catch (Exception abortTxnException) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException); + } + } else if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction( + database, Lists.newArrayList((Table) table), + txnId, + TabletCommitInfo.fromThrift(coordinator.getCommitInfos()), + ctx.getSessionVariable().getInsertVisibleTimeoutMs())) { + txnStatus = TransactionStatus.VISIBLE; + } else { + txnStatus = TransactionStatus.COMMITTED; + } + } + + @Override + protected void onFail(Throwable t) { + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + if (txnId != INVALID_TXN_ID) { + try { + Env.getCurrentGlobalTransactionMgr().abortTransaction( + database.getId(), txnId, errMsg); + } catch (Exception abortTxnException) { + // just print a log if abort txn failed. This failure do not need to pass to user. + // user only concern abort how txn failed. + LOG.warn("insert [{}] with query id {} abort txn {} failed", + labelName, queryId, txnId, abortTxnException); + } + } + + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". url: ").append(coordinator.getTrackingUrl()); + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); + } + + @Override + protected void afterExec(StmtExecutor executor) { + // Go here, which means: + // 1. transaction is finished successfully (COMMITTED or VISIBLE), or + // 2. transaction failed but Config.using_old_load_usage_pattern is true. + // we will record the load job info for these 2 cases + try { + // the statement parsed by Nereids is saved at executor::parsedStmt. + StatementBase statement = executor.getParsedStmt(); + UserIdentity userIdentity; + // if we use job scheduler, parse statement will not set user identity,so we need to get it from context + if (null == statement) { + userIdentity = ctx.getCurrentUserIdentity(); + } else { + userIdentity = statement.getUserInfo(); + } + EtlJobType etlJobType = EtlJobType.INSERT; + if (0 != jobId) { + etlJobType = EtlJobType.INSERT_JOB; + } + if (!Config.enable_nereids_load) { + // just record for loadv2 here + ctx.getEnv().getLoadManager() + .recordFinishedLoadJob(labelName, txnId, database.getFullName(), + table.getId(), + etlJobType, createTime, errMsg, + coordinator.getTrackingUrl(), userIdentity, jobId); + } + } catch (MetaNotFoundException e) { + LOG.warn("Record info of insert load with error {}", e.getMessage(), e); + errMsg = "Record info of insert load with error " + e.getMessage(); + } + + // {'label':'my_label1', 'status':'visible', 'txnId':'123'} + // {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'} + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name()); + sb.append("', 'txnId':'").append(txnId).append("'"); + if (table.getType() == TableType.MATERIALIZED_VIEW) { + sb.append("', 'rows':'").append(loadedRows).append("'"); + } + if (!Strings.isNullOrEmpty(errMsg)) { + sb.append(", 'err':'").append(errMsg).append("'"); + } + sb.append("}"); + + ctx.getState().setOk(loadedRows, filteredRows, sb.toString()); + // set insert result in connection context, + // so that user can use `show insert result` to get info of the last insert operation. + ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(), + txnStatus, loadedRows, filteredRows); + // update it, so that user can get loaded rows in fe.audit.log + ctx.updateReturnRows((int) loadedRows); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 9626942e10b..6bcc41e39e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -19,7 +19,6 @@ package org.apache.doris.nereids.trees.plans.visitor; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; -import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand; import org.apache.doris.nereids.trees.plans.commands.Command; @@ -34,8 +33,6 @@ import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; import org.apache.doris.nereids.trees.plans.commands.ExportCommand; -import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; -import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.LoadCommand; import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand; @@ -44,6 +41,9 @@ import org.apache.doris.nereids.trees.plans.commands.ShowConstraintsCommand; import org.apache.doris.nereids.trees.plans.commands.ShowCreateProcedureCommand; import org.apache.doris.nereids.trees.plans.commands.ShowProcedureStatusCommand; import org.apache.doris.nereids.trees.plans.commands.UpdateCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; /** CommandVisitor. */ public interface CommandVisitor<R, C> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index b6a8c882fae..c457a5feccd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -128,13 +128,13 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.minidump.MinidumpUtils; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook; -import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; import org.apache.doris.nereids.trees.plans.commands.Forward; -import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; -import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback; +import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.planner.OlapScanNode; @@ -2137,22 +2137,12 @@ public class StmtExecutor { LOG.warn("errors when abort txn", abortTxnException); } - if (!Config.using_old_load_usage_pattern) { - // if not using old load usage pattern, error will be returned directly to user - StringBuilder sb = new StringBuilder(t.getMessage()); - if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { - sb.append(". url: " + coord.getTrackingUrl()); - } - context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); - return; + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) { + sb.append(". url: " + coord.getTrackingUrl()); } - - /* - * If config 'using_old_load_usage_pattern' is true. - * Doris will return a label to user, and user can use this label to check load job's status, - * which exactly like the old insert stmt usage pattern. - */ - throwable = t; + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); + return; } finally { if (coord != null) { coord.close(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java index 58212c2d3ba..7bcbecafb02 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java @@ -24,7 +24,7 @@ import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.datasets.ssb.SSBTestBase; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.properties.PhysicalProperties; -import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.junit.jupiter.api.Assertions; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java index 14ba32d61be..7613a2d284b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java @@ -51,8 +51,6 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; -import java.util.concurrent.locks.ReentrantReadWriteLock; - public class HmsQueryCacheTest extends AnalyzeCheckTestBase { private static final String HMS_CATALOG = "hms_ctl"; @@ -119,7 +117,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { Deencapsulation.setField(db, "initialized", true); Deencapsulation.setField(tbl, "objectCreated", true); - Deencapsulation.setField(tbl, "rwLock", new ReentrantReadWriteLock(true)); Deencapsulation.setField(tbl, "schemaUpdateTime", NOW); Deencapsulation.setField(tbl, "eventUpdateTime", 0); new Expectations(tbl) { @@ -167,7 +164,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { }; Deencapsulation.setField(tbl2, "objectCreated", true); - Deencapsulation.setField(tbl2, "rwLock", new ReentrantReadWriteLock(true)); Deencapsulation.setField(tbl2, "schemaUpdateTime", NOW); Deencapsulation.setField(tbl2, "eventUpdateTime", 0); new Expectations(tbl2) { @@ -215,7 +211,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { }; Deencapsulation.setField(view1, "objectCreated", true); - Deencapsulation.setField(view1, "rwLock", new ReentrantReadWriteLock(true)); new Expectations(view1) { { @@ -270,7 +265,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase { }; Deencapsulation.setField(view2, "objectCreated", true); - Deencapsulation.setField(view2, "rwLock", new ReentrantReadWriteLock(true)); new Expectations(view2) { { view2.getId(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org