This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4732aae628f [Refactor](insert) refactor insert command to support 
other type of table (#31610) (#32345)
4732aae628f is described below

commit 4732aae628fb9f9ebf60e63830bee512eb40cd2c
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sun Mar 17 20:46:07 2024 +0800

    [Refactor](insert) refactor insert command to support other type of table 
(#31610) (#32345)
    
    bp #31610
---
 .../main/java/org/apache/doris/common/Config.java  |   8 -
 .../java/org/apache/doris/catalog/TableIf.java     |  54 +++-
 .../org/apache/doris/datasource/ExternalTable.java | 104 -------
 .../doris/job/extensions/insert/InsertJob.java     |   2 +-
 .../doris/job/extensions/insert/InsertTask.java    |   2 +-
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   8 +-
 .../trees/plans/commands/CreateTableCommand.java   |   3 +-
 .../plans/commands/DeleteFromUsingCommand.java     |   4 +-
 .../plans/commands/InsertIntoTableCommand.java     | 291 ------------------
 .../nereids/trees/plans/commands/LoadCommand.java  |   3 +-
 .../trees/plans/commands/UpdateCommand.java        |   4 +-
 .../plans/commands/UpdateMvByPartitionCommand.java |   1 +
 .../commands/insert/AbstractInsertExecutor.java    | 180 ++++++++++++
 .../{ => insert}/BatchInsertIntoTableCommand.java  |  22 +-
 .../plans/commands/insert/GroupCommitInserter.java | 142 +++++++++
 .../commands/insert/InsertCommandContext.java      |  26 ++
 .../commands/insert/InsertIntoTableCommand.java    | 190 ++++++++++++
 .../{ => insert}/InsertOverwriteTableCommand.java  |  25 +-
 .../InsertUtils.java}                              | 327 +--------------------
 .../commands/insert/OlapInsertCommandContext.java  |  33 +++
 .../plans/commands/insert/OlapInsertExecutor.java  | 257 ++++++++++++++++
 .../trees/plans/visitor/CommandVisitor.java        |   6 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  26 +-
 .../apache/doris/nereids/util/ReadLockTest.java    |   2 +-
 .../org/apache/doris/qe/HmsQueryCacheTest.java     |   6 -
 25 files changed, 930 insertions(+), 796 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2c5a36e9f85..d4b83ccb51e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1167,14 +1167,6 @@ public class Config extends ConfigBase {
     @ConfField
     public static String small_file_dir = System.getenv("DORIS_HOME") + 
"/small_files";
 
-    /**
-     * If set to true, the insert stmt with processing error will still return 
a label to user.
-     * And user can use this label to check the load job's status.
-     * The default value is false, which means if insert operation encounter 
errors,
-     * exception will be thrown to user client directly without load label.
-     */
-    @ConfField(mutable = true, masterOnly = true) public static boolean 
using_old_load_usage_pattern = false;
-
     /**
      * This will limit the max recursion depth of hash distribution pruner.
      * eg: where a in (5 elements) and b in (4 elements) and c in (3 elements) 
and d in (2 elements).
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index fd7f5d53880..1ef0e9c2f3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -55,35 +55,59 @@ import java.util.stream.Collectors;
 public interface TableIf {
     Logger LOG = LogManager.getLogger(TableIf.class);
 
-    void readLock();
+    default void readLock() {
+    }
+
+    default boolean tryReadLock(long timeout, TimeUnit unit) {
+        return true;
+    }
 
-    boolean tryReadLock(long timeout, TimeUnit unit);
+    default void readUnlock() {
+    }
 
-    void readUnlock();
+    ;
 
-    void writeLock();
+    default void writeLock() {
+    }
 
-    boolean writeLockIfExist();
+    default boolean writeLockIfExist() {
+        return true;
+    }
 
-    boolean tryWriteLock(long timeout, TimeUnit unit);
+    default boolean tryWriteLock(long timeout, TimeUnit unit) {
+        return true;
+    }
 
-    void writeUnlock();
+    default void writeUnlock() {
+    }
 
-    boolean isWriteLockHeldByCurrentThread();
+    default boolean isWriteLockHeldByCurrentThread() {
+        return true;
+    }
 
-    <E extends Exception> void writeLockOrException(E e) throws E;
+    default <E extends Exception> void writeLockOrException(E e) throws E {
+    }
 
-    void writeLockOrDdlException() throws DdlException;
+    default void writeLockOrDdlException() throws DdlException {
+    }
 
-    void writeLockOrMetaException() throws MetaNotFoundException;
+    default void writeLockOrMetaException() throws MetaNotFoundException {
+    }
 
-    void writeLockOrAlterCancelException() throws AlterCancelException;
+    default void writeLockOrAlterCancelException() throws AlterCancelException 
{
+    }
 
-    boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws 
MetaNotFoundException;
+    default boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) 
throws MetaNotFoundException {
+        return true;
+    }
 
-    <E extends Exception> boolean tryWriteLockOrException(long timeout, 
TimeUnit unit, E e) throws E;
+    default <E extends Exception> boolean tryWriteLockOrException(long 
timeout, TimeUnit unit, E e) throws E {
+        return true;
+    }
 
-    boolean tryWriteLockIfExist(long timeout, TimeUnit unit);
+    default boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
+        return true;
+    }
 
     long getId();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index f39852fd215..7f82d0d3876 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.alter.AlterCancelException;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -25,9 +24,6 @@ import org.apache.doris.catalog.TableAttributes;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.constraint.Constraint;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Util;
@@ -55,8 +51,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -86,7 +80,6 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
     protected long dbId;
     protected boolean objectCreated;
     protected ExternalCatalog catalog;
-    protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
     /**
      * No args constructor for persist.
@@ -132,102 +125,6 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
         }
     }
 
-    @Override
-    public void readLock() {
-        this.rwLock.readLock().lock();
-    }
-
-    @Override
-    public boolean tryReadLock(long timeout, TimeUnit unit) {
-        try {
-            return this.rwLock.readLock().tryLock(timeout, unit);
-        } catch (InterruptedException e) {
-            LOG.warn("failed to try read lock at table[" + name + "]", e);
-            return false;
-        }
-    }
-
-    @Override
-    public void readUnlock() {
-        this.rwLock.readLock().unlock();
-    }
-
-    @Override
-    public void writeLock() {
-        this.rwLock.writeLock().lock();
-    }
-
-    @Override
-    public boolean writeLockIfExist() {
-        writeLock();
-        return true;
-    }
-
-    @Override
-    public boolean tryWriteLock(long timeout, TimeUnit unit) {
-        try {
-            return this.rwLock.writeLock().tryLock(timeout, unit);
-        } catch (InterruptedException e) {
-            LOG.warn("failed to try write lock at table[" + name + "]", e);
-            return false;
-        }
-    }
-
-    @Override
-    public void writeUnlock() {
-        this.rwLock.writeLock().unlock();
-    }
-
-    @Override
-    public boolean isWriteLockHeldByCurrentThread() {
-        return this.rwLock.writeLock().isHeldByCurrentThread();
-    }
-
-    @Override
-    public <E extends Exception> void writeLockOrException(E e) throws E {
-        writeLock();
-    }
-
-    @Override
-    public void writeLockOrDdlException() throws DdlException {
-        writeLockOrException(new DdlException("unknown table, tableName=" + 
name,
-                                        ErrorCode.ERR_BAD_TABLE_ERROR));
-    }
-
-    @Override
-    public void writeLockOrMetaException() throws MetaNotFoundException {
-        writeLockOrException(new MetaNotFoundException("unknown table, 
tableName=" + name,
-                                        ErrorCode.ERR_BAD_TABLE_ERROR));
-    }
-
-    @Override
-    public void writeLockOrAlterCancelException() throws AlterCancelException {
-        writeLockOrException(new AlterCancelException("unknown table, 
tableName=" + name,
-                                        ErrorCode.ERR_BAD_TABLE_ERROR));
-    }
-
-    @Override
-    public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) 
throws MetaNotFoundException {
-        return tryWriteLockOrException(timeout, unit, new 
MetaNotFoundException("unknown table, tableName=" + name,
-                                        ErrorCode.ERR_BAD_TABLE_ERROR));
-    }
-
-    @Override
-    public <E extends Exception> boolean tryWriteLockOrException(long timeout, 
TimeUnit unit, E e) throws E {
-        if (tryWriteLock(timeout, unit)) {
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
-        if (tryWriteLock(timeout, unit)) {
-            return true;
-        }
-        return false;
-    }
-
     @Override
     public long getId() {
         return id;
@@ -417,7 +314,6 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
 
     @Override
     public void gsonPostProcess() throws IOException {
-        rwLock = new ReentrantReadWriteLock(true);
         objectCreated = false;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index 15e0c37987f..a5851892ec1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -47,7 +47,7 @@ import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.mysql.privilege.Privilege;
-import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 781eb92a714..8fe786555ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -28,7 +28,7 @@ import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
 import org.apache.doris.nereids.parser.NereidsParser;
-import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.thrift.TCell;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 37292b25588..a5f16fc87ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -351,7 +351,6 @@ import 
org.apache.doris.nereids.trees.plans.algebra.Aggregate;
 import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
 import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
-import 
org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.CallCommand;
 import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
 import org.apache.doris.nereids.trees.plans.commands.Command;
@@ -368,8 +367,6 @@ import 
org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
-import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
-import 
org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
@@ -406,6 +403,9 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.RollupDefinition;
 import 
org.apache.doris.nereids.trees.plans.commands.info.SimpleColumnDefinition;
 import org.apache.doris.nereids.trees.plans.commands.info.StepPartition;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
@@ -542,7 +542,7 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             if (ConnectContext.get() != null && 
ConnectContext.get().isTxnModel()) {
                 command = new BatchInsertIntoTableCommand(sink);
             } else {
-                command = new InsertIntoTableCommand(sink, labelName);
+                command = new InsertIntoTableCommand(sink, labelName, 
Optional.empty());
             }
         }
         if (ctx.explain() != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
index 7606097411b..10dfdd2c2b2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java
@@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
 import 
org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.types.CharType;
@@ -155,7 +156,7 @@ public class CreateTableCommand extends Command implements 
ForwardWithSync {
         query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), 
ImmutableList.of(), ImmutableList.of(),
                 ImmutableList.of(), query);
         try {
-            new InsertIntoTableCommand(query, Optional.empty()).run(ctx, 
executor);
+            new InsertIntoTableCommand(query, Optional.empty(), 
Optional.empty()).run(ctx, executor);
             if (ctx.getState().getStateType() == MysqlStateType.ERR) {
                 handleFallbackFailedCtas(ctx);
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
index 6fe57d6361d..3791b47f140 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -76,7 +77,8 @@ public class DeleteFromUsingCommand extends Command 
implements ForwardWithSync,
                     + " Please check the following session variables: "
                     + String.join(", ", SessionVariable.DEBUG_VARIABLES));
         }
-        new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty()).run(ctx, executor);
+        new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty(), Optional.empty()).run(ctx,
+                executor);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
deleted file mode 100644
index 166219dfae1..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ /dev/null
@@ -1,291 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.trees.plans.commands;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.ProfileManager.ProfileType;
-import org.apache.doris.load.loadv2.LoadStatistic;
-import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.exceptions.AnalysisException;
-import org.apache.doris.nereids.glue.LogicalPlanAdapter;
-import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.plans.Explainable;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.PlanType;
-import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
-import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
-import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
-import org.apache.doris.planner.DataSink;
-import org.apache.doris.planner.GroupCommitPlanner;
-import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.UnionNode;
-import org.apache.doris.proto.InternalService;
-import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SqlModeHelper;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.rpc.RpcException;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.transaction.TransactionStatus;
-
-import com.google.common.base.Preconditions;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-/**
- * insert into select command implementation
- * insert into select command support the grammar: explain? insert into table 
columns? partitions? hints? query
- * InsertIntoTableCommand is a command to represent insert the answer of a 
query into a table.
- * class structure's:
- * InsertIntoTableCommand(Query())
- * ExplainCommand(Query())
- */
-public class InsertIntoTableCommand extends Command implements 
ForwardWithSync, Explainable {
-
-    public static final Logger LOG = 
LogManager.getLogger(InsertIntoTableCommand.class);
-
-    private LogicalPlan logicalQuery;
-    private Optional<String> labelName;
-    /**
-     * When source it's from job scheduler,it will be set.
-     */
-    private long jobId;
-    private boolean allowAutoPartition;
-
-    /**
-     * constructor
-     */
-    public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> 
labelName) {
-        super(PlanType.INSERT_INTO_TABLE_COMMAND);
-        this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery 
should not be null");
-        this.labelName = Objects.requireNonNull(labelName, "labelName should 
not be null");
-        // only insert overwrite will disable it.
-        this.allowAutoPartition = true;
-    }
-
-    public Optional<String> getLabelName() {
-        return labelName;
-    }
-
-    public void setLabelName(Optional<String> labelName) {
-        this.labelName = labelName;
-    }
-
-    public void setJobId(long jobId) {
-        this.jobId = jobId;
-    }
-
-    public void setAllowAutoPartition(boolean allowAutoPartition) {
-        this.allowAutoPartition = allowAutoPartition;
-    }
-
-    @Override
-    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
-        runInternal(ctx, executor);
-    }
-
-    public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
-                                  LoadStatistic loadStatistic) throws 
Exception {
-        // TODO: add coordinator statistic
-        runInternal(ctx, executor);
-    }
-
-    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
-        if (!ctx.getSessionVariable().isEnableNereidsDML()) {
-            try {
-                ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
-            } catch (Exception e) {
-                throw new AnalysisException("failed to set fallback to 
original planner to true", e);
-            }
-            throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
-        }
-
-        PhysicalOlapTableSink<?> physicalOlapTableSink;
-        DataSink sink;
-        InsertExecutor insertExecutor;
-        Table targetTable;
-        TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, 
ctx);
-        // should lock target table until we begin transaction.
-        targetTableIf.readLock();
-        try {
-            // 1. process inline table (default values, empty values)
-            this.logicalQuery = (LogicalPlan) 
InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
-
-            LogicalPlanAdapter logicalPlanAdapter = new 
LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
-            NereidsPlanner planner = new 
NereidsPlanner(ctx.getStatementContext());
-            planner.plan(logicalPlanAdapter, 
ctx.getSessionVariable().toThrift());
-            executor.setPlanner(planner);
-            executor.checkBlockRules();
-            if (ctx.getMysqlChannel() != null) {
-                ctx.getMysqlChannel().reset();
-            }
-
-            // TODO: support other type table insert into
-            Optional<PhysicalOlapTableSink<?>> plan = 
(planner.getPhysicalPlan()
-                    
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalOlapTableSink.class::isInstance)).stream()
-                    .findAny();
-            Preconditions.checkArgument(plan.isPresent(), "insert into command 
must contain OlapTableSinkNode");
-            physicalOlapTableSink = plan.get();
-
-            targetTable = physicalOlapTableSink.getTargetTable();
-            // check auth
-            if (!Env.getCurrentEnv().getAccessManager()
-                    .checkTblPriv(ConnectContext.get(), 
targetTable.getQualifiedDbName(), targetTable.getName(),
-                            PrivPredicate.LOAD)) {
-                
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"LOAD",
-                        ConnectContext.get().getQualifiedUser(), 
ConnectContext.get().getRemoteIP(),
-                        targetTable.getQualifiedDbName() + ": " + 
targetTable.getName());
-            }
-            sink = planner.getFragments().get(0).getSink();
-            // group commit
-            if (analyzeGroupCommit(ctx, sink, physicalOlapTableSink)) {
-                // handleGroupCommit(ctx, sink, physicalOlapTableSink);
-                // return;
-                throw new AnalysisException("group commit is not supported in 
nereids now");
-            }
-
-            String label = this.labelName.orElse(String.format("label_%x_%x", 
ctx.queryId().hi, ctx.queryId().lo));
-            insertExecutor = new InsertExecutor(ctx,
-                    physicalOlapTableSink.getDatabase(),
-                    physicalOlapTableSink.getTargetTable(), label, planner);
-            insertExecutor.beginTransaction();
-            insertExecutor.finalizeSink(planner.getFragments().get(0), sink, 
physicalOlapTableSink.isPartialUpdate(),
-                    physicalOlapTableSink.getDmlCommandType() == 
DMLCommandType.INSERT, this.allowAutoPartition);
-        } finally {
-            targetTableIf.readUnlock();
-        }
-
-        boolean isEnableMemtableOnSinkNode =
-                    ((OlapTable) 
targetTable).getTableProperty().getUseSchemaLightChange()
-                    ? 
insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode() 
: false;
-        
insertExecutor.getCoordinator().getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
-        executor.setProfileType(ProfileType.LOAD);
-        // We exposed @StmtExecutor#cancel as a unified entry point for 
statement interruption
-        // so we need to set this here
-        executor.setCoord(insertExecutor.getCoordinator());
-        insertExecutor.executeSingleInsertTransaction(executor, jobId);
-    }
-
-    private void handleGroupCommit(ConnectContext ctx, DataSink sink,
-                PhysicalOlapTableSink<?> physicalOlapTableSink)
-                throws UserException, RpcException, TException, 
ExecutionException, InterruptedException {
-        // TODO we should refactor this to remove rely on UnionNode
-        List<InternalService.PDataRow> rows = new ArrayList<>();
-        List<List<Expr>> materializedConstExprLists = ((UnionNode) 
sink.getFragment()
-                .getPlanRoot()).getMaterializedConstExprLists();
-        int filterSize = 0;
-        for (Slot slot : physicalOlapTableSink.getOutput()) {
-            if (slot.getName().contains(Column.DELETE_SIGN)
-                    || slot.getName().contains(Column.VERSION_COL)) {
-                filterSize += 1;
-            }
-        }
-        for (List<Expr> list : materializedConstExprLists) {
-            rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
-        }
-        GroupCommitPlanner groupCommitPlanner = new 
GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
-                physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
-                ConnectContext.get().getSessionVariable().getGroupCommit());
-        PGroupCommitInsertResponse response = 
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
-        TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
-        if (code == TStatusCode.DATA_QUALITY_ERROR) {
-            LOG.info("group commit insert failed. query id: {}, backend id: 
{}, status: {}, "
-                    + "schema version: {}", ctx.queryId(),
-                    groupCommitPlanner.getBackend(), response.getStatus(),
-                    
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
-        } else if (code != TStatusCode.OK) {
-            String errMsg = "group commit insert failed. backend id: "
-                    + groupCommitPlanner.getBackend().getId() + ", status: "
-                    + response.getStatus();
-            ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
-        }
-        TransactionStatus txnStatus = TransactionStatus.PREPARE;
-        String sb = "{'label':'" + response.getLabel() + "', 'status':'" + 
txnStatus.name()
-                + "', 'txnId':'" + response.getTxnId() + "'"
-                + "', 'optimizer':'" + "nereids" + "'"
-                + "}";
-        ctx.getState().setOk(response.getLoadedRows(), (int) 
response.getFilteredRows(), sb);
-        ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
-                physicalOlapTableSink.getDatabase().getFullName(), 
physicalOlapTableSink.getTargetTable().getName(),
-                txnStatus, response.getLoadedRows(), (int) 
response.getFilteredRows());
-        // update it, so that user can get loaded rows in fe.audit.log
-        ctx.updateReturnRows((int) response.getLoadedRows());
-    }
-
-    private boolean analyzeGroupCommit(ConnectContext ctx, DataSink sink,
-            PhysicalOlapTableSink<?> physicalOlapTableSink) {
-        if (!(sink instanceof OlapTableSink) || 
!ctx.getSessionVariable().isEnableInsertGroupCommit()
-                || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
-            return false;
-        }
-        OlapTable targetTable = physicalOlapTableSink.getTargetTable();
-        return ctx.getSessionVariable().getSqlMode() != 
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
-                && !ctx.isTxnModel() && 
isGroupCommitAvailablePlan(physicalOlapTableSink)
-                && physicalOlapTableSink.getPartitionIds().isEmpty() && 
targetTable.getTableProperty()
-                .getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
-                .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
-    }
-
-    private boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? extends 
Plan> sink) {
-        Plan child = sink.child();
-        if (child instanceof PhysicalDistribute) {
-            child = child.child(0);
-        }
-        return child instanceof OneRowRelation || (child instanceof 
PhysicalUnion && child.arity() == 0);
-    }
-
-    @Override
-    public Plan getExplainPlan(ConnectContext ctx) {
-        if (!ctx.getSessionVariable().isEnableNereidsDML()) {
-            try {
-                ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
-            } catch (Exception e) {
-                throw new AnalysisException("failed to set fallback to 
original planner to true", e);
-            }
-            throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
-        }
-        return InsertExecutor.normalizePlan(this.logicalQuery, 
InsertExecutor.getTargetTable(this.logicalQuery, ctx));
-    }
-
-    @Override
-    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitInsertIntoTableCommand(this, context);
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index ce61a3bbb3f..01f2d0f8e85 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -49,6 +49,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -130,7 +131,7 @@ public class LoadCommand extends Command implements 
ForwardWithSync {
         profile.getSummaryProfile().setQueryBeginTime();
         if (sourceInfos.size() == 1) {
             plans = ImmutableList.of(new 
InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
-                    Optional.of(labelName)));
+                    Optional.of(labelName), Optional.empty()));
         } else {
             throw new AnalysisException("Multi insert into statements are 
unsupported.");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
index cde07c957a3..29d3b8d96f3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java
@@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -93,7 +94,8 @@ public class UpdateCommand extends Command implements 
ForwardWithSync, Explainab
 
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
-        new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty()).run(ctx, executor);
+        new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), 
Optional.empty(), Optional.empty()).run(ctx,
+                executor);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index f6bd39ac838..8b714b64edf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -39,6 +39,7 @@ import 
org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
new file mode 100644
index 00000000000..1a9ce2d744d
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.TQueryType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Abstract insert executor.
+ * The derived class should implement the abstract method for certain type of 
target table
+ */
+public abstract class AbstractInsertExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(AbstractInsertExecutor.class);
+    protected long jobId;
+    protected final ConnectContext ctx;
+    protected final Coordinator coordinator;
+    protected final String labelName;
+    protected final DatabaseIf database;
+    protected final TableIf table;
+    protected final long createTime = System.currentTimeMillis();
+    protected long loadedRows = 0;
+    protected int filteredRows = 0;
+    protected String errMsg = "";
+    protected Optional<InsertCommandContext> insertCtx;
+
+    /**
+     * Constructor
+     */
+    public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx) {
+        this.ctx = ctx;
+        this.coordinator = new Coordinator(ctx, null, planner, 
ctx.getStatsErrorEstimator());
+        this.labelName = labelName;
+        this.table = table;
+        this.database = table.getDatabase();
+        this.insertCtx = insertCtx;
+    }
+
+    public Coordinator getCoordinator() {
+        return coordinator;
+    }
+
+    /**
+     * begin transaction if necessary
+     */
+    public abstract void beginTransaction();
+
+    /**
+     * finalize sink to complete enough info for sink execution
+     */
+    protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink);
+
+    /**
+     * Do something before exec
+     */
+    protected abstract void beforeExec();
+
+    /**
+     * Do something after exec finished
+     */
+    protected abstract void onComplete() throws UserException;
+
+    /**
+     * Do something when exec throw exception
+     */
+    protected abstract void onFail(Throwable t);
+
+    /**
+     * Do something after exec
+     */
+    protected abstract void afterExec(StmtExecutor executor);
+
+    protected final void execImpl(StmtExecutor executor, long jobId) throws 
Exception {
+        String queryId = DebugUtil.printId(ctx.queryId());
+        
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
+        coordinator.setQueryType(TQueryType.LOAD);
+        
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
+        QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), 
executor.getOriginStmtInString(), coordinator);
+        QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
+        coordinator.exec();
+        int execTimeout = ctx.getExecTimeout();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("insert [{}] with query id {} execution timeout is {}", 
labelName, queryId, execTimeout);
+        }
+        boolean notTimeout = coordinator.join(execTimeout);
+        if (!coordinator.isDone()) {
+            coordinator.cancel();
+            if (notTimeout) {
+                errMsg = coordinator.getExecStatus().getErrorMsg();
+                ErrorReport.reportDdlException("there exists unhealthy 
backend. "
+                        + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
+            }
+        }
+        if (!coordinator.getExecStatus().ok()) {
+            errMsg = coordinator.getExecStatus().getErrorMsg();
+            LOG.warn("insert [{}] with query id {} failed, {}", labelName, 
queryId, errMsg);
+            ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("insert [{}] with query id {} delta files is {}",
+                    labelName, queryId, coordinator.getDeltaUrls());
+        }
+        if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != 
null) {
+            loadedRows = 
Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
+        }
+        if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != 
null) {
+            filteredRows = 
Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
+        }
+    }
+
+    private boolean checkStrictMode() {
+        // if in strict mode, insert will fail if there are filtered rows
+        if (ctx.getSessionVariable().getEnableInsertStrict()) {
+            if (filteredRows > 0) {
+                ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
+                        "Insert has filtered data in strict mode, 
tracking_url=" + coordinator.getTrackingUrl());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * execute insert txn for insert into select command.
+     */
+    public void executeSingleInsert(StmtExecutor executor, long jobId) {
+        beforeExec();
+        try {
+            execImpl(executor, jobId);
+            if (!checkStrictMode()) {
+                return;
+            }
+            onComplete();
+        } catch (Throwable t) {
+            onFail(t);
+            return;
+        } finally {
+            executor.updateProfile(true);
+            QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
+        }
+        afterExec(executor);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/BatchInsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
similarity index 87%
rename from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/BatchInsertIntoTableCommand.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
index 87ab7f73426..82b81473948 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/BatchInsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.trees.plans.commands;
+package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -32,6 +32,8 @@ import org.apache.doris.nereids.trees.TreeNode;
 import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -69,15 +71,7 @@ public class BatchInsertIntoTableCommand extends Command 
implements ForwardWithS
 
     @Override
     public Plan getExplainPlan(ConnectContext ctx) throws Exception {
-        if (!ctx.getSessionVariable().isEnableNereidsDML()) {
-            try {
-                ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
-            } catch (Exception e) {
-                throw new AnalysisException("failed to set fallback to 
original planner to true", e);
-            }
-            throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
-        }
-        return InsertExecutor.normalizePlan(this.logicalQuery, 
InsertExecutor.getTargetTable(this.logicalQuery, ctx));
+        return InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
     }
 
     @Override
@@ -103,10 +97,10 @@ public class BatchInsertIntoTableCommand extends Command 
implements ForwardWithS
         }
 
         PhysicalOlapTableSink<?> sink;
-        TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, 
ctx);
+        TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
         targetTableIf.readLock();
         try {
-            this.logicalQuery = (LogicalPlan) 
InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
+            this.logicalQuery = (LogicalPlan) 
InsertUtils.normalizePlan(logicalQuery, targetTableIf);
             LogicalPlanAdapter logicalPlanAdapter = new 
LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
             NereidsPlanner planner = new 
NereidsPlanner(ctx.getStatementContext());
             planner.plan(logicalPlanAdapter, 
ctx.getSessionVariable().toThrift());
@@ -147,14 +141,14 @@ public class BatchInsertIntoTableCommand extends Command 
implements ForwardWithS
             Optional<PhysicalUnion> union = planner.getPhysicalPlan()
                     
.<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance).stream().findAny();
             if (union.isPresent()) {
-                InsertExecutor.executeBatchInsertTransaction(ctx, 
targetTable.getQualifiedDbName(),
+                InsertUtils.executeBatchInsertTransaction(ctx, 
targetTable.getQualifiedDbName(),
                         targetTable.getName(), targetSchema, 
union.get().getConstantExprsList());
                 return;
             }
             Optional<PhysicalOneRowRelation> oneRowRelation = 
planner.getPhysicalPlan()
                     
.<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny();
             if (oneRowRelation.isPresent()) {
-                InsertExecutor.executeBatchInsertTransaction(ctx, 
targetTable.getQualifiedDbName(),
+                InsertUtils.executeBatchInsertTransaction(ctx, 
targetTable.getQualifiedDbName(),
                         targetTable.getName(), targetSchema, 
ImmutableList.of(oneRowRelation.get().getProjects()));
                 return;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
new file mode 100644
index 00000000000..4926d5486dc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/GroupCommitInserter.java
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.GroupCommitPlanner;
+import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.UnionNode;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SqlModeHelper;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.transaction.TransactionStatus;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Handle group commit
+ */
+public class GroupCommitInserter {
+    public static final Logger LOG = 
LogManager.getLogger(GroupCommitInserter.class);
+
+    /**
+     * Handle group commit
+     */
+    public static boolean groupCommit(ConnectContext ctx, DataSink sink, 
PhysicalSink physicalSink) {
+        PhysicalOlapTableSink<?> olapSink = (PhysicalOlapTableSink<?>) 
physicalSink;
+        // TODO: implement group commit
+        if (canGroupCommit(ctx, sink, olapSink)) {
+            // handleGroupCommit(ctx, sink, physicalOlapTableSink);
+            // return;
+            throw new AnalysisException("group commit is not supported in 
Nereids now");
+        }
+        return false;
+    }
+
+    private static boolean canGroupCommit(ConnectContext ctx, DataSink sink,
+            PhysicalOlapTableSink<?> physicalOlapTableSink) {
+        if (!(sink instanceof OlapTableSink) || 
!ctx.getSessionVariable().isEnableInsertGroupCommit()
+                || ctx.getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
+            return false;
+        }
+        OlapTable targetTable = physicalOlapTableSink.getTargetTable();
+        return ctx.getSessionVariable().getSqlMode() != 
SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES
+                && !ctx.isTxnModel() && 
isGroupCommitAvailablePlan(physicalOlapTableSink)
+                && physicalOlapTableSink.getPartitionIds().isEmpty() && 
targetTable.getTableProperty()
+                .getUseSchemaLightChange() && !targetTable.getQualifiedDbName()
+                .equalsIgnoreCase(FeConstants.INTERNAL_DB_NAME);
+    }
+
+    private static boolean isGroupCommitAvailablePlan(PhysicalOlapTableSink<? 
extends Plan> sink) {
+        Plan child = sink.child();
+        if (child instanceof PhysicalDistribute) {
+            child = child.child(0);
+        }
+        return child instanceof OneRowRelation || (child instanceof 
PhysicalUnion && child.arity() == 0);
+    }
+
+    private void handleGroupCommit(ConnectContext ctx, DataSink sink,
+            PhysicalOlapTableSink<?> physicalOlapTableSink)
+            throws UserException, RpcException, TException, 
ExecutionException, InterruptedException {
+        // TODO we should refactor this to remove rely on UnionNode
+        List<InternalService.PDataRow> rows = new ArrayList<>();
+        List<List<Expr>> materializedConstExprLists = ((UnionNode) 
sink.getFragment()
+                .getPlanRoot()).getMaterializedConstExprLists();
+        int filterSize = 0;
+        for (Slot slot : physicalOlapTableSink.getOutput()) {
+            if (slot.getName().contains(Column.DELETE_SIGN)
+                    || slot.getName().contains(Column.VERSION_COL)) {
+                filterSize += 1;
+            }
+        }
+        for (List<Expr> list : materializedConstExprLists) {
+            rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
+        }
+        GroupCommitPlanner groupCommitPlanner = new 
GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
+                physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
+                ConnectContext.get().getSessionVariable().getGroupCommit());
+        PGroupCommitInsertResponse response = 
groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
+        TStatusCode code = 
TStatusCode.findByValue(response.getStatus().getStatusCode());
+        if (code == TStatusCode.DATA_QUALITY_ERROR) {
+            LOG.info("group commit insert failed. query id: {}, backend id: 
{}, status: {}, "
+                            + "schema version: {}", ctx.queryId(),
+                    groupCommitPlanner.getBackend(), response.getStatus(),
+                    
physicalOlapTableSink.getTargetTable().getBaseSchemaVersion());
+        } else if (code != TStatusCode.OK) {
+            String errMsg = "group commit insert failed. backend id: "
+                    + groupCommitPlanner.getBackend().getId() + ", status: "
+                    + response.getStatus();
+            ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
+        }
+        TransactionStatus txnStatus = TransactionStatus.PREPARE;
+        String sb = "{'label':'" + response.getLabel() + "', 'status':'" + 
txnStatus.name()
+                + "', 'txnId':'" + response.getTxnId() + "'"
+                + "', 'optimizer':'" + "nereids" + "'"
+                + "}";
+        ctx.getState().setOk(response.getLoadedRows(), (int) 
response.getFilteredRows(), sb);
+        ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
+                physicalOlapTableSink.getDatabase().getFullName(), 
physicalOlapTableSink.getTargetTable().getName(),
+                txnStatus, response.getLoadedRows(), (int) 
response.getFilteredRows());
+        // update it, so that user can get loaded rows in fe.audit.log
+        ctx.updateReturnRows((int) response.getLoadedRows());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertCommandContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertCommandContext.java
new file mode 100644
index 00000000000..1b4e28117bc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertCommandContext.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+/**
+ * The context of insert command.
+ * You can add some fields or methods here if you need in derived classed
+ */
+public abstract class InsertCommandContext {
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
new file mode 100644
index 00000000000..06cd4275682
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.ProfileManager.ProfileType;
+import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.glue.LogicalPlanAdapter;
+import org.apache.doris.nereids.trees.plans.Explainable;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * insert into select command implementation
+ * insert into select command support the grammar: explain? insert into table 
columns? partitions? hints? query
+ * InsertIntoTableCommand is a command to represent insert the answer of a 
query into a table.
+ * class structure's:
+ * InsertIntoTableCommand(Query())
+ * ExplainCommand(Query())
+ */
+public class InsertIntoTableCommand extends Command implements 
ForwardWithSync, Explainable {
+
+    public static final Logger LOG = 
LogManager.getLogger(InsertIntoTableCommand.class);
+
+    private LogicalPlan logicalQuery;
+    private Optional<String> labelName;
+    /**
+     * When source it's from job scheduler,it will be set.
+     */
+    private long jobId;
+    private Optional<InsertCommandContext> insertCtx;
+
+    /**
+     * constructor
+     */
+    public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> 
labelName,
+            Optional<InsertCommandContext> insertCtx) {
+        super(PlanType.INSERT_INTO_TABLE_COMMAND);
+        this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery 
should not be null");
+        this.labelName = Objects.requireNonNull(labelName, "labelName should 
not be null");
+        this.insertCtx = insertCtx;
+    }
+
+    public Optional<String> getLabelName() {
+        return labelName;
+    }
+
+    public void setLabelName(Optional<String> labelName) {
+        this.labelName = labelName;
+    }
+
+    public void setJobId(long jobId) {
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        runInternal(ctx, executor);
+    }
+
+    public void runWithUpdateInfo(ConnectContext ctx, StmtExecutor executor,
+                                  LoadStatistic loadStatistic) throws 
Exception {
+        // TODO: add coordinator statistic
+        runInternal(ctx, executor);
+    }
+
+    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        if (!ctx.getSessionVariable().isEnableNereidsDML()) {
+            try {
+                ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
+            } catch (Exception e) {
+                throw new AnalysisException("failed to set fallback to 
original planner to true", e);
+            }
+            throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
+        }
+
+        TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
+        // check auth
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkTblPriv(ConnectContext.get(), 
targetTableIf.getDatabase().getCatalog().getName(),
+                        targetTableIf.getDatabase().getFullName(), 
targetTableIf.getName(),
+                        PrivPredicate.LOAD)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"LOAD",
+                    ConnectContext.get().getQualifiedUser(), 
ConnectContext.get().getRemoteIP(),
+                    targetTableIf.getDatabase().getFullName() + "." + 
targetTableIf.getName());
+        }
+
+        AbstractInsertExecutor insertExecutor;
+        // should lock target table until we begin transaction.
+        targetTableIf.readLock();
+        try {
+            // 1. process inline table (default values, empty values)
+            this.logicalQuery = (LogicalPlan) 
InsertUtils.normalizePlan(logicalQuery, targetTableIf);
+
+            LogicalPlanAdapter logicalPlanAdapter = new 
LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
+            NereidsPlanner planner = new 
NereidsPlanner(ctx.getStatementContext());
+            planner.plan(logicalPlanAdapter, 
ctx.getSessionVariable().toThrift());
+            executor.setPlanner(planner);
+            executor.checkBlockRules();
+            if (ctx.getMysqlChannel() != null) {
+                ctx.getMysqlChannel().reset();
+            }
+
+            Optional<PhysicalOlapTableSink<?>> plan = 
(planner.getPhysicalPlan()
+                    
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalSink.class::isInstance)).stream()
+                    .findAny();
+            Preconditions.checkArgument(plan.isPresent(), "insert into command 
must contain target table");
+            PhysicalSink physicalSink = plan.get();
+            DataSink sink = planner.getFragments().get(0).getSink();
+            String label = this.labelName.orElse(String.format("label_%x_%x", 
ctx.queryId().hi, ctx.queryId().lo));
+
+            if (physicalSink instanceof PhysicalOlapTableSink) {
+                if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
+                    return;
+                }
+                OlapTable olapTable = (OlapTable) targetTableIf;
+                insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, 
planner, insertCtx);
+                boolean isEnableMemtableOnSinkNode =
+                        olapTable.getTableProperty().getUseSchemaLightChange()
+                                ? 
insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
+                                : false;
+                insertExecutor.getCoordinator().getQueryOptions()
+                        
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+            } else {
+                // TODO: support other table types
+                throw new AnalysisException("insert into command only support 
olap table");
+            }
+
+            insertExecutor.beginTransaction();
+            insertExecutor.finalizeSink(planner.getFragments().get(0), sink, 
physicalSink);
+        } finally {
+            targetTableIf.readUnlock();
+        }
+
+        executor.setProfileType(ProfileType.LOAD);
+        // We exposed @StmtExecutor#cancel as a unified entry point for 
statement interruption,
+        // so we need to set this here
+        executor.setCoord(insertExecutor.getCoordinator());
+        insertExecutor.executeSingleInsert(executor, jobId);
+    }
+
+    @Override
+    public Plan getExplainPlan(ConnectContext ctx) {
+        return InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitInsertIntoTableCommand(this, context);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
similarity index 90%
rename from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index c7a4ed4f9ce..fffdf06b54e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.trees.plans.commands;
+package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
@@ -34,6 +34,8 @@ import org.apache.doris.nereids.trees.TreeNode;
 import org.apache.doris.nereids.trees.plans.Explainable;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@@ -91,13 +93,13 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
             }
             throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
         }
-        // insert overwrite only support
-        TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, 
ctx);
+
+        TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
         if (!(targetTableIf instanceof OlapTable)) {
             throw new AnalysisException("insert into overwrite only support 
OLAP table."
                     + " But current table type is " + targetTableIf.getType());
         }
-        this.logicalQuery = (LogicalPlan) 
InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
+        this.logicalQuery = (LogicalPlan) 
InsertUtils.normalizePlan(logicalQuery, targetTableIf);
 
         LogicalPlanAdapter logicalPlanAdapter = new 
LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
         NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
@@ -165,8 +167,9 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
                 sink.getDMLCommandType(),
                 (LogicalPlan) (sink.child(0)));
         // for overwrite situation, we disable auto create partition.
-        InsertIntoTableCommand insertCommand = new 
InsertIntoTableCommand(copySink, labelName);
-        insertCommand.setAllowAutoPartition(false);
+        OlapInsertCommandContext insertCtx = new OlapInsertCommandContext();
+        insertCtx.setAllowAutoPartition(false);
+        InsertIntoTableCommand insertCommand = new 
InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
         insertCommand.run(ctx, executor);
         if (ctx.getState().getStateType() == MysqlStateType.ERR) {
             String errMsg = 
Strings.emptyToNull(ctx.getState().getErrorMessage());
@@ -177,15 +180,7 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
 
     @Override
     public Plan getExplainPlan(ConnectContext ctx) {
-        if (!ctx.getSessionVariable().isEnableNereidsDML()) {
-            try {
-                ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
-            } catch (Exception e) {
-                throw new AnalysisException("failed to set fallback to 
original planner to true", e);
-            }
-            throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
-        }
-        return InsertExecutor.normalizePlan(this.logicalQuery, 
InsertExecutor.getTargetTable(this.logicalQuery, ctx));
+        return InsertUtils.getPlanForExplain(ctx, this.logicalQuery);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
similarity index 54%
rename from 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 295478349cf..ab65b065e35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -15,11 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.nereids.trees.plans.commands;
+package org.apache.doris.nereids.trees.plans.commands.insert;
 
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.StatementBase;
-import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
@@ -27,15 +24,7 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.TableIf.TableType;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.load.EtlJobType;
-import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.analyzer.UnboundAlias;
 import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
@@ -59,328 +48,33 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.util.RelationUtil;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
-import org.apache.doris.planner.DataSink;
-import org.apache.doris.planner.DataStreamSink;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.InsertStreamTxnExecutor;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
-import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TMergeType;
-import org.apache.doris.thrift.TOlapTableLocationParam;
-import org.apache.doris.thrift.TPartitionType;
-import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
-import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionEntry;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
-import org.apache.doris.transaction.TransactionState.TxnCoordinator;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionStatus;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * transaction wrapper for Nereids
+ * The helper class for insert operation.
  */
-public class InsertExecutor {
-
-    private static final Logger LOG = 
LogManager.getLogger(InsertExecutor.class);
-    private static final long INVALID_TXN_ID = -1L;
-
-    private final ConnectContext ctx;
-    private final Coordinator coordinator;
-    private final String labelName;
-    private final Database database;
-    private final Table table;
-    private final long createAt = System.currentTimeMillis();
-    private long loadedRows = 0;
-    private int filteredRows = 0;
-    private long txnId = INVALID_TXN_ID;
-    private TransactionStatus txnStatus = TransactionStatus.ABORTED;
-    private String errMsg = "";
-
-    /**
-     * constructor
-     */
-    public InsertExecutor(ConnectContext ctx, Database database, Table table,
-            String labelName, NereidsPlanner planner) {
-        this.ctx = ctx;
-        this.coordinator = new Coordinator(ctx, null, planner, 
ctx.getStatsErrorEstimator());
-        this.labelName = labelName;
-        this.database = database;
-        this.table = table;
-    }
-
-    public long getTxnId() {
-        return txnId;
-    }
-
-    /**
-     * begin transaction if necessary
-     */
-    public void beginTransaction() {
-        if (!(table instanceof OlapTable)) {
-            return;
-        }
-        try {
-            this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
-                    database.getId(), ImmutableList.of(table.getId()), 
labelName,
-                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
-                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
-        } catch (Exception e) {
-            throw new AnalysisException("begin transaction failed. " + 
e.getMessage(), e);
-        }
-    }
-
-    /**
-     * finalize sink to complete enough info for sink execution
-     */
-    public void finalizeSink(PlanFragment fragment, DataSink sink,
-            boolean isPartialUpdate, boolean isFromInsert, boolean 
allowAutoPartition) {
-        if (!(sink instanceof OlapTableSink)) {
-            return;
-        }
-        Preconditions.checkState(table instanceof OlapTable,
-                "sink is OlapTableSink, but table type is " + table.getType());
-        OlapTableSink olapTableSink = (OlapTableSink) sink;
-        boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
-                && isPartialUpdate
-                && isFromInsert;
-        try {
-            // TODO refactor this to avoid call legacy planner's function
-            olapTableSink.init(ctx.queryId(), txnId, database.getId(),
-                    ctx.getExecTimeout(),
-                    ctx.getSessionVariable().getSendBatchParallelism(),
-                    false,
-                    isStrictMode);
-            olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
-            if (!allowAutoPartition) {
-                olapTableSink.setAutoPartition(false);
-            }
-            // update
-
-            // set schema and partition info for tablet id shuffle exchange
-            if (fragment.getPlanRoot() instanceof ExchangeNode
-                    && fragment.getDataPartition().getType() == 
TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
-                DataStreamSink dataStreamSink = (DataStreamSink) 
(fragment.getChild(0).getSink());
-                Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), 
ConnectContext.get());
-                
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
-                        database.getId(), olapTableSink.getDstTable(), 
analyzer));
-                
dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition(
-                        database.getId(), olapTableSink.getDstTable(), 
analyzer));
-                
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
-                List<TOlapTableLocationParam> locationParams = olapTableSink
-                        .createLocation(olapTableSink.getDstTable());
-                
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
-                dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
-            }
-        } catch (Exception e) {
-            throw new AnalysisException(e.getMessage(), e);
-        }
-        TransactionState state = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), 
txnId);
-        if (state == null) {
-            throw new AnalysisException("txn does not exist: " + txnId);
-        }
-        state.addTableIndexes((OlapTable) table);
-        if (isPartialUpdate) {
-            state.setSchemaForPartialUpdate((OlapTable) table);
-        }
-    }
-
-    /**
-     * execute insert txn for insert into select command.
-     */
-    public void executeSingleInsertTransaction(StmtExecutor executor, long 
jobId) {
-        String queryId = DebugUtil.printId(ctx.queryId());
-        LOG.info("start insert [{}] with query id {} and txn id {}", 
labelName, queryId, txnId);
-        Throwable throwable = null;
-
-        try {
-            
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
-            coordinator.setQueryType(TQueryType.LOAD);
-            
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
-            QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), 
executor.getOriginStmtInString(), coordinator);
-            QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
-            coordinator.exec();
-            int execTimeout = ctx.getExecTimeout();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("insert [{}] with query id {} execution timeout is 
{}", labelName, queryId, execTimeout);
-            }
-            boolean notTimeout = coordinator.join(execTimeout);
-            if (!coordinator.isDone()) {
-                coordinator.cancel();
-                if (notTimeout) {
-                    errMsg = coordinator.getExecStatus().getErrorMsg();
-                    ErrorReport.reportDdlException("there exists unhealthy 
backend. "
-                            + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
-                } else {
-                    
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
-                }
-            }
-            if (!coordinator.getExecStatus().ok()) {
-                errMsg = coordinator.getExecStatus().getErrorMsg();
-                LOG.warn("insert [{}] with query id {} failed, {}", labelName, 
queryId, errMsg);
-                ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("insert [{}] with query id {} delta files is {}",
-                        labelName, queryId, coordinator.getDeltaUrls());
-            }
-            if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) 
!= null) {
-                loadedRows = 
Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
-            }
-            if 
(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
-                filteredRows = 
Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
-            }
-
-            // if in strict mode, insert will fail if there are filtered rows
-            if (ctx.getSessionVariable().getEnableInsertStrict()) {
-                if (filteredRows > 0) {
-                    ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
-                            "Insert has filtered data in strict mode, 
tracking_url=" + coordinator.getTrackingUrl());
-                    return;
-                }
-            }
-
-            if (table.getType() != TableType.OLAP && table.getType() != 
TableType.MATERIALIZED_VIEW) {
-                // no need to add load job.
-                // MySQL table is already being inserted.
-                ctx.getState().setOk(loadedRows, filteredRows, null);
-                return;
-            }
-
-            if (ctx.getState().getStateType() == MysqlStateType.ERR) {
-                try {
-                    String errMsg = 
Strings.emptyToNull(ctx.getState().getErrorMessage());
-                    Env.getCurrentGlobalTransactionMgr().abortTransaction(
-                            database.getId(), txnId,
-                            (errMsg == null ? "unknown reason" : errMsg));
-                } catch (Exception abortTxnException) {
-                    LOG.warn("errors when abort txn. {}", 
ctx.getQueryIdentifier(), abortTxnException);
-                }
-            } else if 
(Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
-                    database, Lists.newArrayList(table),
-                    txnId,
-                    TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
-                    ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
-                txnStatus = TransactionStatus.VISIBLE;
-            } else {
-                txnStatus = TransactionStatus.COMMITTED;
-            }
-
-        } catch (Throwable t) {
-            // if any throwable being thrown during insert operation, first we 
should abort this txn
-            LOG.warn("insert [{}] with query id {} failed", labelName, 
queryId, t);
-            if (txnId != INVALID_TXN_ID) {
-                try {
-                    Env.getCurrentGlobalTransactionMgr().abortTransaction(
-                            database.getId(), txnId,
-                            t.getMessage() == null ? "unknown reason" : 
t.getMessage());
-                } catch (Exception abortTxnException) {
-                    // just print a log if abort txn failed. This failure do 
not need to pass to user.
-                    // user only concern abort how txn failed.
-                    LOG.warn("insert [{}] with query id {} abort txn {} 
failed",
-                            labelName, queryId, txnId, abortTxnException);
-                }
-            }
-
-            if (!Config.using_old_load_usage_pattern) {
-                // if not using old load usage pattern, error will be returned 
directly to user
-                StringBuilder sb = new StringBuilder(t.getMessage());
-                if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
-                    sb.append(". url: ").append(coordinator.getTrackingUrl());
-                }
-                ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
sb.toString());
-                return;
-            }
-
-            /*
-             * If config 'using_old_load_usage_pattern' is true.
-             * Doris will return a label to user, and user can use this label 
to check load job's status,
-             * which exactly like the old insert stmt usage pattern.
-             */
-            throwable = t;
-        } finally {
-            executor.updateProfile(true);
-            QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
-        }
-
-        // Go here, which means:
-        // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
-        // 2. transaction failed but Config.using_old_load_usage_pattern is 
true.
-        // we will record the load job info for these 2 cases
-        try {
-            // the statement parsed by Nereids is saved at 
executor::parsedStmt.
-            StatementBase statement = executor.getParsedStmt();
-            UserIdentity userIdentity;
-            //if we use job scheduler, parse statement will not set user 
identity,so we need to get it from context
-            if (null == statement) {
-                userIdentity = ctx.getCurrentUserIdentity();
-            } else {
-                userIdentity = statement.getUserInfo();
-            }
-            EtlJobType etlJobType = EtlJobType.INSERT;
-            if (0 != jobId) {
-                etlJobType = EtlJobType.INSERT_JOB;
-            }
-            if (!Config.enable_nereids_load) {
-                // just record for loadv2 here
-                ctx.getEnv().getLoadManager()
-                        .recordFinishedLoadJob(labelName, txnId, 
database.getFullName(),
-                                table.getId(),
-                                etlJobType, createAt, throwable == null ? "" : 
throwable.getMessage(),
-                                coordinator.getTrackingUrl(), userIdentity, 
jobId);
-            }
-        } catch (MetaNotFoundException e) {
-            LOG.warn("Record info of insert load with error {}", 
e.getMessage(), e);
-            errMsg = "Record info of insert load with error " + e.getMessage();
-        }
-
-        // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
-        // {'label':'my_label1', 'status':'visible', 'txnId':'123' 
'err':'error messages'}
-        StringBuilder sb = new StringBuilder();
-        sb.append("{'label':'").append(labelName).append("', 
'status':'").append(txnStatus.name());
-        sb.append("', 'txnId':'").append(txnId).append("'");
-        if (table.getType() == TableType.MATERIALIZED_VIEW) {
-            sb.append("', 'rows':'").append(loadedRows).append("'");
-        }
-        if (!Strings.isNullOrEmpty(errMsg)) {
-            sb.append(", 'err':'").append(errMsg).append("'");
-        }
-        sb.append("}");
-
-        ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
-        // set insert result in connection context,
-        // so that user can use `show insert result` to get info of the last 
insert operation.
-        ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), 
table.getName(),
-                txnStatus, loadedRows, filteredRows);
-        // update it, so that user can get loaded rows in fe.audit.log
-        ctx.updateReturnRows((int) loadedRows);
-    }
+public class InsertUtils {
 
     /**
      * execute insert values in transaction.
@@ -670,7 +364,18 @@ public class InsertExecutor {
         }
     }
 
-    public Coordinator getCoordinator() {
-        return coordinator;
+    /**
+     * get plan for explain.
+     */
+    public static Plan getPlanForExplain(ConnectContext ctx, LogicalPlan 
logicalQuery) {
+        if (!ctx.getSessionVariable().isEnableNereidsDML()) {
+            try {
+                ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
+            } catch (Exception e) {
+                throw new AnalysisException("failed to set fallback to 
original planner to true", e);
+            }
+            throw new AnalysisException("Nereids DML is disabled, will try to 
fall back to the original planner");
+        }
+        return InsertUtils.normalizePlan(logicalQuery, 
InsertUtils.getTargetTable(logicalQuery, ctx));
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java
new file mode 100644
index 00000000000..23dd8d13d91
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertCommandContext.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+/**
+ * For Olap Table
+ */
+public class OlapInsertCommandContext extends InsertCommandContext {
+    private boolean allowAutoPartition = true;
+
+    public boolean isAllowAutoPartition() {
+        return allowAutoPartition;
+    }
+
+    public void setAllowAutoPartition(boolean allowAutoPartition) {
+        this.allowAutoPartition = allowAutoPartition;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
new file mode 100644
index 00000000000..b24426b1e87
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.DataStreamSink;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.OlapTableSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TOlapTableLocationParam;
+import org.apache.doris.thrift.TPartitionType;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionState;
+import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+import org.apache.doris.transaction.TransactionState.TxnCoordinator;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
+import org.apache.doris.transaction.TransactionStatus;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Insert executor for olap table
+ */
+public class OlapInsertExecutor extends AbstractInsertExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(OlapInsertExecutor.class);
+    private static final long INVALID_TXN_ID = -1L;
+    private long txnId = INVALID_TXN_ID;
+    private TransactionStatus txnStatus = TransactionStatus.ABORTED;
+
+    /**
+     * constructor
+     */
+    public OlapInsertExecutor(ConnectContext ctx, Table table,
+            String labelName, NereidsPlanner planner, 
Optional<InsertCommandContext> insertCtx) {
+        super(ctx, table, labelName, planner, insertCtx);
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    @Override
+    public void beginTransaction() {
+        try {
+            this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
+                    database.getId(), ImmutableList.of(table.getId()), 
labelName,
+                    new TxnCoordinator(TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()),
+                    LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
+        } catch (Exception e) {
+            throw new AnalysisException("begin transaction failed. " + 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink) {
+        OlapTableSink olapTableSink = (OlapTableSink) sink;
+        PhysicalOlapTableSink physicalOlapTableSink = (PhysicalOlapTableSink) 
physicalSink;
+        OlapInsertCommandContext olapInsertCtx = (OlapInsertCommandContext) 
insertCtx.orElse(
+                new OlapInsertCommandContext());
+
+        boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
+                && physicalOlapTableSink.isPartialUpdate()
+                && physicalOlapTableSink.getDmlCommandType() == 
DMLCommandType.INSERT;
+        try {
+            // TODO refactor this to avoid call legacy planner's function
+            int timeout = ctx.getExecTimeout();
+            olapTableSink.init(ctx.queryId(), txnId, database.getId(),
+                    timeout,
+                    ctx.getSessionVariable().getSendBatchParallelism(),
+                    false,
+                    isStrictMode);
+            olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
+            if (!olapInsertCtx.isAllowAutoPartition()) {
+                olapTableSink.setAutoPartition(false);
+            }
+            // update
+
+            // set schema and partition info for tablet id shuffle exchange
+            if (fragment.getPlanRoot() instanceof ExchangeNode
+                    && fragment.getDataPartition().getType() == 
TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
+                DataStreamSink dataStreamSink = (DataStreamSink) 
(fragment.getChild(0).getSink());
+                Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), 
ConnectContext.get());
+                
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
+                        database.getId(), olapTableSink.getDstTable(), 
analyzer));
+                
dataStreamSink.setTabletSinkPartitionParam(olapTableSink.createPartition(
+                        database.getId(), olapTableSink.getDstTable(), 
analyzer));
+                
dataStreamSink.setTabletSinkTupleDesc(olapTableSink.getTupleDescriptor());
+                List<TOlapTableLocationParam> locationParams = olapTableSink
+                        .createLocation(olapTableSink.getDstTable());
+                
dataStreamSink.setTabletSinkLocationParam(locationParams.get(0));
+                dataStreamSink.setTabletSinkTxnId(olapTableSink.getTxnId());
+            }
+        } catch (Exception e) {
+            throw new AnalysisException(e.getMessage(), e);
+        }
+        TransactionState state = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), 
txnId);
+        if (state == null) {
+            throw new AnalysisException("txn does not exist: " + txnId);
+        }
+        state.addTableIndexes((OlapTable) table);
+        if (physicalOlapTableSink.isPartialUpdate()) {
+            state.setSchemaForPartialUpdate((OlapTable) table);
+        }
+    }
+
+    @Override
+    protected void beforeExec() {
+        String queryId = DebugUtil.printId(ctx.queryId());
+        LOG.info("start insert [{}] with query id {} and txn id {}", 
labelName, queryId, txnId);
+    }
+
+    @Override
+    protected void onComplete() throws UserException {
+        if (ctx.getState().getStateType() == MysqlStateType.ERR) {
+            try {
+                String errMsg = 
Strings.emptyToNull(ctx.getState().getErrorMessage());
+                Env.getCurrentGlobalTransactionMgr().abortTransaction(
+                        database.getId(), txnId,
+                        (errMsg == null ? "unknown reason" : errMsg));
+            } catch (Exception abortTxnException) {
+                LOG.warn("errors when abort txn. {}", 
ctx.getQueryIdentifier(), abortTxnException);
+            }
+        } else if 
(Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
+                database, Lists.newArrayList((Table) table),
+                txnId,
+                TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
+                ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
+            txnStatus = TransactionStatus.VISIBLE;
+        } else {
+            txnStatus = TransactionStatus.COMMITTED;
+        }
+    }
+
+    @Override
+    protected void onFail(Throwable t) {
+        errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
+        String queryId = DebugUtil.printId(ctx.queryId());
+        // if any throwable being thrown during insert operation, first we 
should abort this txn
+        LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
+        if (txnId != INVALID_TXN_ID) {
+            try {
+                Env.getCurrentGlobalTransactionMgr().abortTransaction(
+                        database.getId(), txnId, errMsg);
+            } catch (Exception abortTxnException) {
+                // just print a log if abort txn failed. This failure do not 
need to pass to user.
+                // user only concern abort how txn failed.
+                LOG.warn("insert [{}] with query id {} abort txn {} failed",
+                        labelName, queryId, txnId, abortTxnException);
+            }
+        }
+
+        StringBuilder sb = new StringBuilder(t.getMessage());
+        if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
+            sb.append(". url: ").append(coordinator.getTrackingUrl());
+        }
+        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
+    }
+
+    @Override
+    protected void afterExec(StmtExecutor executor) {
+        // Go here, which means:
+        // 1. transaction is finished successfully (COMMITTED or VISIBLE), or
+        // 2. transaction failed but Config.using_old_load_usage_pattern is 
true.
+        // we will record the load job info for these 2 cases
+        try {
+            // the statement parsed by Nereids is saved at 
executor::parsedStmt.
+            StatementBase statement = executor.getParsedStmt();
+            UserIdentity userIdentity;
+            // if we use job scheduler, parse statement will not set user 
identity,so we need to get it from context
+            if (null == statement) {
+                userIdentity = ctx.getCurrentUserIdentity();
+            } else {
+                userIdentity = statement.getUserInfo();
+            }
+            EtlJobType etlJobType = EtlJobType.INSERT;
+            if (0 != jobId) {
+                etlJobType = EtlJobType.INSERT_JOB;
+            }
+            if (!Config.enable_nereids_load) {
+                // just record for loadv2 here
+                ctx.getEnv().getLoadManager()
+                        .recordFinishedLoadJob(labelName, txnId, 
database.getFullName(),
+                                table.getId(),
+                                etlJobType, createTime, errMsg,
+                                coordinator.getTrackingUrl(), userIdentity, 
jobId);
+            }
+        } catch (MetaNotFoundException e) {
+            LOG.warn("Record info of insert load with error {}", 
e.getMessage(), e);
+            errMsg = "Record info of insert load with error " + e.getMessage();
+        }
+
+        // {'label':'my_label1', 'status':'visible', 'txnId':'123'}
+        // {'label':'my_label1', 'status':'visible', 'txnId':'123' 
'err':'error messages'}
+        StringBuilder sb = new StringBuilder();
+        sb.append("{'label':'").append(labelName).append("', 
'status':'").append(txnStatus.name());
+        sb.append("', 'txnId':'").append(txnId).append("'");
+        if (table.getType() == TableType.MATERIALIZED_VIEW) {
+            sb.append("', 'rows':'").append(loadedRows).append("'");
+        }
+        if (!Strings.isNullOrEmpty(errMsg)) {
+            sb.append(", 'err':'").append(errMsg).append("'");
+        }
+        sb.append("}");
+
+        ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
+        // set insert result in connection context,
+        // so that user can use `show insert result` to get info of the last 
insert operation.
+        ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), 
table.getName(),
+                txnStatus, loadedRows, filteredRows);
+        // update it, so that user can get loaded rows in fe.audit.log
+        ctx.updateReturnRows((int) loadedRows);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index 9626942e10b..6bcc41e39e4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -19,7 +19,6 @@ package org.apache.doris.nereids.trees.plans.visitor;
 
 import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
-import 
org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.CallCommand;
 import org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
 import org.apache.doris.nereids.trees.plans.commands.Command;
@@ -34,8 +33,6 @@ import 
org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.DropProcedureCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
-import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
-import 
org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.PauseMTMVCommand;
 import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
@@ -44,6 +41,9 @@ import 
org.apache.doris.nereids.trees.plans.commands.ShowConstraintsCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.ShowCreateProcedureCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.ShowProcedureStatusCommand;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
 
 /** CommandVisitor. */
 public interface CommandVisitor<R, C> {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index b6a8c882fae..c457a5feccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -128,13 +128,13 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.minidump.MinidumpUtils;
 import org.apache.doris.nereids.parser.NereidsParser;
 import 
org.apache.doris.nereids.rules.exploration.mv.InitMaterializationContextHook;
-import 
org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.Command;
 import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.Forward;
-import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
-import 
org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.OlapScanNode;
@@ -2137,22 +2137,12 @@ public class StmtExecutor {
                     LOG.warn("errors when abort txn", abortTxnException);
                 }
 
-                if (!Config.using_old_load_usage_pattern) {
-                    // if not using old load usage pattern, error will be 
returned directly to user
-                    StringBuilder sb = new StringBuilder(t.getMessage());
-                    if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
-                        sb.append(". url: " + coord.getTrackingUrl());
-                    }
-                    context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
sb.toString());
-                    return;
+                StringBuilder sb = new StringBuilder(t.getMessage());
+                if (!Strings.isNullOrEmpty(coord.getTrackingUrl())) {
+                    sb.append(". url: " + coord.getTrackingUrl());
                 }
-
-                /*
-                 * If config 'using_old_load_usage_pattern' is true.
-                 * Doris will return a label to user, and user can use this 
label to check load job's status,
-                 * which exactly like the old insert stmt usage pattern.
-                 */
-                throwable = t;
+                context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
sb.toString());
+                return;
             } finally {
                 if (coord != null) {
                     coord.close();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
index 58212c2d3ba..7bcbecafb02 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java
@@ -24,7 +24,7 @@ import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.properties.PhysicalProperties;
-import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 
 import org.junit.jupiter.api.Assertions;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 14ba32d61be..7613a2d284b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -51,8 +51,6 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 
 public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
     private static final String HMS_CATALOG = "hms_ctl";
@@ -119,7 +117,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         Deencapsulation.setField(db, "initialized", true);
 
         Deencapsulation.setField(tbl, "objectCreated", true);
-        Deencapsulation.setField(tbl, "rwLock", new 
ReentrantReadWriteLock(true));
         Deencapsulation.setField(tbl, "schemaUpdateTime", NOW);
         Deencapsulation.setField(tbl, "eventUpdateTime", 0);
         new Expectations(tbl) {
@@ -167,7 +164,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         };
 
         Deencapsulation.setField(tbl2, "objectCreated", true);
-        Deencapsulation.setField(tbl2, "rwLock", new 
ReentrantReadWriteLock(true));
         Deencapsulation.setField(tbl2, "schemaUpdateTime", NOW);
         Deencapsulation.setField(tbl2, "eventUpdateTime", 0);
         new Expectations(tbl2) {
@@ -215,7 +211,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         };
 
         Deencapsulation.setField(view1, "objectCreated", true);
-        Deencapsulation.setField(view1, "rwLock", new 
ReentrantReadWriteLock(true));
 
         new Expectations(view1) {
             {
@@ -270,7 +265,6 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
         };
 
         Deencapsulation.setField(view2, "objectCreated", true);
-        Deencapsulation.setField(view2, "rwLock", new 
ReentrantReadWriteLock(true));
         new Expectations(view2) {
             {
                 view2.getId();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to