This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 677b3424e3 [feature](Load)(step1)support nereids load, add load grammar (#23485)
677b3424e3 is described below

commit 677b3424e3b24793a8cd2af54db9e1cc2f8b933a
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Wed Sep 20 21:12:23 2023 +0800

    [feature](Load)(step1)support nereids load, add load grammar (#23485)
    
    Support the Nereids load grammar.
    
    We will convert the BROKER LOAD statement into an INSERT INTO clause:
    1. Rename broker load to bulk load.
    2. Add the load grammar to the Nereids optimizer.
    3. Convert the statement into an INSERT INTO clause backed by a table-valued function.
    
    https://github.com/apache/doris/issues/24221
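    
    For illustration, a statement in the new grammar and the INSERT INTO form
    it is intended to map to (label, table, column, and path names below are
    hypothetical):
    
        LOAD LABEL example_label
        (
            DATA INFILE("s3://bucket/path/file.csv")
            INTO TABLE example_tbl
            COLUMNS TERMINATED BY ","
            FORMAT AS csv
            (c1, c2, c3)
        )
        WITH S3
        (
            "s3.endpoint" = "http://endpoint.example"
        );
    
        -- roughly equivalent to:
        INSERT INTO example_tbl
        SELECT c1, c2, c3
        FROM S3("uri" = "s3://bucket/path/file.csv",
                "format" = "csv",
                "s3.endpoint" = "http://endpoint.example");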
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  86 ++-
 .../java/org/apache/doris/common/FeConstants.java  |   2 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  89 ++-
 .../apache/doris/nereids/trees/plans/PlanType.java |   5 +-
 .../nereids/trees/plans/commands/LoadCommand.java  | 495 ++++++++++++++++
 .../plans/commands/info/BulkLoadDataDesc.java      | 333 +++++++++++
 .../trees/plans/commands/info/BulkStorageDesc.java | 123 ++++
 .../trees/plans/visitor/CommandVisitor.java        |   5 +
 .../ExternalFileTableValuedFunction.java           |  12 +-
 .../doris/analysis/BulkLoadDataDescTest.java       | 658 +++++++++++++++++++++
 .../apache/doris/analysis/S3TvfLoadStmtTest.java   |  36 +-
 11 files changed, 1818 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 335576660f..b0a11bcb28 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -63,6 +63,19 @@ statement
         (PARTITION partition=identifierList)?
         (USING relation (COMMA relation)*)
         whereClause                                                    #delete
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        (withRemoteStorageSystem)?
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                                 #load
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        resourceDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                                 #resourceLoad
+    | LOAD mysqlDataDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                                 #mysqlLoad
     | EXPORT TABLE tableName=multipartIdentifier
         (PARTITION partition=identifierList)?
         (whereClause)?
@@ -71,7 +84,29 @@ statement
         (withRemoteStorageSystem)?                                     #export
     ;
 
-
+dataDesc
+    : ((WITH)? mergeType)? DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL (COMMA filePath+=STRING_LITERAL)* RIGHT_PAREN
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+        (LINES TERMINATED BY separator=STRING_LITERAL)?
+        (FORMAT AS format=identifier)?
+        (columns=identifierList)?
+        (columnsFromPath=colFromPath)?
+        (columnMapping=colMappingList)?
+        (preFilter=preFilterClause)?
+        (where=whereClause)?
+        (deleteOn=deleteOnClause)?
+        (sequenceColumn=sequenceColClause)?
+        (propertyClause)?
+    | ((WITH)? mergeType)? DATA FROM TABLE tableName=multipartIdentifier
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (columnMapping=colMappingList)?
+        (where=whereClause)?
+        (deleteOn=deleteOnClause)?
+        (propertyClause)?
+    ;
 
 // -----------------Command accessories-----------------
 
@@ -101,6 +136,36 @@ planType
     | ALL // default type
     ;
 
+mergeType
+    : APPEND
+    | DELETE
+    | MERGE
+    ;
+
+preFilterClause
+    : PRECEDING FILTER expression
+    ;
+
+deleteOnClause
+    : DELETE ON expression
+    ;
+
+sequenceColClause
+    : ORDER BY identifier
+    ;
+
+colFromPath
+    : COLUMNS FROM PATH AS identifierList
+    ;
+
+colMappingList
+    : SET LEFT_PAREN mappingSet+=mappingExpr (COMMA mappingSet+=mappingExpr)* RIGHT_PAREN
+    ;
+
+mappingExpr
+    : (mappingCol=identifier EQ expression)
+    ;
+
 withRemoteStorageSystem
     : WITH S3 LEFT_PAREN
         brokerProperties=propertyItemList
@@ -117,6 +182,25 @@ withRemoteStorageSystem
         RIGHT_PAREN)?
     ;
 
+resourceDesc
+    : WITH RESOURCE resourceName=identifierOrText (LEFT_PAREN propertyItemList RIGHT_PAREN)?
+    ;
+
+mysqlDataDesc
+    : DATA (LOCAL booleanValue)?
+        INFILE filePath=STRING_LITERAL
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+        (LINES TERMINATED BY separator=STRING_LITERAL)?
+        (skipLines)?
+        (columns=identifierList)?
+        (colMappingList)?
+        (propertyClause)?
+    ;
+
+skipLines : IGNORE lines=INTEGER_VALUE LINES | IGNORE lines=INTEGER_VALUE ROWS ;
+
 //  -----------------Query-----------------
// add queryOrganization for parse (q1) union (q2) union (q3) order by keys, otherwise 'order' will be recognized to be
// identifier.
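
Read together, the mysqlDataDesc and skipLines rules above accept a MySQL-style
client load; a sketch per the (LOCAL booleanValue)? alternative, with
hypothetical file, table, and column names:

    LOAD DATA LOCAL true
    INFILE 'client_file.csv'
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ','
    IGNORE 1 LINES
    (c1, c2, c3);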
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 0845d593e2..ecd0c2f4fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -49,6 +49,8 @@ public class FeConstants {
 
     // set to true to skip some step when running FE unit test
     public static boolean runningUnitTest = false;
+    // use to set some mocked values for FE unit test
+    public static Object unitTestConstant = null;
 
     // set to false to disable internal schema db
     public static boolean enableInternalSchemaDb = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index a9b50c8bf1..75d7c788c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -20,13 +20,13 @@ package org.apache.doris.nereids.parser;
 import org.apache.doris.analysis.ArithmeticExpr.Operator;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.nereids.DorisParser;
 import org.apache.doris.nereids.DorisParser.AggClauseContext;
 import org.apache.doris.nereids.DorisParser.AliasQueryContext;
@@ -291,7 +291,10 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue;
@@ -330,6 +333,7 @@ import org.apache.doris.nereids.types.StructField;
 import org.apache.doris.nereids.types.StructType;
 import org.apache.doris.nereids.types.coercion.CharacterType;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
 import org.apache.doris.policy.FilterType;
 import org.apache.doris.policy.PolicyTypeEnum;
 import org.apache.doris.qe.ConnectContext;
@@ -351,6 +355,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -464,6 +469,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
 
     @Override
     public LogicalPlan visitExport(ExportContext ctx) {
+        // TODO: replace old class name like ExportStmt, BrokerDesc, Expr with new nereid class name
        List<String> tableName = visitMultipartIdentifier(ctx.tableName);
        List<String> partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition);
 
@@ -515,7 +521,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
        } else if (ctx.HDFS() != null) {
            brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, brokerPropertiesMap);
        } else if (ctx.LOCAL() != null) {
-            brokerDesc = new BrokerDesc("HDFS", StorageType.LOCAL, brokerPropertiesMap);
+            brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.LOCAL, brokerPropertiesMap);
        } else if (ctx.BROKER() != null) {
            brokerDesc = new BrokerDesc(visitIdentifierOrText(ctx.brokerName), brokerPropertiesMap);
        }
@@ -541,6 +547,85 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         return logicalPlans;
     }
 
+    /**
+     * Visit load-statements.
+     */
+    @Override
+    public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
+
+        BulkStorageDesc bulkDesc = null;
+        if (ctx.withRemoteStorageSystem() != null) {
+            Map<String, String> bulkProperties =
+                    new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties));
+            if (ctx.withRemoteStorageSystem().S3() != null) {
+                bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().HDFS() != null) {
+                bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().LOCAL() != null) {
+                bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().BROKER() != null
+                    && ctx.withRemoteStorageSystem().identifierOrText().getText() != null) {
+                bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(),
+                        bulkProperties);
+            }
+        }
+        ImmutableList.Builder<BulkLoadDataDesc> dataDescriptions = new ImmutableList.Builder<>();
+        for (DorisParser.DataDescContext ddc : ctx.dataDescs) {
+            List<String> tableName = RelationUtil.getQualifierName(ConnectContext.get(),
+                    visitMultipartIdentifier(ddc.tableName));
+            List<String> colNames = (ddc.columns == null ? ImmutableList.of() : visitIdentifierList(ddc.columns));
+            List<String> columnsFromPath = (ddc.columnsFromPath == null ? ImmutableList.of()
+                        : visitIdentifierList(ddc.columnsFromPath.identifierList()));
+            List<String> partitions = ddc.partition == null ? ImmutableList.of() : visitIdentifierList(ddc.partition);
+            // TODO: multi location
+            List<String> multiFilePaths = new ArrayList<>();
+            for (Token filePath : ddc.filePaths) {
+                multiFilePaths.add(filePath.getText().substring(1, filePath.getText().length() - 1));
+            }
+            List<String> filePaths = ddc.filePath == null ? ImmutableList.of() : multiFilePaths;
+            Map<String, Expression> colMappings;
+            if (ddc.columnMapping == null) {
+                colMappings = ImmutableMap.of();
+            } else {
+                colMappings = new HashMap<>();
+                for (DorisParser.MappingExprContext mappingExpr : ddc.columnMapping.mappingSet) {
+                    colMappings.put(mappingExpr.mappingCol.getText(), getExpression(mappingExpr.expression()));
+                }
+            }
+
+            LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND
+                        : LoadTask.MergeType.valueOf(ddc.mergeType().getText());
+
+            Optional<String> fileFormat = ddc.format == null ? Optional.empty() : Optional.of(ddc.format.getText());
+            Optional<String> separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText()
+                        .substring(1, ddc.separator.getText().length() - 1));
+            Optional<String> comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText()
+                        .substring(1, ddc.comma.getText().length() - 1));
+            Map<String, String> dataProperties = ddc.propertyClause() == null ? new HashMap<>()
+                        : visitPropertyClause(ddc.propertyClause());
+            dataDescriptions.add(new BulkLoadDataDesc(
+                    tableName,
+                    partitions,
+                    filePaths,
+                    colNames,
+                    columnsFromPath,
+                    colMappings,
+                    new BulkLoadDataDesc.FileFormatDesc(separator, comma, fileFormat),
+                    false,
+                    ddc.preFilter == null ? Optional.empty() : Optional.of(getExpression(ddc.preFilter.expression())),
+                    ddc.where == null ? Optional.empty() : Optional.of(getExpression(ddc.where.booleanExpression())),
+                    mergeType,
+                    ddc.deleteOn == null ? Optional.empty() : Optional.of(getExpression(ddc.deleteOn.expression())),
+                    ddc.sequenceColumn == null ? Optional.empty()
+                            : Optional.of(ddc.sequenceColumn.identifier().getText()), dataProperties));
+        }
+        String labelName = ctx.lableName.getText();
+        Map<String, String> properties = visitPropertyItemList(ctx.properties);
+        String commentSpec = ctx.commentSpec() == null ? "" : ctx.commentSpec().STRING_LITERAL().getText();
+        String comment = escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
+        return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
+    }
+
    /* ********************************************************************************************
     * Plan parsing
     * ******************************************************************************************** */
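
For reference, a single dataDesc exercising the clauses visitLoad maps
(mergeType, colMappingList, preFilterClause, whereClause, deleteOnClause);
all table, column, and path names are hypothetical:

    WITH MERGE DATA INFILE("s3://bucket/data/part-*.csv")
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ","
    (src_c1, src_c2, del_flag)
    SET (c1 = src_c1, c2 = src_c2 * 10)
    PRECEDING FILTER src_c1 > 0
    WHERE c2 < 100
    DELETE ON del_flag = 1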
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 31c3641620..c1ed61ce1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -119,8 +119,9 @@ public enum PlanType {
     CREATE_TABLE_COMMAND,
     DELETE_COMMAND,
     EXPLAIN_COMMAND,
+    EXPORT_COMMAND,
     INSERT_INTO_TABLE_COMMAND,
+    LOAD_COMMAND,
     SELECT_INTO_OUTFILE_COMMAND,
-    UPDATE_COMMAND,
-    EXPORT_COMMAND
+    UPDATE_COMMAND
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
new file mode 100644
index 0000000000..f02c5bd2d8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -0,0 +1,495 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * load OLAP table data from external bulk file
+ */
+public class LoadCommand extends Command implements ForwardWithSync {
+
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of LoadCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not null");
+        this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not null");
+        this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not null");
+        this.comment = Objects.requireNonNull(comment, "comment should not null");
+    }
+
+    /**
+     * for test print
+     *
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws AnalysisException {
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        // TODO: begin txn form multi insert sql
+        /* this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
+          profile.getSummaryProfile().setQueryBeginTime();
+          for (BulkLoadDataDesc dataDesc : sourceInfos) {
+               plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, dataDesc), Optional.of(labelName), false));
+          }
+          profile.getSummaryProfile().setQueryPlanFinishTime();
+         * executeInsertStmtPlan(ctx, executor, plans);  */
+        throw new AnalysisException("Fallback to legacy planner temporary.");
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql);
+        }
+        // 1. build source projects plan (select col1,col2... from tvf where prefilter)
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc);
+        LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties));
+        tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan);
+
+        if (!(tvfLogicalPlan instanceof LogicalProject)) {
+            throw new AnalysisException("Fail to build TVF query, TVF query should be LogicalProject");
+        }
+        List<NamedExpression> tvfProjects = ((LogicalProject<?>) tvfLogicalPlan).getProjects();
+        // tvfProjects may be '*' or 'col1,col2,...'
+        if (tvfProjects.isEmpty()) {
+            throw new AnalysisException("Fail to build TVF query, parsed TVF 
select list requires not null");
+        }
+        boolean scanAllTvfCol = (tvfProjects.get(0) instanceof UnboundStar);
+
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        List<Column> olapSchema = olapTable.getBaseSchema();
+        // map column index to mapping expr
+        Map<String, Expression> mappingExpressions = dataDesc.getColumnMappings();
+        // 2. build sink where
+        Set<Expression> conjuncts = new HashSet<>();
+        if (dataDesc.getWhereExpr().isPresent()) {
+            Set<Expression> whereParts = ExpressionUtils.extractConjunctionToSet(dataDesc.getWhereExpr().get());
+            for (Expression wherePart : whereParts) {
+                if (!(wherePart instanceof ComparisonPredicate)) {
+                    throw new AnalysisException("WHERE clause must be comparison expression");
+                }
+                ComparisonPredicate comparison = ((ComparisonPredicate) wherePart);
+                if (!(comparison.left() instanceof UnboundSlot)) {
+                    throw new AnalysisException("Invalid predicate column " + comparison.left().toSql());
+                }
+                conjuncts.add(comparison.rewriteUp(e -> {
+                    if (!(e instanceof UnboundSlot)) {
+                        return e;
+                    }
+                    UnboundSlot slot = (UnboundSlot) e;
+                    String colName = getUnquotedName(slot);
+                    return mappingExpressions.getOrDefault(colName, e);
+                }));
+            }
+        }
+
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) && !conjuncts.isEmpty()) {
+            throw new AnalysisException("Required property 'csv_schema' for csv file, "
+                    + "when no column list specified and use WHERE");
+        }
+        tvfLogicalPlan = new LogicalFilter<>(conjuncts, tvfLogicalPlan);
+
+        // 3. build sink project
+        List<String> sinkCols = new ArrayList<>();
+        List<NamedExpression> selectLists = new ArrayList<>();
+        List<String> olapColumns = olapSchema.stream().map(Column::getDisplayName).collect(Collectors.toList());
+        if (!scanAllTvfCol) {
+            int numSinkCol = Math.min(tvfProjects.size(), olapColumns.size());
+            // if not scan all tvf column, try to treat each tvfColumn as olapColumn
+            for (int i = 0; i < numSinkCol; i++) {
+                UnboundSlot sourceCol = (UnboundSlot) tvfProjects.get(i);
+                // check sourceCol is slot and check olapColumn beyond index.
+                String olapColumn = olapColumns.get(i);
+                fillSinkBySourceCols(mappingExpressions, olapColumn,
+                        sourceCol, sinkCols, selectLists);
+            }
+            fillDeleteOnColumn(dataDesc, olapTable, sinkCols, selectLists, Column.DELETE_SIGN);
+        } else {
+            for (String olapColumn : olapColumns) {
+                if (olapColumn.equalsIgnoreCase(Column.VERSION_COL)
+                        || olapColumn.equalsIgnoreCase(Column.SEQUENCE_COL)) {
+                    continue;
+                }
+                if (olapColumn.equalsIgnoreCase(Column.DELETE_SIGN)) {
+                    fillDeleteOnColumn(dataDesc, olapTable, sinkCols, selectLists, olapColumn);
+                    continue;
+                }
+                fillSinkBySourceCols(mappingExpressions, olapColumn, new UnboundSlot(olapColumn),
+                        sinkCols, selectLists);
+            }
+        }
+        if (sinkCols.isEmpty() && selectLists.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        }
+        for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+            sinkCols.add(columnFromPath);
+            // columnFromPath will be parsed by BE, put columns as placeholder.
+            selectLists.add(new UnboundSlot(columnFromPath));
+        }
+
+        tvfLogicalPlan = new LogicalProject<>(selectLists, tvfLogicalPlan);
+        checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists);
+        boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
+                && sinkCols.size() < olapTable.getColumns().size();
+        return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
+                dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
+    }
+
+    private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc, OlapTable olapTable,
+                                           List<String> sinkCols,
+                                           List<NamedExpression> selectLists,
+                                           String olapColumn) throws AnalysisException {
+        if (olapTable.hasDeleteSign() && dataDesc.getDeleteCondition().isPresent()) {
+            checkDeleteOnConditions(dataDesc.getMergeType(), dataDesc.getDeleteCondition().get());
+            Optional<If> deleteIf = createDeleteOnIfCall(olapTable, olapColumn, dataDesc);
+            if (deleteIf.isPresent()) {
+                sinkCols.add(olapColumn);
+                selectLists.add(new UnboundAlias(deleteIf.get(), olapColumn));
+            }
+            sinkCols.add(olapColumn);
+        }
+    }
+
+    /**
+     * use to get unquoted column name
+     * @return unquoted slot name
+     */
+    public static String getUnquotedName(NamedExpression slot) {
+        if (slot instanceof UnboundAlias) {
+            return slot.getName();
+        } else if (slot instanceof UnboundSlot) {
+            List<String> slotNameParts = ((UnboundSlot) slot).getNameParts();
+            return slotNameParts.get(slotNameParts.size() - 1);
+        }
+        return slot.getName();
+    }
+
+    private static void fillSinkBySourceCols(Map<String, Expression> mappingExpressions,
+                                             String olapColumn, UnboundSlot tvfColumn,
+                                             List<String> sinkCols, List<NamedExpression> selectLists) {
+        sinkCols.add(olapColumn);
+        if (mappingExpressions.containsKey(olapColumn)) {
+            selectLists.add(new UnboundAlias(mappingExpressions.get(olapColumn), olapColumn));
+        } else {
+            selectLists.add(new UnboundAlias(tvfColumn, olapColumn));
+        }
+    }
+
+    private static boolean isCsvType(Map<String, String> tvfProperties) {
+        return tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
+    }
+
+    /**
+     * fill all column that need to be loaded to sinkCols.
+     * fill the map with sink columns and generated source columns.
+     * sink columns use for 'INSERT INTO'
+     * generated source columns use for 'SELECT'
+     *
+     * @param dataDesc       dataDesc
+     * @param tvfProperties  generated tvfProperties
+     * @param tvfLogicalPlan source tvf relation
+     */
+    private static LogicalPlan buildTvfQueryPlan(BulkLoadDataDesc dataDesc,
+                                                 Map<String, String> tvfProperties,
+                                                 LogicalPlan tvfLogicalPlan) throws AnalysisException {
+        // build tvf column filter
+        if (dataDesc.getPrecedingFilterExpr().isPresent()) {
+            Set<Expression> preConjuncts =
+                    ExpressionUtils.extractConjunctionToSet(dataDesc.getPrecedingFilterExpr().get());
+            if (!preConjuncts.isEmpty()) {
+                tvfLogicalPlan = new LogicalFilter<>(preConjuncts, tvfLogicalPlan);
+            }
+        }
+
+        Map<String, String> sourceProperties = dataDesc.getProperties();
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties)) {
+            String csvSchemaStr = sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA);
+            if (csvSchemaStr != null) {
+                tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, csvSchemaStr);
+                List<Column> csvSchema = new ArrayList<>();
+                ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, sourceProperties);
+                List<NamedExpression> csvColumns = new ArrayList<>();
+                for (Column csvColumn : csvSchema) {
+                    csvColumns.add(new UnboundSlot(csvColumn.getName()));
+                }
+                if (!csvColumns.isEmpty()) {
+                    for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+                        csvColumns.add(new UnboundSlot(columnFromPath));
+                    }
+                    return new LogicalProject<>(csvColumns, tvfLogicalPlan);
+                }
+                if (!dataDesc.getPrecedingFilterExpr().isPresent()) {
+                    throw new AnalysisException("Required property 
'csv_schema' for csv file, "
+                            + "when no column list specified and use PRECEDING 
FILTER");
+                }
+            }
+            return getStarProjectPlan(tvfLogicalPlan);
+        }
+        List<NamedExpression> dataDescColumns = new ArrayList<>();
+        for (int i = 0; i < dataDesc.getFileFieldNames().size(); i++) {
+            String sourceColumn = dataDesc.getFileFieldNames().get(i);
+            dataDescColumns.add(new UnboundSlot(sourceColumn));
+        }
+        if (dataDescColumns.isEmpty()) {
+            return getStarProjectPlan(tvfLogicalPlan);
+        } else {
+            return new LogicalProject<>(dataDescColumns, tvfLogicalPlan);
+        }
+    }
+
+    private static LogicalProject<LogicalPlan> getStarProjectPlan(LogicalPlan logicalPlan) {
+        return new LogicalProject<>(ImmutableList.of(new UnboundStar(new ArrayList<>())), logicalPlan);
+    }
+
+    private static Optional<If> createDeleteOnIfCall(OlapTable olapTable, String olapColName,
+                                                     BulkLoadDataDesc dataDesc) throws AnalysisException {
+        if (olapTable.hasDeleteSign()
+                && dataDesc.getDeleteCondition().isPresent()) {
+            if (!(dataDesc.getDeleteCondition().get() instanceof ComparisonPredicate)) {
+                throw new AnalysisException("DELETE ON clause must be comparison expression.");
+            }
+            ComparisonPredicate deleteOn = (ComparisonPredicate) dataDesc.getDeleteCondition().get();
+            Expression deleteOnCol = deleteOn.left();
+            if (!(deleteOnCol instanceof UnboundSlot)) {
+                throw new AnalysisException("DELETE ON column must be an undecorated OLAP column.");
+            }
+            if (!olapColName.equalsIgnoreCase(getUnquotedName((UnboundSlot) deleteOnCol))) {
+                return Optional.empty();
+            }
+            If deleteIf = new If(deleteOn, new TinyIntLiteral((byte) 1), new TinyIntLiteral((byte) 0));
+            return Optional.of(deleteIf);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private static void checkDeleteOnConditions(LoadTask.MergeType mergeType, Expression deleteCondition)
+                throws AnalysisException {
+        if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
+            throw new AnalysisException(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON);
+        }
+        if (mergeType == LoadTask.MergeType.MERGE && deleteCondition == null) {
+            throw new AnalysisException(BulkLoadDataDesc.EXPECT_DELETE_ON);
+        }
+    }
+
+    private static void checkAndAddSequenceCol(OlapTable olapTable, BulkLoadDataDesc dataDesc,
+                                               List<String> sinkCols, List<NamedExpression> selectLists)
+                throws AnalysisException {
+        Optional<String> optSequenceCol = dataDesc.getSequenceCol();
+        if (!optSequenceCol.isPresent() && !olapTable.hasSequenceCol()) {
+            return;
+        }
+        // check olapTable schema and sequenceCol
+        if (olapTable.hasSequenceCol() && !optSequenceCol.isPresent()) {
+            throw new AnalysisException("Table " + olapTable.getName()
+                    + " has sequence column, need to specify the sequence 
column");
+        }
+        if (optSequenceCol.isPresent() && !olapTable.hasSequenceCol()) {
+            throw new AnalysisException("There is no sequence column in the 
table " + olapTable.getName());
+        }
+        String sequenceCol = dataDesc.getSequenceCol().get();
+        // check source sequence column is in parsedColumnExprList or Table base schema
+        boolean hasSourceSequenceCol = false;
+        if (!sinkCols.isEmpty()) {
+            List<String> allCols = new ArrayList<>(dataDesc.getFileFieldNames());
+            allCols.addAll(sinkCols);
+            for (String sinkCol : allCols) {
+                if (sinkCol.equals(sequenceCol)) {
+                    hasSourceSequenceCol = true;
+                    break;
+                }
+            }
+        }
+        List<Column> columns = olapTable.getBaseSchema();
+        for (Column column : columns) {
+            if (column.getName().equals(sequenceCol)) {
+                hasSourceSequenceCol = true;
+                break;
+            }
+        }
+        if (!hasSourceSequenceCol) {
+            throw new AnalysisException("There is no sequence column " + 
sequenceCol + " in the " + olapTable.getName()
+                    + " or the COLUMNS and SET clause");
+        } else {
+            sinkCols.add(Column.SEQUENCE_COL);
+            selectLists.add(new UnboundAlias(new UnboundSlot(sequenceCol), 
Column.SEQUENCE_COL));
+        }
+    }
+
+    private UnboundTVFRelation getUnboundTVFRelation(Map<String, String> properties) {
+        UnboundTVFRelation relation;
+        if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
+            relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
+                    S3TableValuedFunction.NAME, new Properties(properties));
+        } else if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.HDFS) {
+            relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
+                    HdfsTableValuedFunction.NAME, new Properties(properties));
+        } else {
+            throw new UnsupportedOperationException("Unsupported load storage type: "
+                    + bulkStorageDesc.getStorageType());
+        }
+        return relation;
+    }
+
+    private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc dataDesc) throws AnalysisException {
+        OlapTable targetTable;
+        TableIf table = RelationUtil.getTable(dataDesc.getNameParts(), ctx.getEnv());
+        if (!(table instanceof OlapTable)) {
+            throw new AnalysisException("table must be olapTable in load command");
+        }
+        targetTable = ((OlapTable) table);
+        return targetTable;
+    }
+
+    private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, BulkStorageDesc bulkStorageDesc) {
+        Map<String, String> tvfProperties = new HashMap<>(bulkStorageDesc.getProperties());
+        String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv");
+        if ("csv".equalsIgnoreCase(fileFormat)) {
+            dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
+                    tvfProperties.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, sep.getSeparator()));
+            dataDesc.getFormatDesc().getLineDelimiter().ifPresent(sep ->
+                    tvfProperties.put(ExternalFileTableValuedFunction.LINE_DELIMITER, sep.getSeparator()));
+        }
+        // TODO: resolve and put ExternalFileTableValuedFunction params
+        tvfProperties.put(ExternalFileTableValuedFunction.FORMAT, fileFormat);
+
+        List<String> filePaths = dataDesc.getFilePaths();
+        // TODO: support multi location by union
+        String listFilePath = filePaths.get(0);
+        if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
+            S3Properties.convertToStdProperties(tvfProperties);
+            tvfProperties.keySet().removeIf(S3Properties.Env.FS_KEYS::contains);
+            // TODO: check file path by s3 fs list status
+            tvfProperties.put(S3TableValuedFunction.S3_URI, listFilePath);
+        }
+
+        final Map<String, String> dataDescProps = dataDesc.getProperties();
+        if (dataDescProps != null) {
+            tvfProperties.putAll(dataDescProps);
+        }
+        List<String> columnsFromPath = dataDesc.getColumnsFromPath();
+        if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+            tvfProperties.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
+                    String.join(",", columnsFromPath));
+        }
+        return tvfProperties;
+    }
+
+    private void executeInsertStmtPlan(ConnectContext ctx, StmtExecutor executor, List<InsertIntoTableCommand> plans) {
+        try {
+            for (LogicalPlan logicalPlan : plans) {
+                ((Command) logicalPlan).run(ctx, executor);
+            }
+        } catch (QueryStateException e) {
+            ctx.setState(e.getQueryState());
+            throw new NereidsException("Command process failed", new 
AnalysisException(e.getMessage(), e));
+        } catch (UserException e) {
+            // Return message to info client what happened.
+            ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+            throw new NereidsException("Command process failed", new 
AnalysisException(e.getMessage(), e));
+        } catch (Exception e) {
+            // Maybe our bug
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
+            throw new NereidsException("Command process failed.", new AnalysisException(e.getMessage(), e));
+        }
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLoadCommand(this, context);
+    }
+}
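
Taken together, completeQueryPlan builds a plan whose SQL shape is roughly the
following sketch (names continue the hypothetical dataDesc above; note the
WHERE predicate is rewritten through the SET mappings, and
__DORIS_DELETE_SIGN__ is the hidden delete-sign column):

    INSERT INTO example_tbl (c1, c2, __DORIS_DELETE_SIGN__)
    SELECT src_c1 AS c1,
           src_c2 * 10 AS c2,
           IF(del_flag = 1, 1, 0) AS __DORIS_DELETE_SIGN__
    FROM S3("uri" = "s3://bucket/data/part-*.csv", "format" = "csv")
    WHERE src_c2 * 10 < 100;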
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java
new file mode 100644
index 0000000000..e0365ca098
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Used to describe the data info which is needed to import.
+ * The transform of columns should be added after the keyword named COLUMNS.
+ * The transform after the keyword named SET is the old way, which only supports the hadoop function.
+ * The old way of transform will be removed gradually.
+ *       data_desc:
+ *           DATA INFILE ('file_path', ...)
+ *           [NEGATIVE]
+ *           INTO TABLE tbl_name
+ *           [PARTITION (p1, p2)]
+ *           [COLUMNS TERMINATED BY separator]
+ *           [FORMAT AS format]
+ *           [(tmp_col1, tmp_col2, col3, ...)]
+ *           [COLUMNS FROM PATH AS (col1, ...)]
+ *           [SET (k1=f1(xx), k2=f2(xxx))]
+ *           [where_clause]
+ *           DATA FROM TABLE external_hive_tbl_name
+ *           [NEGATIVE]
+ *           INTO TABLE tbl_name
+ *           [PARTITION (p1, p2)]
+ *           [SET (k1=f1(xx), k2=f2(xxx))]
+ *           [where_clause]
+ */
+public class BulkLoadDataDesc {
+
+    public static final String EXPECT_MERGE_DELETE_ON = "not support DELETE ON clause when merge type is not MERGE.";
+    public static final String EXPECT_DELETE_ON = "Excepted DELETE ON clause when merge type is MERGE.";
+    private static final Logger LOG = LogManager.getLogger(BulkLoadDataDesc.class);
+
+    private final List<String> nameParts;
+    private final String tableName;
+    private final String dbName;
+    private final List<String> partitionNames;
+    private final List<String> filePaths;
+    private final boolean isNegative;
+    // column names in the path
+    private final List<String> columnsFromPath;
+    // save column mapping in SET(xxx = xxx) clause
+    private final Map<String, Expression> columnMappings;
+    private final Optional<Expression> precedingFilterExpr;
+    private final Optional<Expression> whereExpr;
+    private final LoadTask.MergeType mergeType;
+    private final String srcTableName;
+    // column names of source files
+    private final List<String> fileFieldNames;
+    private final Optional<String> sequenceCol;
+    private final FileFormatDesc formatDesc;
+    // Merged from fileFieldNames, columnsFromPath and columnMappingList
+    // ImportColumnDesc: column name to (expr or null)
+    private final Optional<Expression> deleteCondition;
+    private final Map<String, String> dataProperties;
+    private boolean isMysqlLoad = false;
+
+    /**
+     * bulk load desc
+     */
+    public BulkLoadDataDesc(List<String> fullTableName,
+                            List<String> partitionNames,
+                            List<String> filePaths,
+                            List<String> columns,
+                            List<String> columnsFromPath,
+                            Map<String, Expression> columnMappings,
+                            FileFormatDesc formatDesc,
+                            boolean isNegative,
+                            Optional<Expression> fileFilterExpr,
+                            Optional<Expression> whereExpr,
+                            LoadTask.MergeType mergeType,
+                            Optional<Expression> deleteCondition,
+                            Optional<String> sequenceColName,
+                            Map<String, String> dataProperties) {
+        this.nameParts = Objects.requireNonNull(fullTableName, "nameParts should not null");
+        this.dbName = Objects.requireNonNull(fullTableName.get(1), "dbName should not null");
+        this.tableName = Objects.requireNonNull(fullTableName.get(2), "tableName should not null");
+        this.partitionNames = Objects.requireNonNull(partitionNames, "partitionNames should not null");
+        this.filePaths = Objects.requireNonNull(filePaths, "filePaths should not null");
+        this.formatDesc = Objects.requireNonNull(formatDesc, "formatDesc should not null");
+        this.fileFieldNames = columnsNameToLowerCase(Objects.requireNonNull(columns, "columns should not null"));
+        this.columnsFromPath = columnsNameToLowerCase(columnsFromPath);
+        this.isNegative = isNegative;
+        this.columnMappings = columnMappings;
+        this.precedingFilterExpr = fileFilterExpr;
+        this.whereExpr = whereExpr;
+        this.mergeType = mergeType;
+        // maybe from tvf or table
+        this.srcTableName = null;
+        this.deleteCondition = deleteCondition;
+        this.sequenceCol = sequenceColName;
+        this.dataProperties = dataProperties;
+    }
+
+    /**
+     * bulk load file format desc
+     */
+    public static class FileFormatDesc {
+        private final Separator lineDelimiter;
+        private final Separator columnSeparator;
+        private final String fileFormat;
+
+        public FileFormatDesc(Optional<String> fileFormat) {
+            this(Optional.empty(), Optional.empty(), fileFormat);
+        }
+
+        public FileFormatDesc(Optional<String> lineDelimiter, Optional<String> columnSeparator) {
+            this(lineDelimiter, columnSeparator, Optional.empty());
+        }
+
+        /**
+         * build bulk load format desc and check valid
+         * @param lineDelimiter text format line delimiter
+         * @param columnSeparator text format column separator
+         * @param fileFormat file format
+         */
+        public FileFormatDesc(Optional<String> lineDelimiter, Optional<String> columnSeparator,
+                              Optional<String> fileFormat) {
+            this.lineDelimiter = new Separator(lineDelimiter.orElse(null));
+            this.columnSeparator = new Separator(columnSeparator.orElse(null));
+            try {
+                if (!StringUtils.isEmpty(this.lineDelimiter.getOriSeparator())) {
+                    this.lineDelimiter.analyze();
+                }
+                if (!StringUtils.isEmpty(this.columnSeparator.getOriSeparator())) {
+                    this.columnSeparator.analyze();
+                }
+            } catch (AnalysisException e) {
+                throw new RuntimeException("Fail to parse separator. ", e);
+            }
+            this.fileFormat = fileFormat.orElse(null);
+        }
+
+        public Optional<Separator> getLineDelimiter() {
+            if (lineDelimiter == null || lineDelimiter.getOriSeparator() == null) {
+                return Optional.empty();
+            }
+            return Optional.of(lineDelimiter);
+        }
+
+        public Optional<Separator> getColumnSeparator() {
+            if (columnSeparator == null || columnSeparator.getOriSeparator() == null) {
+                return Optional.empty();
+            }
+            return Optional.of(columnSeparator);
+        }
+
+        public Optional<String> getFileFormat() {
+            return Optional.ofNullable(fileFormat);
+        }
+    }
+
+    public List<String> getNameParts() {
+        return nameParts;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public List<String> getPartitionNames() {
+        return partitionNames;
+    }
+
+    public FileFormatDesc getFormatDesc() {
+        return formatDesc;
+    }
+
+    public List<String> getFilePaths() {
+        return filePaths;
+    }
+
+    public List<String> getColumnsFromPath() {
+        return columnsFromPath;
+    }
+
+    public Map<String, Expression> getColumnMappings() {
+        return columnMappings;
+    }
+
+    public Optional<Expression> getPrecedingFilterExpr() {
+        return precedingFilterExpr;
+    }
+
+    public Optional<Expression> getWhereExpr() {
+        return whereExpr;
+    }
+
+    public List<String> getFileFieldNames() {
+        return fileFieldNames;
+    }
+
+    public Optional<String> getSequenceCol() {
+        if (sequenceCol.isPresent() && StringUtils.isBlank(sequenceCol.get())) {
+            return Optional.empty();
+        }
+        return sequenceCol;
+    }
+
+    public Optional<Expression> getDeleteCondition() {
+        return deleteCondition;
+    }
+
+    public LoadTask.MergeType getMergeType() {
+        return mergeType;
+    }
+
+    public Map<String, String> getProperties() {
+        return dataProperties;
+    }
+
+    // Change all the columns name to lower case, because Doris column is case-insensitive.
+    private List<String> columnsNameToLowerCase(List<String> columns) {
+        if (columns == null || columns.isEmpty() || "json".equals(this.formatDesc.fileFormat)) {
+            return columns;
+        }
+        List<String> lowerCaseColumns = new ArrayList<>();
+        for (int i = 0; i < columns.size(); i++) {
+            String column = columns.get(i);
+            lowerCaseColumns.add(i, column.toLowerCase());
+        }
+        return lowerCaseColumns;
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    /**
+     * print data desc load info
+     * @return bulk load sql
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (isMysqlLoad) {
+            sb.append("DATA ").append(isClientLocal() ? "LOCAL " : "");
+            sb.append("INFILE '").append(filePaths.get(0)).append("'");
+        } else if (isLoadFromTable()) {
+            sb.append(mergeType.toString());
+            sb.append(" DATA FROM TABLE ").append(srcTableName);
+        } else {
+            sb.append(mergeType.toString());
+            sb.append(" DATA INFILE (");
+            Joiner.on(", ").appendTo(sb, filePaths.stream()
+                    .map(s -> "'" + s + 
"'").collect(Collectors.toList())).append(")");
+        }
+        if (isNegative) {
+            sb.append(" NEGATIVE");
+        }
+        sb.append(" INTO TABLE ");
+        sb.append(isMysqlLoad ? ClusterNamespace.getNameFromFullName(dbName) + "." + tableName : tableName);
+        if (partitionNames != null && !partitionNames.isEmpty()) {
+            sb.append(" (");
+            Joiner.on(", ").appendTo(sb, partitionNames).append(")");
+        }
+        if (formatDesc.columnSeparator != null) {
+            sb.append(" COLUMNS TERMINATED BY ").append(formatDesc.columnSeparator.toSql());
+        }
+        if (formatDesc.lineDelimiter != null && isMysqlLoad) {
+            sb.append(" LINES TERMINATED BY ").append(formatDesc.lineDelimiter.toSql());
+        }
+        if (formatDesc.fileFormat != null && !formatDesc.fileFormat.isEmpty()) {
+            sb.append(" FORMAT AS '" + formatDesc.fileFormat + "'");
+        }
+        if (fileFieldNames != null && !fileFieldNames.isEmpty()) {
+            sb.append(" (");
+            Joiner.on(", ").appendTo(sb, fileFieldNames).append(")");
+        }
+        if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+            sb.append(" COLUMNS FROM PATH AS (");
+            Joiner.on(", ").appendTo(sb, columnsFromPath).append(")");
+        }
+        if (columnMappings != null && !columnMappings.isEmpty()) {
+            sb.append(" SET (");
+            Joiner.on(", ").appendTo(sb, columnMappings.entrySet().stream()
+                    .map(e -> e.getKey() + "=" + 
e.getValue().toSql()).collect(Collectors.toList())).append(")");
+        }
+        whereExpr.ifPresent(e -> sb.append(" WHERE ").append(e.toSql()));
+        deleteCondition.ifPresent(e -> {
+            if (mergeType == LoadTask.MergeType.MERGE) {
+                sb.append(" DELETE ON ").append(e.toSql());
+            }
+        });
+        return sb.toString();
+    }
+
+    private boolean isLoadFromTable() {
+        return false;
+    }
+
+    private boolean isClientLocal() {
+        return false;
+    }
+}
+
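
For the same hypothetical MERGE dataDesc, toSql above would render roughly:

    MERGE DATA INFILE ('s3://bucket/data/part-*.csv') INTO TABLE example_tbl
    COLUMNS TERMINATED BY ',' (src_c1, src_c2, del_flag)
    SET (c1=src_c1, c2=(src_c2 * 10)) WHERE (c2 < 100) DELETE ON (del_flag = 1)

(exact parenthesization follows each expression's toSql).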
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java
new file mode 100644
index 0000000000..38543c069d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.S3ClientBEProperties;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Storage descriptor for bulk load (broker, S3, HDFS, or local).
+ * Example:
+ * WITH S3/HDFS
+ * (
+ *   "username" = "user0",
+ *   "password" = "password0"
+ * )
+ */
+public class BulkStorageDesc implements Writable {
+    @SerializedName(value = "storageType")
+    protected StorageType storageType;
+    @SerializedName(value = "properties")
+    protected Map<String, String> properties;
+    @SerializedName(value = "name")
+    private String name;
+
+    /**
+     * Bulk Storage Type
+     */
+    public enum StorageType {
+        BROKER,
+        S3,
+        HDFS,
+        LOCAL;
+
+    }
+
+    /**
+     * BulkStorageDesc
+     * @param name storage (broker) name
+     * @param properties storage properties
+     */
+    public BulkStorageDesc(String name, Map<String, String> properties) {
+        this(name, StorageType.BROKER, properties);
+    }
+
+    /**
+     * BulkStorageDesc
+     * @param name storage (broker) name
+     * @param type storage type
+     * @param properties storage properties
+     */
+    public BulkStorageDesc(String name, StorageType type, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+        if (this.properties == null) {
+            this.properties = Maps.newHashMap();
+        }
+        this.storageType = type;
+        this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
+    }
+
+    public StorageType getStorageType() {
+        return storageType;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static BulkStorageDesc read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, BulkStorageDesc.class);
+    }
+
+    /**
+     * Serializes this storage descriptor to its SQL clause.
+     * @return storage clause SQL, e.g. WITH BROKER name (...)
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (storageType == StorageType.BROKER) {
+            sb.append("WITH BROKER ").append(name);
+        } else {
+            sb.append("WITH ").append(storageType.name());
+        }
+        if (properties != null && !properties.isEmpty()) {
+            PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
+            sb.append(" (").append(printableMap).append(")");
+        }
+        return sb.toString();
+    }
+}
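
A minimal usage sketch of the BulkStorageDesc class added above (the property
keys and the "s3_src" name are hypothetical; only the constructor and toSql()
shown in this diff are assumed):

    import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;

    import com.google.common.collect.Maps;

    import java.util.Map;

    public class BulkStorageDescSketch {
        public static void main(String[] args) {
            Map<String, String> props = Maps.newHashMap();
            // Hypothetical S3 connection properties.
            props.put("s3.endpoint", "cos.ap-beijing.myqcloud.com");
            props.put("s3.region", "ap-beijing");
            BulkStorageDesc desc = new BulkStorageDesc("s3_src",
                    BulkStorageDesc.StorageType.S3, props);
            // toSql() renders roughly:
            // WITH S3 ("s3.endpoint" = "...", "s3.region" = "...", ...)
            System.out.println(desc.toSql());
        }
    }
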
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index e91e846e2f..c055da5735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
 
 /** CommandVisitor. */
@@ -52,6 +53,10 @@ public interface CommandVisitor<R, C> {
         return visitCommand(deleteCommand, context);
     }
 
+    default R visitLoadCommand(LoadCommand loadCommand, C context) {
+        return visitCommand(loadCommand, context);
+    }
+
     default R visitExportCommand(ExportCommand exportCommand, C context) {
         return visitCommand(exportCommand, context);
     }
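
A sketch of how a caller can hook into the new visitLoadCommand default
(assuming, as the defaults above imply, that visitCommand(Command, C) is the
visitor's root method; the CommandNamePrinter class itself is hypothetical):

    import org.apache.doris.nereids.trees.plans.commands.Command;
    import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
    import org.apache.doris.nereids.trees.plans.visitor.CommandVisitor;

    public class CommandNamePrinter implements CommandVisitor<String, Void> {
        @Override
        public String visitCommand(Command command, Void context) {
            // Fallback reached by every default visit method.
            return command.getClass().getSimpleName();
        }

        @Override
        public String visitLoadCommand(LoadCommand loadCommand, Void context) {
            // Dedicated handling for the new bulk load command.
            return "LOAD:" + visitCommand(loadCommand, context);
        }
    }
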
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 4eeadffd09..6acc276a25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -103,7 +103,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     protected static final String FUZZY_PARSE = "fuzzy_parse";
     protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
     protected static final String SKIP_LINES = "skip_lines";
-    protected static final String CSV_SCHEMA = "csv_schema";
+    public static final String CSV_SCHEMA = "csv_schema";
     protected static final String COMPRESS_TYPE = "compress_type";
     public static final String PATH_PARTITION_KEYS = "path_partition_keys";
     // decimal(p,s)
@@ -380,12 +380,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
 
     @Override
     public List<Column> getTableColumns() throws AnalysisException {
-        if (FeConstants.runningUnitTest) {
-            return Lists.newArrayList();
-        }
         if (!csvSchema.isEmpty()) {
             return csvSchema;
         }
+        if (FeConstants.runningUnitTest) {
+            Object mockedUtObj = FeConstants.unitTestConstant;
+            if (mockedUtObj instanceof List) {
+                return ((List<Column>) mockedUtObj);
+            }
+            return new ArrayList<>();
+        }
         if (this.columns != null) {
             return columns;
         }
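
The reworked branch above lets unit tests inject a mocked schema through
FeConstants.unitTestConstant, which is how the new BulkLoadDataDescTest below
exercises getTableColumns(). A condensed sketch of that pattern (the column
names here are illustrative):

    import org.apache.doris.catalog.Column;
    import org.apache.doris.catalog.PrimitiveType;
    import org.apache.doris.common.FeConstants;

    import com.google.common.collect.Lists;

    // Inside a test setup method:
    FeConstants.runningUnitTest = true;
    FeConstants.unitTestConstant = Lists.newArrayList(
            new Column("c_custkey", PrimitiveType.INT, true),
            new Column("c_name", PrimitiveType.VARCHAR, true));
    // getTableColumns() now returns the mocked columns instead of
    // deriving a schema from the remote files.
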
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java
new file mode 100644
index 0000000000..e5a3e66932
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java
@@ -0,0 +1,658 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.stats.ExpressionEstimation;
+import org.apache.doris.nereids.trees.expressions.Add;
+import org.apache.doris.nereids.trees.expressions.BinaryArithmetic;
+import org.apache.doris.nereids.trees.expressions.CaseWhen;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.WhenClause;
+import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.Statistics;
+import org.apache.doris.utframe.TestWithFeService;
+
+import mockit.Mock;
+import mockit.MockUp;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class BulkLoadDataDescTest extends TestWithFeService {
+
+    private List<String> sinkCols1 = new ArrayList<>();
+    private List<String> sinkCols2 = new ArrayList<>();
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        connectContext.getState().setNereids(true);
+        connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false;
+        connectContext.getSessionVariable().enableNereidsTimeout = false;
+        connectContext.getSessionVariable().enableNereidsDML = true;
+        FeConstants.runningUnitTest = true;
+
+        createDatabase("nereids_load");
+        useDatabase("nereids_load");
+        String createTableSql = "CREATE TABLE `customer` (\n"
+                + "  `custkey` int(11) NOT NULL,\n"
+                + "  `c_name` varchar(25) NOT NULL,\n"
+                + "  `c_address` varchar(40) NOT NULL,\n"
+                + "  `c_nationkey` int(11) NOT NULL,\n"
+                + "  `c_phone` varchar(15) NOT NULL,\n"
+                + "  `c_acctbal` DECIMAL(15, 2) NOT NULL,\n"
+                + "  `c_mktsegment` varchar(10) NOT NULL,\n"
+                + "  `c_comment` varchar(117) NOT NULL\n"
+                + ") ENGINE=OLAP\n"
+                + "UNIQUE KEY(`custkey`)\n"
+                + "COMMENT 'OLAP'\n"
+                + "DISTRIBUTED BY HASH(`custkey`) BUCKETS 24\n"
+                + "PROPERTIES (\n"
+                + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+                + "\"function_column.sequence_col\" = \"c_nationkey\","
+                + "\"storage_format\" = \"V2\"\n"
+                + ");";
+        createTable(createTableSql);
+        sinkCols1.add("custkey");
+        sinkCols1.add("c_name");
+        sinkCols1.add("c_address");
+        sinkCols1.add("c_nationkey");
+        sinkCols1.add("c_phone");
+        sinkCols1.add("c_acctbal");
+        sinkCols1.add("c_mktsegment");
+        sinkCols1.add("c_comment");
+
+        String createTableSql2 = "CREATE TABLE `customer_dup` (\n"
+                + "  `custkey` int(11) NOT NULL,\n"
+                + "  `c_name` varchar(25) NOT NULL,\n"
+                + "  `address` varchar(40) NOT NULL,\n"
+                + "  `c_nationkey` int(11) NOT NULL,\n"
+                + "  `c_phone` varchar(15) NOT NULL,\n"
+                + "  `c_acctbal` DECIMAL(15, 2) NOT NULL,\n"
+                + "  `c_mktsegment` varchar(10) NOT NULL,\n"
+                + "  `c_comment` varchar(117) NOT NULL\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(`custkey`,`c_name`)\n"
+                + "COMMENT 'OLAP'\n"
+                + "DISTRIBUTED BY HASH(`custkey`) BUCKETS 24\n"
+                + "PROPERTIES (\n"
+                + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+                + "\"storage_format\" = \"V2\"\n"
+                + ");";
+        createTable(createTableSql2);
+        sinkCols2.add("custkey");
+        sinkCols2.add("c_name");
+        sinkCols2.add("address");
+        sinkCols2.add("c_nationkey");
+        sinkCols2.add("c_phone");
+        sinkCols2.add("c_acctbal");
+        sinkCols2.add("c_mktsegment");
+        sinkCols2.add("c_comment");
+
+    }
+
+    @Test
+    public void testParseLoadStmt() throws Exception {
+        String loadSql1 = "LOAD LABEL customer_j23( "
+                + "     DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     LINES TERMINATED BY \"\n\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     SET ( custkey=case when c_custkey=-8 then -3 when 
c_custkey=-1 then 11 else c_custkey end )   "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey > 100"
+                + "     ORDER BY c_custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+
+        List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1);
+        Assertions.assertFalse(statements.isEmpty());
+
+        List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+        expectedSinkColumns.add(Column.SEQUENCE_COL);
+
+        CaseWhen caseWhen = new CaseWhen(new ArrayList<WhenClause>() {
+            {
+                add(new WhenClause(
+                        new EqualTo(
+                                new UnboundSlot("c_custkey"),
+                                new TinyIntLiteral((byte) -8)),
+                        new TinyIntLiteral((byte) -3)));
+                add(new WhenClause(
+                        new EqualTo(
+                                new UnboundSlot("c_custkey"),
+                                new TinyIntLiteral((byte) -1)),
+                        new TinyIntLiteral((byte) 11)));
+            }
+        }, new UnboundSlot("c_custkey"));
+
+        List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+            {
+                add(new UnboundAlias(caseWhen, "custkey"));
+                add(new UnboundAlias(new UnboundSlot("c_address"), 
"c_address"));
+                add(new UnboundAlias(new UnboundSlot("c_nationkey"), 
"c_nationkey"));
+                add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+                add(new UnboundAlias(new UnboundSlot("c_acctbal"), 
"c_acctbal"));
+                add(new UnboundAlias(new UnboundSlot("c_mktsegment"), 
"c_mktsegment"));
+                add(new UnboundAlias(new UnboundSlot("c_comment"), 
"c_comment"));
+            }
+        };
+        List<Expression> expectedConjuncts = new ArrayList<Expression>() {
+            {
+                add(new GreaterThan(caseWhen, new IntegerLiteral(100)));
+            }
+        };
+        assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+    }
+
+    private void assertInsertIntoPlan(List<Pair<LogicalPlan, StatementContext>> statements,
+                                      List<String> expectedSinkColumns,
+                                      List<NamedExpression> expectedProjects,
+                                      List<Expression> expectedConjuncts,
+                                      boolean expectedPreFilter) throws AnalysisException {
+        Assertions.assertTrue(statements.get(0).first instanceof LoadCommand);
+        List<LogicalPlan> plans = ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext);
+        Assertions.assertTrue(plans.get(0) instanceof UnboundOlapTableSink);
+        List<String> colNames = ((UnboundOlapTableSink<?>) plans.get(0)).getColNames();
+        Assertions.assertEquals(colNames.size(), expectedSinkColumns.size());
+        for (String sinkCol : expectedSinkColumns) {
+            Assertions.assertTrue(colNames.contains(sinkCol));
+        }
+        Assertions.assertTrue(plans.get(0).child(0) instanceof LogicalProject);
+        LogicalProject<?> project = ((LogicalProject<?>) plans.get(0).child(0));
+        Set<String> projects = project.getProjects().stream()
+                .map(Object::toString)
+                .collect(Collectors.toSet());
+        for (NamedExpression namedExpression : expectedProjects) {
+            Assertions.assertTrue(projects.contains(namedExpression.toString()));
+        }
+        Assertions.assertTrue(project.child(0) instanceof LogicalFilter);
+        LogicalFilter<?> filter = ((LogicalFilter<?>) project.child(0));
+        Set<String> filterConjuncts = filter.getConjuncts().stream()
+                .map(Object::toString)
+                .collect(Collectors.toSet());
+        for (Expression expectedConjunct : expectedConjuncts) {
+            Assertions.assertTrue(filterConjuncts.contains(expectedConjunct.toString()));
+        }
+
+        Assertions.assertTrue(filter.child(0) instanceof LogicalProject);
+        LogicalProject<?> tvfProject = (LogicalProject<?>) filter.child(0);
+        if (expectedPreFilter) {
+            Assertions.assertTrue(tvfProject.child(0) instanceof LogicalFilter);
+            LogicalFilter<?> tvfFilter = (LogicalFilter<?>) tvfProject.child(0);
+            Assertions.assertTrue(tvfFilter.child(0) instanceof LogicalCheckPolicy);
+            Assertions.assertTrue(tvfFilter.child(0).child(0) instanceof UnboundTVFRelation);
+        } else {
+            Assertions.assertTrue(tvfProject.child(0) instanceof LogicalCheckPolicy);
+            Assertions.assertTrue(tvfProject.child(0).child(0) instanceof UnboundTVFRelation);
+        }
+    }
+
+    @Test
+    public void testParseLoadStmtPartitions() throws Exception {
+        String loadSql1 = "LOAD LABEL customer_j23( "
+                + "     DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     PARTITION (c_name, dt) "
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     LINES TERMINATED BY \"\n\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment, dt) "
+                + "     COLUMNS FROM PATH AS (dt)"
+                + "     ORDER BY c_custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        List<Pair<LogicalPlan, StatementContext>> statements = new 
NereidsParser().parseMultiple(loadSql1);
+        Assertions.assertFalse(statements.isEmpty());
+
+        List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+        expectedSinkColumns.add(Column.SEQUENCE_COL);
+        expectedSinkColumns.add("dt");
+        List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+            {
+                add(new UnboundAlias(new UnboundSlot("c_custkey"), "custkey"));
+                add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+                add(new UnboundAlias(new UnboundSlot("c_address"), 
"c_address"));
+                add(new UnboundAlias(new UnboundSlot("c_nationkey"), 
"c_nationkey"));
+                add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+                add(new UnboundAlias(new UnboundSlot("c_acctbal"), 
"c_acctbal"));
+                add(new UnboundAlias(new UnboundSlot("c_mktsegment"), 
"c_mktsegment"));
+                add(new UnboundAlias(new UnboundSlot("c_comment"), 
"c_comment"));
+                add(new UnboundSlot("dt"));
+            }
+        };
+        List<Expression> expectedConjuncts = new ArrayList<>();
+        assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, false);
+    }
+
+    @Test
+    public void testParseLoadStmtColumFromPath() throws Exception {
+        String loadSql1 = "LOAD LABEL customer_j23( "
+                + "     DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     PARTITION (c_name, dt) "
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     LINES TERMINATED BY \"\n\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment, dt) "
+                + "     COLUMNS FROM PATH AS (pt)   "
+                + "     SET ( custkey=c_custkey+1 )   "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey > 100"
+                + "     ORDER BY c_custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        List<Pair<LogicalPlan, StatementContext>> statements = new 
NereidsParser().parseMultiple(loadSql1);
+        Assertions.assertFalse(statements.isEmpty());
+
+        List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+        expectedSinkColumns.add(Column.SEQUENCE_COL);
+        expectedSinkColumns.add("pt");
+        List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+            {
+                add(new UnboundAlias(new Add(
+                        new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), "custkey"));
+                add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+                add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+                add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+                add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+                add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+                add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+                add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+                add(new UnboundSlot("pt"));
+            }
+        };
+        List<Expression> expectedConjuncts = new ArrayList<Expression>() {
+            {
+                add(new GreaterThan(new Add(new UnboundSlot("c_custkey"), new 
TinyIntLiteral((byte) 1)),
+                        new IntegerLiteral(100)));
+            }
+        };
+        assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+    }
+
+    @Test
+    public void testParseLoadStmtNoColumn() throws Exception {
+        String loadSql1 = "LOAD LABEL customer_no_col( "
+                + "     DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     FORMAT AS CSV"
+                + "     ORDER BY custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+
+        List<Pair<LogicalPlan, StatementContext>> statements = new 
NereidsParser().parseMultiple(loadSql1);
+        Assertions.assertFalse(statements.isEmpty());
+        List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+        expectedSinkColumns.add(Column.SEQUENCE_COL);
+        List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+            {
+                // when no columns are specified, the tvf columns equal the olap table columns
+                add(new UnboundAlias(new UnboundSlot("custkey"), "custkey"));
+                add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+                add(new UnboundAlias(new UnboundSlot("c_address"), "c_address"));
+                add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+                add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+                add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+                add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+                add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+            }
+        };
+        List<Expression> expectedConjuncts = new ArrayList<>();
+        assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, false);
+
+        // csv_schema format: semicolon-separated name:type pairs, e.g. k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)
+        String loadSql2 = "LOAD LABEL customer_no_col2( "
+                + "     DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     FORMAT AS CSV"
+                + "     ORDER BY custkey "
+                + "     PROPERTIES( "
+                + "         \"csv_schema\" = \""
+                + "             custkey:INT;"
+                + "             c_name:STRING;"
+                + "             c_address:STRING;"
+                + "             c_nationkey:INT;"
+                + "             c_phone:STRING;"
+                + "             c_acctbal:DECIMAL(15, 2);"
+                + "             c_mktsegment:STRING;"
+                + "             c_comment:STRING;\""
+                + "     ) "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+
+        List<Pair<LogicalPlan, StatementContext>> statements2 = new 
NereidsParser().parseMultiple(loadSql2);
+        Assertions.assertFalse(statements2.isEmpty());
+        List<String> expectedSinkColumns2 = new ArrayList<>(sinkCols1);
+        expectedSinkColumns2.add(Column.SEQUENCE_COL);
+        List<NamedExpression> expectedProjects2 = new ArrayList<NamedExpression>() {
+            {
+                add(new UnboundAlias(new UnboundSlot("custkey"), "custkey"));
+                add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+                add(new UnboundAlias(new UnboundSlot("c_address"), 
"c_address"));
+                add(new UnboundAlias(new UnboundSlot("c_nationkey"), 
"c_nationkey"));
+                add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+                add(new UnboundAlias(new UnboundSlot("c_acctbal"), 
"c_acctbal"));
+                add(new UnboundAlias(new UnboundSlot("c_mktsegment"), 
"c_mktsegment"));
+                add(new UnboundAlias(new UnboundSlot("c_comment"), 
"c_comment"));
+            }
+        };
+        List<Expression> expectedConjuncts2 = new ArrayList<>();
+        assertInsertIntoPlan(statements2, expectedSinkColumns2, expectedProjects2, expectedConjuncts2, false);
+    }
+
+    @Test
+    public void testParseLoadStmtWithParquetMappingFilter() throws Exception {
+        String loadSql1 = "LOAD LABEL customer_dup_mapping( "
+                + "     DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer_dup"
+                + "     FORMAT AS PARQUET"
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     SET ( custkey=c_custkey+1, address=c_address+'_base')  
 "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey = 100"
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        List<Pair<LogicalPlan, StatementContext>> statements = new 
NereidsParser().parseMultiple(loadSql1);
+        Assertions.assertFalse(statements.isEmpty());
+        List<String> expectedSinkColumns = new ArrayList<>(sinkCols2);
+        List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() {
+            {
+                add(new UnboundAlias(new Add(
+                        new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), "custkey"));
+                add(new UnboundAlias(new UnboundSlot("c_name"), "c_name"));
+                add(new UnboundAlias(new Add(
+                        new UnboundSlot("c_address"), new StringLiteral("_base")), "address"));
+                add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey"));
+                add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone"));
+                add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal"));
+                add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment"));
+                add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment"));
+            }
+        };
+        List<Expression> expectedConjuncts = new ArrayList<Expression>() {
+            {
+                add(new EqualTo(new Add(
+                        new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), new IntegerLiteral(100)));
+            }
+        };
+        assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+    }
+
+    @Test
+    public void testParseLoadStmtWithDeleteOn() throws Exception {
+        String loadSqlWithDeleteOnErr1 = "LOAD LABEL customer_label1( "
+                + "     APPEND DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     SET ( custkey=c_custkey+1 )   "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey > 100"
+                + "     DELETE ON c_custkey < 120     "
+                + "     ORDER BY custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        try {
+            List<Pair<LogicalPlan, StatementContext>> statements =
+                    new NereidsParser().parseMultiple(loadSqlWithDeleteOnErr1);
+            Assertions.assertFalse(statements.isEmpty());
+            Assertions.assertTrue(statements.get(0).first instanceof LoadCommand);
+            ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext);
+        } catch (AnalysisException e) {
+            Assertions.assertTrue(e.getMessage().contains(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON));
+        }
+
+        String loadSqlWithDeleteOnErr2 = "LOAD LABEL customer_label1( "
+                + "     MERGE DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     SET ( custkey=c_custkey+1 )   "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey > 100"
+                + "     ORDER BY custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        try {
+            List<Pair<LogicalPlan, StatementContext>> statements =
+                    new NereidsParser().parseMultiple(loadSqlWithDeleteOnErr2);
+            Assertions.assertFalse(statements.isEmpty());
+            Assertions.assertTrue(statements.get(0).first instanceof LoadCommand);
+            ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext);
+        } catch (AnalysisException e) {
+            Assertions.assertTrue(e.getMessage().contains(BulkLoadDataDesc.EXPECT_DELETE_ON));
+        }
+
+        String loadSqlWithDeleteOnOk = "LOAD LABEL customer_label2( "
+                + "     MERGE DATA INFILE(\"s3://bucket/customer\") "
+                + "     INTO TABLE customer"
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     SET ( custkey=c_custkey+1 )   "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey > 100"
+                + "     DELETE ON custkey < 120     "
+                + "     ORDER BY custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+
+        FeConstants.unitTestConstant = new ArrayList<Column>() {
+            {
+                add(new Column("c_custkey", PrimitiveType.INT, true));
+                add(new Column("c_name", PrimitiveType.VARCHAR, true));
+                add(new Column("c_address", PrimitiveType.VARCHAR, true));
+                add(new Column("c_nationkey", PrimitiveType.INT, true));
+                add(new Column("c_phone", PrimitiveType.VARCHAR, true));
+                add(new Column("c_acctbal", PrimitiveType.DECIMALV2, true));
+                add(new Column("c_mktsegment", PrimitiveType.VARCHAR, true));
+                add(new Column("c_comment", PrimitiveType.VARCHAR, true));
+            }
+        };
+        new MockUp<ExpressionEstimation>(ExpressionEstimation.class) {
+            @Mock
+            public ColumnStatistic visitCast(Cast cast, Statistics context) {
+                return ColumnStatistic.UNKNOWN;
+            }
+
+            @Mock
+            public ColumnStatistic visitBinaryArithmetic(BinaryArithmetic binaryArithmetic, Statistics context) {
+                return ColumnStatistic.UNKNOWN;
+            }
+        };
+
+        List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSqlWithDeleteOnOk);
+        Assertions.assertFalse(statements.isEmpty());
+
+        List<String> expectedSinkColumns = new ArrayList<>(sinkCols1);
+        expectedSinkColumns.add(Column.SEQUENCE_COL);
+        expectedSinkColumns.add(Column.DELETE_SIGN);
+        List<NamedExpression> expectedProjects = new ArrayList<>();
+        List<Expression> expectedConjuncts = new ArrayList<>();
+        assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true);
+        // new StmtExecutor(connectContext, loadSqlWithDeleteOnOk).execute();
+    }
+
+    @Test
+    public void testParseLoadStmtPatternPath() throws Exception {
+        String path1 = "part*";
+        String path2 = "*/part_000";
+        String path3 = "*part_000*";
+        String path4 = "*/*part_000*";
+        String loadTemplate = "LOAD LABEL customer_j23( "
+                + "     DATA INFILE(\"s3://bucket/customer/PATTERN\") "
+                + "     INTO TABLE customer"
+                + "     PARTITION (c_name, dt) "
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     LINES TERMINATED BY \"\n\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment, dt) "
+                + "     ORDER BY c_custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        Assertions.assertFalse(new NereidsParser()
+                .parseMultiple(loadTemplate.replace("PATTERN", path1)).isEmpty());
+        Assertions.assertFalse(new NereidsParser()
+                .parseMultiple(loadTemplate.replace("PATTERN", path2)).isEmpty());
+        Assertions.assertFalse(new NereidsParser()
+                .parseMultiple(loadTemplate.replace("PATTERN", path3)).isEmpty());
+        Assertions.assertFalse(new NereidsParser()
+                .parseMultiple(loadTemplate.replace("PATTERN", path4)).isEmpty());
+    }
+
+    @Test
+    public void testParseLoadStmtMultiLocations() throws Exception {
+        String loadMultiLocations = "LOAD LABEL customer_j23( "
+                + "     DATA INFILE("
+                + "         \"s3://bucket/customer/path1\", "
+                + "         \"s3://bucket/customer/path2\", "
+                + "         \"s3://bucket/customer/path3\") "
+                + "     INTO TABLE customer"
+                + "     PARTITION (c_name, dt) "
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     LINES TERMINATED BY \"\n\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment, dt) "
+                + "     ORDER BY c_custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        Assertions.assertFalse(new NereidsParser()
+                .parseMultiple(loadMultiLocations).isEmpty());
+    }
+
+    @Test
+    public void testParseLoadStmtMultiBulkDesc() throws Exception {
+        String loadMultiLocations = "LOAD LABEL customer_j23( "
+                + "     DATA INFILE("
+                + "         \"s3://bucket/customer/path1\", "
+                + "         \"s3://bucket/customer/path2\", "
+                + "         \"s3://bucket/customer/path3\") "
+                + "     INTO TABLE customer"
+                + "     PARTITION (c_name) "
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     ORDER BY c_custkey "
+                + "     ,"
+                + "     DATA INFILE(\"s3://bucket/customer/par_a*\") "
+                + "     INTO TABLE customer_dup"
+                + "     FORMAT AS PARQUET"
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment) "
+                + "     SET ( custkey=c_custkey+1, address=c_address+'_base')  
 "
+                + "     WHERE custkey < 50"
+                + "     ,"
+                + "     DATA INFILE("
+                + "         \"s3://bucket/customer/p\") "
+                + "     INTO TABLE customer"
+                + "     PARTITION (c_name, dt) "
+                + "     COLUMNS TERMINATED BY \"|\""
+                + "     LINES TERMINATED BY \"\n\""
+                + "     (c_custkey, c_name, c_address, c_nationkey, c_phone, 
c_acctbal, c_mktsegment, c_comment, dt)"
+                + "     SET ( custkey=c_custkey+1 )   "
+                + "     PRECEDING FILTER c_nationkey=\"CHINA\"     "
+                + "     WHERE custkey > 100"
+                + "     ORDER BY c_custkey "
+                + "  ) "
+                + "  WITH S3(  "
+                + "     \"s3.access_key\" = \"AK\", "
+                + "     \"s3.secret_key\" = \"SK\", "
+                + "     \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\",   "
+                + "     \"s3.region\" = \"ap-beijing\") "
+                + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT 
\"test\";";
+        Assertions.assertFalse(new NereidsParser()
+                .parseMultiple(loadMultiLocations).isEmpty());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
index 64a0dc5cad..a6eb67b061 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
@@ -32,15 +32,16 @@ import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.datasource.property.constants.S3Properties.Env;
 import org.apache.doris.load.loadv2.LoadTask.MergeType;
 import org.apache.doris.tablefunction.S3TableValuedFunction;
+import org.apache.doris.utframe.TestWithFeService;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import mockit.Expectations;
 import mockit.Injectable;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.io.StringReader;
 import java.util.Collections;
@@ -48,7 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class S3TvfLoadStmtTest {
+public class S3TvfLoadStmtTest extends TestWithFeService {
 
     private static final String ACCESS_KEY_VALUE = "ak";
 
@@ -70,7 +71,7 @@ public class S3TvfLoadStmtTest {
 
     private Set<String> colNames;
 
-    @Before
+    @BeforeAll
     public void setUp() throws AnalysisException {
         FeConstants.runningUnitTest = true;
 
@@ -101,7 +102,7 @@ public class S3TvfLoadStmtTest {
                 Maps.newHashMap(), "comment");
         final SelectStmt selectStmt = (SelectStmt) s3TvfLoadStmt.getQueryStmt();
         final Expr whereClause = Deencapsulation.getField(selectStmt, "whereClause");
-        Assert.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less));
+        Assertions.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less));
     }
 
     @Test
@@ -115,15 +116,15 @@ public class S3TvfLoadStmtTest {
         final TableRef tvfRef = Deencapsulation.invoke(S3TvfLoadStmt.class,
                 "buildTvfRef",
                 dataDescription, brokerDesc);
-        Assert.assertTrue(tvfRef instanceof TableValuedFunctionRef);
+        Assertions.assertTrue(tvfRef instanceof TableValuedFunctionRef);
         final S3TableValuedFunction tableFunction
                 = (S3TableValuedFunction) ((TableValuedFunctionRef) tvfRef).getTableFunction();
         final Map<String, String> locationProperties = tableFunction.getLocationProperties();
-        Assert.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE);
-        Assert.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE);
-        Assert.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE);
-        Assert.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE);
-        Assert.assertEquals(tableFunction.getFilePath(), DATA_URI);
+        Assertions.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE);
+        Assertions.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE);
+        Assertions.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE);
+        Assertions.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE);
+        Assertions.assertEquals(tableFunction.getFilePath(), DATA_URI);
     }
 
     @Injectable
@@ -176,13 +177,13 @@ public class S3TvfLoadStmtTest {
         Deencapsulation.setField(s3TvfLoadStmt, "functionGenTableColNames", Sets.newHashSet("c1", "c2", "c3"));
 
         Deencapsulation.invoke(s3TvfLoadStmt, "rewriteExpr", columnsDescList);
-        Assert.assertEquals(columnsDescList.size(), 5);
+        Assertions.assertEquals(columnsDescList.size(), 5);
         final String orig4 = "((upper(`c1`) + 1) + 1)";
-        Assert.assertEquals(orig4, columnsDescList.get(4).getExpr().toString());
+        Assertions.assertEquals(orig4, columnsDescList.get(4).getExpr().toString());
 
         final List<ImportColumnDesc> filterColumns = Deencapsulation.invoke(s3TvfLoadStmt,
                 "filterColumns", columnsDescList);
-        Assert.assertEquals(filterColumns.size(), 4);
+        Assertions.assertEquals(filterColumns.size(), 4);
     }
 
     private static DataDescription buildDataDesc(Iterable<String> columns, Expr fileFilter, Expr wherePredicate,
@@ -210,11 +211,12 @@ public class S3TvfLoadStmtTest {
     private static List<ImportColumnDesc> getColumnsDescList(String columns) throws Exception {
         String columnsSQL = "COLUMNS (" + columns + ")";
         return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt(
-                new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns();
+                new org.apache.doris.analysis.SqlParser(
+                        new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL))))).getColumns();
     }
 
     private static List<Column> getBaseSchema() {
-        List<Column> columns = com.google.common.collect.Lists.newArrayList();
+        List<Column> columns = Lists.newArrayList();
 
         Column c1 = new Column("c1", PrimitiveType.BIGINT);
         c1.setIsKey(true);

