This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 677b3424e3 [feature](Load)(step1)support nereids load, add load grammar (#23485)
677b3424e3 is described below

commit 677b3424e3b24793a8cd2af54db9e1cc2f8b933a
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Wed Sep 20 21:12:23 2023 +0800

    [feature](Load)(step1)support nereids load, add load grammar (#23485)

    Support the Nereids load grammar. We will convert the broker load stmt to an insert-into clause:
    1. rename broker load to bulk load.
    2. add the load grammar to the Nereids optimizer.
    3. convert the stmt to an insert-into clause with a table-valued function.

    https://github.com/apache/doris/issues/24221
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  86 ++-
 .../java/org/apache/doris/common/FeConstants.java  |   2 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  89 ++-
 .../apache/doris/nereids/trees/plans/PlanType.java |   5 +-
 .../nereids/trees/plans/commands/LoadCommand.java  | 495 ++++++++++++++++
 .../plans/commands/info/BulkLoadDataDesc.java      | 333 +++++++++++
 .../trees/plans/commands/info/BulkStorageDesc.java | 123 ++++
 .../trees/plans/visitor/CommandVisitor.java        |   5 +
 .../ExternalFileTableValuedFunction.java           |  12 +-
 .../doris/analysis/BulkLoadDataDescTest.java       | 658 +++++++++++++++++++++
 .../apache/doris/analysis/S3TvfLoadStmtTest.java   |  36 +-
 11 files changed, 1818 insertions(+), 26 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 335576660f..b0a11bcb28 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -63,6 +63,19 @@ statement
         (PARTITION partition=identifierList)?
         (USING relation (COMMA relation)*)
         whereClause                                                    #delete
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        (withRemoteStorageSystem)?
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                                 #load
+    | LOAD LABEL lableName=identifier
+        LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN
+        resourceDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                                 #resourceLoad
+    | LOAD mysqlDataDesc
+        (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)?
+        (commentSpec)?                                                 #mysqlLoad
     | EXPORT TABLE tableName=multipartIdentifier
         (PARTITION partition=identifierList)?
         (whereClause)?
@@ -71,7 +84,29 @@ statement
         (withRemoteStorageSystem)?                                     #export
     ;
-
+dataDesc
+    : ((WITH)? mergeType)? DATA INFILE LEFT_PAREN filePaths+=STRING_LITERAL (COMMA filePath+=STRING_LITERAL)* RIGHT_PAREN
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+        (LINES TERMINATED BY separator=STRING_LITERAL)?
+        (FORMAT AS format=identifier)?
+        (columns=identifierList)?
+        (columnsFromPath=colFromPath)?
+        (columnMapping=colMappingList)?
+        (preFilter=preFilterClause)?
+        (where=whereClause)?
+        (deleteOn=deleteOnClause)?
+        (sequenceColumn=sequenceColClause)?
+        (propertyClause)?
+    | ((WITH)? mergeType)? DATA FROM TABLE tableName=multipartIdentifier
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (columnMapping=colMappingList)?
+        (where=whereClause)?
+        (deleteOn=deleteOnClause)?
+        (propertyClause)?
+    ;

 // -----------------Command accessories-----------------
@@ -101,6 +136,36 @@ planType
     | ALL // default type
     ;

+mergeType
+    : APPEND
+    | DELETE
+    | MERGE
+    ;
+
+preFilterClause
+    : PRECEDING FILTER expression
+    ;
+
+deleteOnClause
+    : DELETE ON expression
+    ;
+
+sequenceColClause
+    : ORDER BY identifier
+    ;
+
+colFromPath
+    : COLUMNS FROM PATH AS identifierList
+    ;
+
+colMappingList
+    : SET LEFT_PAREN mappingSet+=mappingExpr (COMMA mappingSet+=mappingExpr)* RIGHT_PAREN
+    ;
+
+mappingExpr
+    : (mappingCol=identifier EQ expression)
+    ;
+
 withRemoteStorageSystem
     : WITH S3 LEFT_PAREN
         brokerProperties=propertyItemList
@@ -117,6 +182,25 @@ withRemoteStorageSystem
         RIGHT_PAREN)?
     ;

+resourceDesc
+    : WITH RESOURCE resourceName=identifierOrText (LEFT_PAREN propertyItemList RIGHT_PAREN)?
+    ;
+
+mysqlDataDesc
+    : DATA (LOCAL booleanValue)?
+        INFILE filePath=STRING_LITERAL
+        INTO TABLE tableName=multipartIdentifier
+        (PARTITION partition=identifierList)?
+        (COLUMNS TERMINATED BY comma=STRING_LITERAL)?
+        (LINES TERMINATED BY separator=STRING_LITERAL)?
+        (skipLines)?
+        (columns=identifierList)?
+        (colMappingList)?
+        (propertyClause)?
+    ;
+
+skipLines
+    : IGNORE lines=INTEGER_VALUE LINES
+    | IGNORE lines=INTEGER_VALUE ROWS
+    ;
+
 // -----------------Query-----------------
 // add queryOrganization for parse (q1) union (q2) union (q3) order by keys, otherwise 'order' will be recognized to be
 // identifier.
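As a concrete illustration of the grammar above, a bulk load statement matched by the new #load alternative has the following shape; the label, bucket path, and credential values are placeholders, modeled on BulkLoadDataDescTest further down:

    LOAD LABEL customer_example
    (
        DATA INFILE("s3://bucket/customer")
        INTO TABLE customer
        COLUMNS TERMINATED BY "|"
        (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
        SET (custkey = c_custkey + 1)
        PRECEDING FILTER c_nationkey = "CHINA"
        WHERE custkey > 100
        ORDER BY c_custkey
    )
    WITH S3
    (
        "s3.access_key" = "AK",
        "s3.secret_key" = "SK",
        "s3.endpoint" = "cos.ap-beijing.myqcloud.com",
        "s3.region" = "ap-beijing"
    )
    PROPERTIES ("exec_mem_limit" = "8589934592")
    COMMENT "test";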
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 0845d593e2..ecd0c2f4fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -49,6 +49,8 @@ public class FeConstants {
     // set to true to skip some step when running FE unit test
     public static boolean runningUnitTest = false;
+    // used to set some mocked values for FE unit test
+    public static Object unitTestConstant = null;

     // set to false to disable internal schema db
     public static boolean enableInternalSchemaDb = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index a9b50c8bf1..75d7c788c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -20,13 +20,13 @@ package org.apache.doris.nereids.parser;
 import org.apache.doris.analysis.ArithmeticExpr.Operator;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StorageBackend;
-import org.apache.doris.analysis.StorageBackend.StorageType;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.nereids.DorisParser;
 import org.apache.doris.nereids.DorisParser.AggClauseContext;
 import org.apache.doris.nereids.DorisParser.AliasQueryContext;
@@ -291,7 +291,10 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
 import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue;
@@ -330,6 +333,7 @@ import org.apache.doris.nereids.types.StructField;
 import org.apache.doris.nereids.types.StructType;
 import org.apache.doris.nereids.types.coercion.CharacterType;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
 import org.apache.doris.policy.FilterType;
 import org.apache.doris.policy.PolicyTypeEnum;
 import org.apache.doris.qe.ConnectContext;
@@ -351,6 +355,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -464,6 +469,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {

     @Override
     public LogicalPlan visitExport(ExportContext ctx) {
+        // TODO: replace old class names like ExportStmt, BrokerDesc, Expr with the new Nereids class names
         List<String> tableName = visitMultipartIdentifier(ctx.tableName);
         List<String> partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition);
@@ -515,7 +521,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         } else if (ctx.HDFS() != null) {
             brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.HDFS, brokerPropertiesMap);
         } else if (ctx.LOCAL() != null) {
-            brokerDesc = new BrokerDesc("HDFS", StorageType.LOCAL, brokerPropertiesMap);
+            brokerDesc = new BrokerDesc("HDFS", StorageBackend.StorageType.LOCAL, brokerPropertiesMap);
         } else if (ctx.BROKER() != null) {
             brokerDesc = new BrokerDesc(visitIdentifierOrText(ctx.brokerName), brokerPropertiesMap);
         }
@@ -541,6 +547,85 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         return logicalPlans;
     }
+    /**
+     * Visit load statements.
+     */
+    @Override
+    public LogicalPlan visitLoad(DorisParser.LoadContext ctx) {
+
+        BulkStorageDesc bulkDesc = null;
+        if (ctx.withRemoteStorageSystem() != null) {
+            Map<String, String> bulkProperties =
+                    new HashMap<>(visitPropertyItemList(ctx.withRemoteStorageSystem().brokerProperties));
+            if (ctx.withRemoteStorageSystem().S3() != null) {
+                bulkDesc = new BulkStorageDesc("S3", BulkStorageDesc.StorageType.S3, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().HDFS() != null) {
+                bulkDesc = new BulkStorageDesc("HDFS", BulkStorageDesc.StorageType.HDFS, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().LOCAL() != null) {
+                bulkDesc = new BulkStorageDesc("LOCAL_HDFS", BulkStorageDesc.StorageType.LOCAL, bulkProperties);
+            } else if (ctx.withRemoteStorageSystem().BROKER() != null
+                    && ctx.withRemoteStorageSystem().identifierOrText().getText() != null) {
+                bulkDesc = new BulkStorageDesc(ctx.withRemoteStorageSystem().identifierOrText().getText(),
+                        bulkProperties);
+            }
+        }
+        ImmutableList.Builder<BulkLoadDataDesc> dataDescriptions = new ImmutableList.Builder<>();
+        for (DorisParser.DataDescContext ddc : ctx.dataDescs) {
+            List<String> tableName = RelationUtil.getQualifierName(ConnectContext.get(),
+                    visitMultipartIdentifier(ddc.tableName));
+            List<String> colNames = (ddc.columns == null ? ImmutableList.of() : visitIdentifierList(ddc.columns));
+            List<String> columnsFromPath = (ddc.columnsFromPath == null ? ImmutableList.of()
+                    : visitIdentifierList(ddc.columnsFromPath.identifierList()));
+            List<String> partitions = ddc.partition == null ? ImmutableList.of() : visitIdentifierList(ddc.partition);
+            // TODO: multi location
+            List<String> multiFilePaths = new ArrayList<>();
+            for (Token filePath : ddc.filePaths) {
+                multiFilePaths.add(filePath.getText().substring(1, filePath.getText().length() - 1));
+            }
+            List<String> filePaths = ddc.filePath == null ? ImmutableList.of() : multiFilePaths;
+            Map<String, Expression> colMappings;
+            if (ddc.columnMapping == null) {
+                colMappings = ImmutableMap.of();
+            } else {
+                colMappings = new HashMap<>();
+                for (DorisParser.MappingExprContext mappingExpr : ddc.columnMapping.mappingSet) {
+                    colMappings.put(mappingExpr.mappingCol.getText(), getExpression(mappingExpr.expression()));
+                }
+            }
+
+            LoadTask.MergeType mergeType = ddc.mergeType() == null ? LoadTask.MergeType.APPEND
+                    : LoadTask.MergeType.valueOf(ddc.mergeType().getText());
+
+            Optional<String> fileFormat = ddc.format == null ? Optional.empty() : Optional.of(ddc.format.getText());
+            Optional<String> separator = ddc.separator == null ? Optional.empty() : Optional.of(ddc.separator.getText()
+                    .substring(1, ddc.separator.getText().length() - 1));
+            Optional<String> comma = ddc.comma == null ? Optional.empty() : Optional.of(ddc.comma.getText()
+                    .substring(1, ddc.comma.getText().length() - 1));
+            Map<String, String> dataProperties = ddc.propertyClause() == null ? new HashMap<>()
+                    : visitPropertyClause(ddc.propertyClause());
+            dataDescriptions.add(new BulkLoadDataDesc(
+                    tableName,
+                    partitions,
+                    filePaths,
+                    colNames,
+                    columnsFromPath,
+                    colMappings,
+                    new BulkLoadDataDesc.FileFormatDesc(separator, comma, fileFormat),
+                    false,
+                    ddc.preFilter == null ? Optional.empty() : Optional.of(getExpression(ddc.preFilter.expression())),
+                    ddc.where == null ? Optional.empty() : Optional.of(getExpression(ddc.where.booleanExpression())),
+                    mergeType,
+                    ddc.deleteOn == null ? Optional.empty() : Optional.of(getExpression(ddc.deleteOn.expression())),
+                    ddc.sequenceColumn == null ? Optional.empty()
+                            : Optional.of(ddc.sequenceColumn.identifier().getText()), dataProperties));
+        }
+        String labelName = ctx.lableName.getText();
+        Map<String, String> properties = visitPropertyItemList(ctx.properties);
+        String commentSpec = ctx.commentSpec() == null ? "" : ctx.commentSpec().STRING_LITERAL().getText();
+        String comment = escapeBackSlash(commentSpec.substring(1, commentSpec.length() - 1));
+        return new LoadCommand(labelName, dataDescriptions.build(), bulkDesc, properties, comment);
+    }
+
     /* ********************************************************************************************
      * Plan parsing
      * ******************************************************************************************** */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 31c3641620..c1ed61ce1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -119,8 +119,9 @@ public enum PlanType {
     CREATE_TABLE_COMMAND,
     DELETE_COMMAND,
     EXPLAIN_COMMAND,
+    EXPORT_COMMAND,
     INSERT_INTO_TABLE_COMMAND,
+    LOAD_COMMAND,
     SELECT_INTO_OUTFILE_COMMAND,
-    UPDATE_COMMAND,
-    EXPORT_COMMAND
+    UPDATE_COMMAND
 }
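The LoadCommand added below rewrites each data description of such a LOAD statement into an insert over a table-valued function. A rough sketch of the conversion target, reusing the example statement above; the s3() TVF property names here are illustrative and are actually resolved from BulkStorageDesc and the format descriptor in getTvfProperties():

    INSERT INTO customer (custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
    SELECT c_custkey + 1, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
    FROM s3(
        "uri" = "s3://bucket/customer",
        "s3.access_key" = "AK",
        "s3.secret_key" = "SK",
        "format" = "csv",
        "column_separator" = "|"
    )
    WHERE c_custkey + 1 > 100;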
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
new file mode 100644
index 0000000000..f02c5bd2d8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -0,0 +1,495 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.NereidsException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.analyzer.UnboundAlias;
+import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.If;
+import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryStateException;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.HdfsTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * load OLAP table data from an external bulk file
+ */
+public class LoadCommand extends Command implements ForwardWithSync {
+
+    public static final Logger LOG = LogManager.getLogger(LoadCommand.class);
+
+    private final String labelName;
+    private final BulkStorageDesc bulkStorageDesc;
+    private final List<BulkLoadDataDesc> sourceInfos;
+    private final Map<String, String> properties;
+    private final String comment;
+    private final List<LogicalPlan> plans = new ArrayList<>();
+    private Profile profile;
+
+    /**
+     * constructor of LoadCommand
+     */
+    public LoadCommand(String labelName, List<BulkLoadDataDesc> sourceInfos, BulkStorageDesc bulkStorageDesc,
+                       Map<String, String> properties, String comment) {
+        super(PlanType.LOAD_COMMAND);
+        this.labelName = Objects.requireNonNull(labelName.trim(), "labelName should not be null");
+        this.sourceInfos = Objects.requireNonNull(ImmutableList.copyOf(sourceInfos), "sourceInfos should not be null");
+        this.properties = Objects.requireNonNull(ImmutableMap.copyOf(properties), "properties should not be null");
+        this.bulkStorageDesc = Objects.requireNonNull(bulkStorageDesc, "bulkStorageDesc should not be null");
+        this.comment = Objects.requireNonNull(comment, "comment should not be null");
+    }
+
+    /**
+     * for test print
+     *
+     * @param ctx context
+     * @return parsed insert into plan
+     */
+    @VisibleForTesting
+    public List<LogicalPlan> parseToInsertIntoPlan(ConnectContext ctx) throws AnalysisException {
+        List<LogicalPlan> plans = new ArrayList<>();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(completeQueryPlan(ctx, dataDesc));
+        }
+        return plans;
+    }
+
+    @Override
+    public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        // TODO: begin txn from multi insert sql
+        /* this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile);
+        profile.getSummaryProfile().setQueryBeginTime();
+        for (BulkLoadDataDesc dataDesc : sourceInfos) {
+            plans.add(new InsertIntoTableCommand(completeQueryPlan(ctx, dataDesc), Optional.of(labelName), false));
+        }
+        profile.getSummaryProfile().setQueryPlanFinishTime();
+        executeInsertStmtPlan(ctx, executor, plans); */
+        throw new AnalysisException("Fall back to the legacy planner temporarily.");
+    }
+
+    private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataDesc)
+            throws AnalysisException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("nereids load stmt before conversion: {}", dataDesc::toSql);
+        }
+        // 1. build source projects plan (select col1, col2... from tvf where prefilter)
+        Map<String, String> tvfProperties = getTvfProperties(dataDesc, bulkStorageDesc);
+        LogicalPlan tvfLogicalPlan = new LogicalCheckPolicy<>(getUnboundTVFRelation(tvfProperties));
+        tvfLogicalPlan = buildTvfQueryPlan(dataDesc, tvfProperties, tvfLogicalPlan);
+
+        if (!(tvfLogicalPlan instanceof LogicalProject)) {
+            throw new AnalysisException("Fail to build TVF query, TVF query should be LogicalProject");
+        }
+        List<NamedExpression> tvfProjects = ((LogicalProject<?>) tvfLogicalPlan).getProjects();
+        // tvfProjects may be '*' or 'col1,col2,...'
+        if (tvfProjects.isEmpty()) {
+            throw new AnalysisException("Fail to build TVF query, parsed TVF select list requires not null");
+        }
+        boolean scanAllTvfCol = (tvfProjects.get(0) instanceof UnboundStar);
+
+        OlapTable olapTable = getOlapTable(ctx, dataDesc);
+        List<Column> olapSchema = olapTable.getBaseSchema();
+        // map column index to mapping expr
+        Map<String, Expression> mappingExpressions = dataDesc.getColumnMappings();
+        // 2. build sink where
+        Set<Expression> conjuncts = new HashSet<>();
+        if (dataDesc.getWhereExpr().isPresent()) {
+            Set<Expression> whereParts = ExpressionUtils.extractConjunctionToSet(dataDesc.getWhereExpr().get());
+            for (Expression wherePart : whereParts) {
+                if (!(wherePart instanceof ComparisonPredicate)) {
+                    throw new AnalysisException("WHERE clause must be comparison expression");
+                }
+                ComparisonPredicate comparison = ((ComparisonPredicate) wherePart);
+                if (!(comparison.left() instanceof UnboundSlot)) {
+                    throw new AnalysisException("Invalid predicate column " + comparison.left().toSql());
+                }
+                conjuncts.add(comparison.rewriteUp(e -> {
+                    if (!(e instanceof UnboundSlot)) {
+                        return e;
+                    }
+                    UnboundSlot slot = (UnboundSlot) e;
+                    String colName = getUnquotedName(slot);
+                    return mappingExpressions.getOrDefault(colName, e);
+                }));
+            }
+        }
+
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties) && !conjuncts.isEmpty()) {
+            throw new AnalysisException("Required property 'csv_schema' for csv file, "
+                    + "when no column list specified and use WHERE");
+        }
+        tvfLogicalPlan = new LogicalFilter<>(conjuncts, tvfLogicalPlan);
+
+        // 3. build sink project
+        List<String> sinkCols = new ArrayList<>();
+        List<NamedExpression> selectLists = new ArrayList<>();
+        List<String> olapColumns = olapSchema.stream().map(Column::getDisplayName).collect(Collectors.toList());
+        if (!scanAllTvfCol) {
+            int numSinkCol = Math.min(tvfProjects.size(), olapColumns.size());
+            // if not scanning all tvf columns, try to treat each tvfColumn as an olapColumn
+            for (int i = 0; i < numSinkCol; i++) {
+                UnboundSlot sourceCol = (UnboundSlot) tvfProjects.get(i);
+                // check sourceCol is a slot and check olapColumn is not beyond the index.
+                String olapColumn = olapColumns.get(i);
+                fillSinkBySourceCols(mappingExpressions, olapColumn,
+                        sourceCol, sinkCols, selectLists);
+            }
+            fillDeleteOnColumn(dataDesc, olapTable, sinkCols, selectLists, Column.DELETE_SIGN);
+        } else {
+            for (String olapColumn : olapColumns) {
+                if (olapColumn.equalsIgnoreCase(Column.VERSION_COL)
+                        || olapColumn.equalsIgnoreCase(Column.SEQUENCE_COL)) {
+                    continue;
+                }
+                if (olapColumn.equalsIgnoreCase(Column.DELETE_SIGN)) {
+                    fillDeleteOnColumn(dataDesc, olapTable, sinkCols, selectLists, olapColumn);
+                    continue;
+                }
+                fillSinkBySourceCols(mappingExpressions, olapColumn, new UnboundSlot(olapColumn),
+                        sinkCols, selectLists);
+            }
+        }
+        if (sinkCols.isEmpty() && selectLists.isEmpty()) {
+            // build 'insert into tgt_tbl select * from src_tbl'
+            selectLists.add(new UnboundStar(new ArrayList<>()));
+        }
+        for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+            sinkCols.add(columnFromPath);
+            // columnFromPath will be parsed by BE, put columns as placeholder.
+            selectLists.add(new UnboundSlot(columnFromPath));
+        }
+
+        tvfLogicalPlan = new LogicalProject<>(selectLists, tvfLogicalPlan);
+        checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists);
+        boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
+                && sinkCols.size() < olapTable.getColumns().size();
+        return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
+                dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
+    }
+
+    private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc, OlapTable olapTable,
+                                           List<String> sinkCols,
+                                           List<NamedExpression> selectLists,
+                                           String olapColumn) throws AnalysisException {
+        if (olapTable.hasDeleteSign() && dataDesc.getDeleteCondition().isPresent()) {
+            checkDeleteOnConditions(dataDesc.getMergeType(), dataDesc.getDeleteCondition().get());
+            Optional<If> deleteIf = createDeleteOnIfCall(olapTable, olapColumn, dataDesc);
+            if (deleteIf.isPresent()) {
+                sinkCols.add(olapColumn);
+                selectLists.add(new UnboundAlias(deleteIf.get(), olapColumn));
+            }
+            sinkCols.add(olapColumn);
+        }
+    }
+
+    /**
+     * used to get the unquoted column name
+     * @return unquoted slot name
+     */
+    public static String getUnquotedName(NamedExpression slot) {
+        if (slot instanceof UnboundAlias) {
+            return slot.getName();
+        } else if (slot instanceof UnboundSlot) {
+            List<String> slotNameParts = ((UnboundSlot) slot).getNameParts();
+            return slotNameParts.get(slotNameParts.size() - 1);
+        }
+        return slot.getName();
+    }
+
+    private static void fillSinkBySourceCols(Map<String, Expression> mappingExpressions,
+                                             String olapColumn, UnboundSlot tvfColumn,
+                                             List<String> sinkCols, List<NamedExpression> selectLists) {
+        sinkCols.add(olapColumn);
+        if (mappingExpressions.containsKey(olapColumn)) {
+            selectLists.add(new UnboundAlias(mappingExpressions.get(olapColumn), olapColumn));
+        } else {
+            selectLists.add(new UnboundAlias(tvfColumn, olapColumn));
+        }
+    }
+
+    private static boolean isCsvType(Map<String, String> tvfProperties) {
+        return tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
+    }
+
+    /**
+     * fill all columns that need to be loaded into sinkCols.
+     * fill the map with sink columns and generated source columns:
+     * sink columns are used for 'INSERT INTO';
+     * generated source columns are used for 'SELECT'.
+     *
+     * @param dataDesc dataDesc
+     * @param tvfProperties generated tvfProperties
+     * @param tvfLogicalPlan source tvf relation
+     */
+    private static LogicalPlan buildTvfQueryPlan(BulkLoadDataDesc dataDesc,
+                                                 Map<String, String> tvfProperties,
+                                                 LogicalPlan tvfLogicalPlan) throws AnalysisException {
+        // build tvf column filter
+        if (dataDesc.getPrecedingFilterExpr().isPresent()) {
+            Set<Expression> preConjuncts =
+                    ExpressionUtils.extractConjunctionToSet(dataDesc.getPrecedingFilterExpr().get());
+            if (!preConjuncts.isEmpty()) {
+                tvfLogicalPlan = new LogicalFilter<>(preConjuncts, tvfLogicalPlan);
+            }
+        }
+
+        Map<String, String> sourceProperties = dataDesc.getProperties();
+        if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties)) {
+            String csvSchemaStr = sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA);
+            if (csvSchemaStr != null) {
+                tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, csvSchemaStr);
+                List<Column> csvSchema = new ArrayList<>();
+                ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, sourceProperties);
+                List<NamedExpression> csvColumns = new ArrayList<>();
+                for (Column csvColumn : csvSchema) {
+                    csvColumns.add(new UnboundSlot(csvColumn.getName()));
+                }
+                if (!csvColumns.isEmpty()) {
+                    for (String columnFromPath : dataDesc.getColumnsFromPath()) {
+                        csvColumns.add(new UnboundSlot(columnFromPath));
+                    }
+                    return new LogicalProject<>(csvColumns, tvfLogicalPlan);
+                }
+                if (!dataDesc.getPrecedingFilterExpr().isPresent()) {
+                    throw new AnalysisException("Required property 'csv_schema' for csv file, "
+                            + "when no column list specified and use PRECEDING FILTER");
+                }
+            }
+            return getStarProjectPlan(tvfLogicalPlan);
+        }
+        List<NamedExpression> dataDescColumns = new ArrayList<>();
+        for (int i = 0; i < dataDesc.getFileFieldNames().size(); i++) {
+            String sourceColumn = dataDesc.getFileFieldNames().get(i);
+            dataDescColumns.add(new UnboundSlot(sourceColumn));
+        }
+        if (dataDescColumns.isEmpty()) {
+            return getStarProjectPlan(tvfLogicalPlan);
+        } else {
+            return new LogicalProject<>(dataDescColumns, tvfLogicalPlan);
+        }
+    }
+
+    private static LogicalProject<LogicalPlan> getStarProjectPlan(LogicalPlan logicalPlan) {
+        return new LogicalProject<>(ImmutableList.of(new UnboundStar(new ArrayList<>())), logicalPlan);
+    }
+
+    private static Optional<If> createDeleteOnIfCall(OlapTable olapTable, String olapColName,
+                                                     BulkLoadDataDesc dataDesc) throws AnalysisException {
+        if (olapTable.hasDeleteSign()
+                && dataDesc.getDeleteCondition().isPresent()) {
+            if (!(dataDesc.getDeleteCondition().get() instanceof ComparisonPredicate)) {
+                throw new AnalysisException("DELETE ON clause must be comparison expression.");
+            }
+            ComparisonPredicate deleteOn = (ComparisonPredicate) dataDesc.getDeleteCondition().get();
+            Expression deleteOnCol = deleteOn.left();
+            if (!(deleteOnCol instanceof UnboundSlot)) {
+                throw new AnalysisException("DELETE ON column must be an undecorated OLAP column.");
+            }
+            if (!olapColName.equalsIgnoreCase(getUnquotedName((UnboundSlot) deleteOnCol))) {
+                return Optional.empty();
+            }
+            If deleteIf = new If(deleteOn, new TinyIntLiteral((byte) 1), new TinyIntLiteral((byte) 0));
+            return Optional.of(deleteIf);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private static void checkDeleteOnConditions(LoadTask.MergeType mergeType, Expression deleteCondition)
+            throws AnalysisException {
+        if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
+            throw new AnalysisException(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON);
+        }
+        if (mergeType == LoadTask.MergeType.MERGE && deleteCondition == null) {
+            throw new AnalysisException(BulkLoadDataDesc.EXPECT_DELETE_ON);
+        }
+    }
+
+    private static void checkAndAddSequenceCol(OlapTable olapTable, BulkLoadDataDesc dataDesc,
+                                               List<String> sinkCols, List<NamedExpression> selectLists)
+            throws AnalysisException {
+        Optional<String> optSequenceCol = dataDesc.getSequenceCol();
+        if (!optSequenceCol.isPresent() && !olapTable.hasSequenceCol()) {
+            return;
+        }
+        // check olapTable schema and sequenceCol
+        if (olapTable.hasSequenceCol() && !optSequenceCol.isPresent()) {
+            throw new AnalysisException("Table " + olapTable.getName()
+                    + " has sequence column, need to specify the sequence column");
+        }
+        if (optSequenceCol.isPresent() && !olapTable.hasSequenceCol()) {
+            throw new AnalysisException("There is no sequence column in the table " + olapTable.getName());
+        }
+        String sequenceCol = dataDesc.getSequenceCol().get();
+        // check source sequence column is in parsedColumnExprList or Table base schema
+        boolean hasSourceSequenceCol = false;
+        if (!sinkCols.isEmpty()) {
+            List<String> allCols = new ArrayList<>(dataDesc.getFileFieldNames());
+            allCols.addAll(sinkCols);
+            for (String sinkCol : allCols) {
+                if (sinkCol.equals(sequenceCol)) {
+                    hasSourceSequenceCol = true;
+                    break;
+                }
+            }
+        }
+        List<Column> columns = olapTable.getBaseSchema();
+        for (Column column : columns) {
+            if (column.getName().equals(sequenceCol)) {
+                hasSourceSequenceCol = true;
+                break;
+            }
+        }
+        if (!hasSourceSequenceCol) {
+            throw new AnalysisException("There is no sequence column " + sequenceCol + " in the " + olapTable.getName()
+                    + " or the COLUMNS and SET clause");
+        } else {
+            sinkCols.add(Column.SEQUENCE_COL);
+            selectLists.add(new UnboundAlias(new UnboundSlot(sequenceCol), Column.SEQUENCE_COL));
+        }
+    }
+
+    private UnboundTVFRelation getUnboundTVFRelation(Map<String, String> properties) {
+        UnboundTVFRelation relation;
+        if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
+            relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
+                    S3TableValuedFunction.NAME, new Properties(properties));
+        } else if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.HDFS) {
+            relation = new UnboundTVFRelation(StatementScopeIdGenerator.newRelationId(),
+                    HdfsTableValuedFunction.NAME, new Properties(properties));
+        } else {
+            throw new UnsupportedOperationException("Unsupported load storage type: "
+                    + bulkStorageDesc.getStorageType());
+        }
+        return relation;
+    }
+
+    private static OlapTable getOlapTable(ConnectContext ctx, BulkLoadDataDesc dataDesc) throws AnalysisException {
+        OlapTable targetTable;
+        TableIf table = RelationUtil.getTable(dataDesc.getNameParts(), ctx.getEnv());
+        if (!(table instanceof OlapTable)) {
+            throw new AnalysisException("table must be olapTable in load command");
+        }
+        targetTable = ((OlapTable) table);
+        return targetTable;
+    }
+
+    private static Map<String, String> getTvfProperties(BulkLoadDataDesc dataDesc, BulkStorageDesc bulkStorageDesc) {
+        Map<String, String> tvfProperties = new HashMap<>(bulkStorageDesc.getProperties());
+        String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv");
+        if ("csv".equalsIgnoreCase(fileFormat)) {
+            dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
+                    tvfProperties.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, sep.getSeparator()));
+            dataDesc.getFormatDesc().getLineDelimiter().ifPresent(sep ->
+                    tvfProperties.put(ExternalFileTableValuedFunction.LINE_DELIMITER, sep.getSeparator()));
+        }
+        // TODO: resolve and put ExternalFileTableValuedFunction params
+        tvfProperties.put(ExternalFileTableValuedFunction.FORMAT, fileFormat);
+
+        List<String> filePaths = dataDesc.getFilePaths();
+        // TODO: support multi location by union
+        String listFilePath = filePaths.get(0);
+        if (bulkStorageDesc.getStorageType() == BulkStorageDesc.StorageType.S3) {
+            S3Properties.convertToStdProperties(tvfProperties);
+            tvfProperties.keySet().removeIf(S3Properties.Env.FS_KEYS::contains);
+            // TODO: check file path by s3 fs list status
+            tvfProperties.put(S3TableValuedFunction.S3_URI, listFilePath);
+        }
+
+        final Map<String, String> dataDescProps = dataDesc.getProperties();
+        if (dataDescProps != null) {
+            tvfProperties.putAll(dataDescProps);
+        }
+        List<String> columnsFromPath = dataDesc.getColumnsFromPath();
+        if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+            tvfProperties.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
+                    String.join(",", columnsFromPath));
+        }
+        return tvfProperties;
+    }
+
+    private void executeInsertStmtPlan(ConnectContext ctx, StmtExecutor executor, List<InsertIntoTableCommand> plans) {
+        try {
+            for (LogicalPlan logicalPlan : plans) {
+                ((Command) logicalPlan).run(ctx, executor);
+            }
+        } catch (QueryStateException e) {
+            ctx.setState(e.getQueryState());
+            throw new NereidsException("Command process failed", new AnalysisException(e.getMessage(), e));
+        } catch (UserException e) {
+            // Return message to inform client what happened.
+            ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+            throw new NereidsException("Command process failed", new AnalysisException(e.getMessage(), e));
+        } catch (Exception e) {
+            // Maybe our bug
+            ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
+            throw new NereidsException("Command process failed.", new AnalysisException(e.getMessage(), e));
+        }
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLoadCommand(this, context);
+    }
+}
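One more grammar sketch before the data-description classes: per checkDeleteOnConditions above, a DELETE ON clause is only legal when the merge type is MERGE, and ORDER BY names the target table's sequence column. Paths and predicates below are illustrative:

    LOAD LABEL merge_example
    (
        WITH MERGE DATA INFILE("s3://bucket/customer")
        INTO TABLE customer
        COLUMNS TERMINATED BY "|"
        (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment)
        DELETE ON c_custkey = -1
        ORDER BY c_nationkey
    )
    WITH S3 ("s3.access_key" = "AK", "s3.secret_key" = "SK",
             "s3.endpoint" = "cos.ap-beijing.myqcloud.com", "s3.region" = "ap-beijing");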
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java
new file mode 100644
index 0000000000..e0365ca098
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkLoadDataDesc.java
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * used to describe the data info which needs to be imported.
+ * The transform of columns should be added after the keyword named COLUMNS.
+ * The transform after the keyword named SET is the old way, which only supports the hadoop function.
+ * The old way of transform will be removed gradually.
+ * data_desc:
+ *     DATA INFILE ('file_path', ...)
+ *     [NEGATIVE]
+ *     INTO TABLE tbl_name
+ *     [PARTITION (p1, p2)]
+ *     [COLUMNS TERMINATED BY separator]
+ *     [FORMAT AS format]
+ *     [(tmp_col1, tmp_col2, col3, ...)]
+ *     [COLUMNS FROM PATH AS (col1, ...)]
+ *     [SET (k1=f1(xx), k2=f2(xxx))]
+ *     [where_clause]
+ *
+ *     DATA FROM TABLE external_hive_tbl_name
+ *     [NEGATIVE]
+ *     INTO TABLE tbl_name
+ *     [PARTITION (p1, p2)]
+ *     [SET (k1=f1(xx), k2=f2(xxx))]
+ *     [where_clause]
+ */
+public class BulkLoadDataDesc {
+
+    public static final String EXPECT_MERGE_DELETE_ON = "DELETE ON clause is not supported when merge type is not MERGE.";
+    public static final String EXPECT_DELETE_ON = "Expected DELETE ON clause when merge type is MERGE.";
+    private static final Logger LOG = LogManager.getLogger(BulkLoadDataDesc.class);
+
+    private final List<String> nameParts;
+    private final String tableName;
+    private final String dbName;
+    private final List<String> partitionNames;
+    private final List<String> filePaths;
+    private final boolean isNegative;
+    // column names in the path
+    private final List<String> columnsFromPath;
+    // save column mapping in SET(xxx = xxx) clause
+    private final Map<String, Expression> columnMappings;
+    private final Optional<Expression> precedingFilterExpr;
+    private final Optional<Expression> whereExpr;
+    private final LoadTask.MergeType mergeType;
+    private final String srcTableName;
+    // column names of source files
+    private final List<String> fileFieldNames;
+    private final Optional<String> sequenceCol;
+    private final FileFormatDesc formatDesc;
+    // Merged from fileFieldNames, columnsFromPath and columnMappingList
+    // ImportColumnDesc: column name to (expr or null)
+    private final Optional<Expression> deleteCondition;
+    private final Map<String, String> dataProperties;
+    private boolean isMysqlLoad = false;
+
+    /**
+     * bulk load desc
+     */
+    public BulkLoadDataDesc(List<String> fullTableName,
+                            List<String> partitionNames,
+                            List<String> filePaths,
+                            List<String> columns,
+                            List<String> columnsFromPath,
+                            Map<String, Expression> columnMappings,
+                            FileFormatDesc formatDesc,
+                            boolean isNegative,
+                            Optional<Expression> fileFilterExpr,
+                            Optional<Expression> whereExpr,
+                            LoadTask.MergeType mergeType,
+                            Optional<Expression> deleteCondition,
+                            Optional<String> sequenceColName,
+                            Map<String, String> dataProperties) {
+        this.nameParts = Objects.requireNonNull(fullTableName, "nameParts should not be null");
+        this.dbName = Objects.requireNonNull(fullTableName.get(1), "dbName should not be null");
+        this.tableName = Objects.requireNonNull(fullTableName.get(2), "tableName should not be null");
+        this.partitionNames = Objects.requireNonNull(partitionNames, "partitionNames should not be null");
+        this.filePaths = Objects.requireNonNull(filePaths, "filePaths should not be null");
+        this.formatDesc = Objects.requireNonNull(formatDesc, "formatDesc should not be null");
+        this.fileFieldNames = columnsNameToLowerCase(Objects.requireNonNull(columns, "columns should not be null"));
+        this.columnsFromPath = columnsNameToLowerCase(columnsFromPath);
+        this.isNegative = isNegative;
+        this.columnMappings = columnMappings;
+        this.precedingFilterExpr = fileFilterExpr;
+        this.whereExpr = whereExpr;
+        this.mergeType = mergeType;
+        // maybe from tvf or table
+        this.srcTableName = null;
+        this.deleteCondition = deleteCondition;
+        this.sequenceCol = sequenceColName;
+        this.dataProperties = dataProperties;
+    }
+
+    /**
+     * bulk load file format desc
+     */
+    public static class FileFormatDesc {
+        private final Separator lineDelimiter;
+        private final Separator columnSeparator;
+        private final String fileFormat;
+
+        public FileFormatDesc(Optional<String> fileFormat) {
+            this(Optional.empty(), Optional.empty(), fileFormat);
+        }
+
+        public FileFormatDesc(Optional<String> lineDelimiter, Optional<String> columnSeparator) {
+            this(lineDelimiter, columnSeparator, Optional.empty());
+        }
+
+        /**
+         * build bulk load format desc and check validity
+         * @param lineDelimiter text format line delimiter
+         * @param columnSeparator text format column separator
+         * @param fileFormat file format
+         */
+        public FileFormatDesc(Optional<String> lineDelimiter, Optional<String> columnSeparator,
+                              Optional<String> fileFormat) {
+            this.lineDelimiter = new Separator(lineDelimiter.orElse(null));
+            this.columnSeparator = new Separator(columnSeparator.orElse(null));
+            try {
+                if (!StringUtils.isEmpty(this.lineDelimiter.getOriSeparator())) {
+                    this.lineDelimiter.analyze();
+                }
+                if (!StringUtils.isEmpty(this.columnSeparator.getOriSeparator())) {
+                    this.columnSeparator.analyze();
+                }
+            } catch (AnalysisException e) {
+                throw new RuntimeException("Fail to parse separator.", e);
+            }
+            this.fileFormat = fileFormat.orElse(null);
+        }
+
+        public Optional<Separator> getLineDelimiter() {
+            if (lineDelimiter == null || lineDelimiter.getOriSeparator() == null) {
+                return Optional.empty();
+            }
+            return Optional.of(lineDelimiter);
+        }
+
+        public Optional<Separator> getColumnSeparator() {
+            if (columnSeparator == null || columnSeparator.getOriSeparator() == null) {
+                return Optional.empty();
+            }
+            return Optional.of(columnSeparator);
+        }
+
+        public Optional<String> getFileFormat() {
+            return Optional.ofNullable(fileFormat);
+        }
+    }
+
+    public List<String> getNameParts() {
+        return nameParts;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public List<String> getPartitionNames() {
+        return partitionNames;
+    }
+
+    public FileFormatDesc getFormatDesc() {
+        return formatDesc;
+    }
+
+    public List<String> getFilePaths() {
+        return filePaths;
+    }
+
+    public List<String> getColumnsFromPath() {
+        return columnsFromPath;
+    }
+
+    public Map<String, Expression> getColumnMappings() {
+        return columnMappings;
+    }
+
+    public Optional<Expression> getPrecedingFilterExpr() {
+        return precedingFilterExpr;
+    }
+
+    public Optional<Expression> getWhereExpr() {
+        return whereExpr;
+    }
+
+    public List<String> getFileFieldNames() {
+        return fileFieldNames;
+    }
+
+    public Optional<String> getSequenceCol() {
+        if (sequenceCol.isPresent() && StringUtils.isBlank(sequenceCol.get())) {
+            return Optional.empty();
+        }
+        return sequenceCol;
+    }
+
+    public Optional<Expression> getDeleteCondition() {
+        return deleteCondition;
+    }
+
+    public LoadTask.MergeType getMergeType() {
+        return mergeType;
+    }
+
+    public Map<String, String> getProperties() {
+        return dataProperties;
+    }
+
+    // Change all the column names to lower case, because Doris columns are case-insensitive.
+    private List<String> columnsNameToLowerCase(List<String> columns) {
+        if (columns == null || columns.isEmpty() || "json".equals(this.formatDesc.fileFormat)) {
+            return columns;
+        }
+        List<String> lowerCaseColumns = new ArrayList<>();
+        for (int i = 0; i < columns.size(); i++) {
+            String column = columns.get(i);
+            lowerCaseColumns.add(i, column.toLowerCase());
+        }
+        return lowerCaseColumns;
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    /**
+     * print data desc load info
+     * @return bulk load sql
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (isMysqlLoad) {
+            sb.append("DATA ").append(isClientLocal() ? "LOCAL " : "");
+            sb.append("INFILE '").append(filePaths.get(0)).append("'");
+        } else if (isLoadFromTable()) {
+            sb.append(mergeType.toString());
+            sb.append(" DATA FROM TABLE ").append(srcTableName);
+        } else {
+            sb.append(mergeType.toString());
+            sb.append(" DATA INFILE (");
+            Joiner.on(", ").appendTo(sb, filePaths.stream()
+                    .map(s -> "'" + s + "'").collect(Collectors.toList())).append(")");
+        }
+        if (isNegative) {
+            sb.append(" NEGATIVE");
+        }
+        sb.append(" INTO TABLE ");
+        sb.append(isMysqlLoad ? ClusterNamespace.getNameFromFullName(dbName) + "." + tableName : tableName);
+        if (partitionNames != null && !partitionNames.isEmpty()) {
+            sb.append(" (");
+            Joiner.on(", ").appendTo(sb, partitionNames).append(")");
+        }
+        if (formatDesc.columnSeparator != null) {
+            sb.append(" COLUMNS TERMINATED BY ").append(formatDesc.columnSeparator.toSql());
+        }
+        if (formatDesc.lineDelimiter != null && isMysqlLoad) {
+            sb.append(" LINES TERMINATED BY ").append(formatDesc.lineDelimiter.toSql());
+        }
+        if (formatDesc.fileFormat != null && !formatDesc.fileFormat.isEmpty()) {
+            sb.append(" FORMAT AS '" + formatDesc.fileFormat + "'");
+        }
+        if (fileFieldNames != null && !fileFieldNames.isEmpty()) {
+            sb.append(" (");
+            Joiner.on(", ").appendTo(sb, fileFieldNames).append(")");
+        }
+        if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
+            sb.append(" COLUMNS FROM PATH AS (");
+            Joiner.on(", ").appendTo(sb, columnsFromPath).append(")");
+        }
+        if (columnMappings != null && !columnMappings.isEmpty()) {
+            sb.append(" SET (");
+            Joiner.on(", ").appendTo(sb, columnMappings.entrySet().stream()
+                    .map(e -> e.getKey() + "=" + e.getValue().toSql()).collect(Collectors.toList())).append(")");
+        }
+        whereExpr.ifPresent(e -> sb.append(" WHERE ").append(e.toSql()));
+        deleteCondition.ifPresent(e -> {
+            if (mergeType == LoadTask.MergeType.MERGE) {
+                sb.append(" DELETE ON ").append(e.toSql());
+            }
+        });
+        return sb.toString();
+    }
+
+    private boolean isLoadFromTable() {
+        return false;
+    }
+
+    private boolean isClientLocal() {
+        return false;
+    }
+}
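BulkStorageDesc below models the storage clause variants of withRemoteStorageSystem; its toSql() renders either the named-broker form or the storage-type form, for example (the broker name and properties are placeholders):

    WITH BROKER my_broker ("username" = "user0", "password" = "password0")

    WITH S3 ("s3.access_key" = "AK", "s3.secret_key" = "SK")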
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java
new file mode 100644
index 0000000000..38543c069d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/BulkStorageDesc.java
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.datasource.property.S3ClientBEProperties;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Broker descriptor
+ * Broker example:
+ * WITH S3/HDFS
+ * (
+ *   "username" = "user0",
+ *   "password" = "password0"
+ * )
+ */
+public class BulkStorageDesc implements Writable {
+    @SerializedName(value = "storageType")
+    protected StorageType storageType;
+    @SerializedName(value = "properties")
+    protected Map<String, String> properties;
+    @SerializedName(value = "name")
+    private String name;
+
+    /**
+     * Bulk Storage Type
+     */
+    public enum StorageType {
+        BROKER,
+        S3,
+        HDFS,
+        LOCAL;
+    }
+
+    /**
+     * BulkStorageDesc
+     * @param name bulk load name
+     * @param properties properties
+     */
+    public BulkStorageDesc(String name, Map<String, String> properties) {
+        this(name, StorageType.BROKER, properties);
+    }
+
+    /**
+     * BulkStorageDesc
+     * @param name bulk load name
+     * @param type bulk load type
+     * @param properties properties
+     */
+    public BulkStorageDesc(String name, StorageType type, Map<String, String> properties) {
+        this.name = name;
+        this.properties = properties;
+        if (this.properties == null) {
+            this.properties = Maps.newHashMap();
+        }
+        this.storageType = type;
+        this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
+    }
+
+    public StorageType getStorageType() {
+        return storageType;
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static BulkStorageDesc read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, BulkStorageDesc.class);
+    }
+
+    /**
+     * bulk load to sql string
+     * @return bulk load sql
+     */
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        if (storageType == StorageType.BROKER) {
+            sb.append("WITH BROKER ").append(name);
+        } else {
+            sb.append("WITH ").append(storageType.name());
+        }
+        if (properties != null && !properties.isEmpty()) {
+            PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
+            sb.append(" (").append(printableMap).append(")");
+        }
+        return sb.toString();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index e91e846e2f..c055da5735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
 import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
 import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;

 /** CommandVisitor. */
@@ -52,6 +53,10 @@ public interface CommandVisitor<R, C> {
         return visitCommand(deleteCommand, context);
     }

+    default R visitLoadCommand(LoadCommand loadCommand, C context) {
+        return visitCommand(loadCommand, context);
+    }
+
     default R visitExportCommand(ExportCommand exportCommand, C context) {
         return visitCommand(exportCommand, context);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 4eeadffd09..6acc276a25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -103,7 +103,7 @@ public abstract class ExternalFileTableValuedFunctio
     protected static final String FUZZY_PARSE = "fuzzy_parse";
     protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
     protected static final String SKIP_LINES = "skip_lines";
-    protected static final String CSV_SCHEMA = "csv_schema";
+    public static final String CSV_SCHEMA = "csv_schema";
     protected static final String COMPRESS_TYPE = "compress_type";
     public static final String PATH_PARTITION_KEYS = "path_partition_keys";
     // decimal(p,s)
@@ -380,12 +380,16 @@ public abstract class ExternalFileTableValuedFunctio
     @Override
     public List<Column> getTableColumns() throws AnalysisException {
-        if (FeConstants.runningUnitTest) {
-            return Lists.newArrayList();
-        }
         if (!csvSchema.isEmpty()) {
             return csvSchema;
         }
+        if (FeConstants.runningUnitTest) {
+            Object mockedUtObj = FeConstants.unitTestConstant;
+            if (mockedUtObj instanceof List) {
+                return ((List<Column>) mockedUtObj);
+            }
+            return new ArrayList<>();
+        }
         if (this.columns != null) {
             return columns;
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java
new file mode 100644
index 0000000000..e5a3e66932
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/BulkLoadDataDescTest.java
@@ -0,0 +1,658 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.analyzer.UnboundAlias; +import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; +import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.analyzer.UnboundTVFRelation; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.stats.ExpressionEstimation; +import org.apache.doris.nereids.trees.expressions.Add; +import org.apache.doris.nereids.trees.expressions.BinaryArithmetic; +import org.apache.doris.nereids.trees.expressions.CaseWhen; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.WhenClause; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; +import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; +import org.apache.doris.nereids.trees.plans.commands.LoadCommand; +import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; +import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.Statistics; +import org.apache.doris.utframe.TestWithFeService; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class BulkLoadDataDescTest extends TestWithFeService { + + private List<String> sinkCols1 = new ArrayList<>(); + private List<String> sinkCols2 = new ArrayList<>(); + + @Override + protected void runBeforeAll() throws Exception { + connectContext.getState().setNereids(true); + connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false; + connectContext.getSessionVariable().enableNereidsTimeout = false; + connectContext.getSessionVariable().enableNereidsDML = true; + FeConstants.runningUnitTest = true; + + createDatabase("nereids_load"); + useDatabase("nereids_load"); + String createTableSql = "CREATE TABLE `customer` (\n" + + " `custkey` int(11) NOT NULL,\n" + + " `c_name` varchar(25) NOT NULL,\n" + + " `c_address` varchar(40) NOT NULL,\n" + + " `c_nationkey` int(11) NOT NULL,\n" + + " `c_phone` varchar(15) NOT NULL,\n" + + " `c_acctbal` DECIMAL(15, 2) NOT NULL,\n" + + " `c_mktsegment` varchar(10) NOT NULL,\n" + + " `c_comment` varchar(117) NOT NULL\n" + + ") ENGINE=OLAP\n" + + "UNIQUE KEY(`custkey`)\n" + + "COMMENT 'OLAP'\n" + + "DISTRIBUTED BY HASH(`custkey`) BUCKETS 24\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"function_column.sequence_col\" = \"c_nationkey\"," + + "\"storage_format\" = 
\"V2\"\n" + + ");"; + createTable(createTableSql); + sinkCols1.add("custkey"); + sinkCols1.add("c_name"); + sinkCols1.add("c_address"); + sinkCols1.add("c_nationkey"); + sinkCols1.add("c_phone"); + sinkCols1.add("c_acctbal"); + sinkCols1.add("c_mktsegment"); + sinkCols1.add("c_comment"); + + String createTableSql2 = "CREATE TABLE `customer_dup` (\n" + + " `custkey` int(11) NOT NULL,\n" + + " `c_name` varchar(25) NOT NULL,\n" + + " `address` varchar(40) NOT NULL,\n" + + " `c_nationkey` int(11) NOT NULL,\n" + + " `c_phone` varchar(15) NOT NULL,\n" + + " `c_acctbal` DECIMAL(15, 2) NOT NULL,\n" + + " `c_mktsegment` varchar(10) NOT NULL,\n" + + " `c_comment` varchar(117) NOT NULL\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(`custkey`,`c_name`)\n" + + "COMMENT 'OLAP'\n" + + "DISTRIBUTED BY HASH(`custkey`) BUCKETS 24\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"storage_format\" = \"V2\"\n" + + ");"; + createTable(createTableSql2); + sinkCols2.add("custkey"); + sinkCols2.add("c_name"); + sinkCols2.add("address"); + sinkCols2.add("c_nationkey"); + sinkCols2.add("c_phone"); + sinkCols2.add("c_acctbal"); + sinkCols2.add("c_mktsegment"); + sinkCols2.add("c_comment"); + + } + + @Test + public void testParseLoadStmt() throws Exception { + String loadSql1 = "LOAD LABEL customer_j23( " + + " DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " COLUMNS TERMINATED BY \"|\"" + + " LINES TERMINATED BY \"\n\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " SET ( custkey=case when c_custkey=-8 then -3 when c_custkey=-1 then 11 else c_custkey end ) " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey > 100" + + " ORDER BY c_custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + + List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1); + Assertions.assertFalse(statements.isEmpty()); + + List<String> expectedSinkColumns = new ArrayList<>(sinkCols1); + expectedSinkColumns.add(Column.SEQUENCE_COL); + + CaseWhen caseWhen = new CaseWhen(new ArrayList<WhenClause>() { + { + add(new WhenClause( + new EqualTo( + new UnboundSlot("c_custkey"), + new TinyIntLiteral((byte) -8)), + new TinyIntLiteral((byte) -3))); + add(new WhenClause( + new EqualTo( + new UnboundSlot("c_custkey"), + new TinyIntLiteral((byte) -1)), + new TinyIntLiteral((byte) 11))); + } + }, new UnboundSlot("c_custkey")); + + List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() { + { + add(new UnboundAlias(caseWhen, "custkey")); + add(new UnboundAlias(new UnboundSlot("c_address"), "c_address")); + add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey")); + add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone")); + add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal")); + add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment")); + add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment")); + } + }; + List<Expression> expectedConjuncts = new ArrayList<Expression>() { + { + add(new GreaterThan(caseWhen, new IntegerLiteral(100))); + } + }; + assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true); + } + + private void 
assertInsertIntoPlan(List<Pair<LogicalPlan, StatementContext>> statements, + List<String> expectedSinkColumns, + List<NamedExpression> expectedProjects, + List<Expression> expectedConjuncts, + boolean expectedPreFilter) throws AnalysisException { + Assertions.assertTrue(statements.get(0).first instanceof LoadCommand); + List<LogicalPlan> plans = ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext); + Assertions.assertTrue(plans.get(0) instanceof UnboundOlapTableSink); + List<String> colNames = ((UnboundOlapTableSink<?>) plans.get(0)).getColNames(); + Assertions.assertEquals(colNames.size(), expectedSinkColumns.size()); + for (String sinkCol : expectedSinkColumns) { + Assertions.assertTrue(colNames.contains(sinkCol)); + } + Assertions.assertTrue(plans.get(0).child(0) instanceof LogicalProject); + LogicalProject<?> project = ((LogicalProject<?>) plans.get(0).child(0)); + Set<String> projects = project.getProjects().stream() + .map(Object::toString) + .collect(Collectors.toSet()); + for (NamedExpression namedExpression : expectedProjects) { + Assertions.assertTrue(projects.contains(namedExpression.toString())); + } + Assertions.assertTrue(project.child(0) instanceof LogicalFilter); + LogicalFilter<?> filter = ((LogicalFilter<?>) project.child(0)); + Set<String> filterConjuncts = filter.getConjuncts().stream() + .map(Object::toString) + .collect(Collectors.toSet()); + for (Expression expectedConjunct : expectedConjuncts) { + Assertions.assertTrue(filterConjuncts.contains(expectedConjunct.toString())); + } + + Assertions.assertTrue(filter.child(0) instanceof LogicalProject); + LogicalProject<?> tvfProject = (LogicalProject<?>) filter.child(0); + if (expectedPreFilter) { + Assertions.assertTrue(tvfProject.child(0) instanceof LogicalFilter); + LogicalFilter<?> tvfFilter = (LogicalFilter<?>) tvfProject.child(0); + Assertions.assertTrue(tvfFilter.child(0) instanceof LogicalCheckPolicy); + Assertions.assertTrue(tvfFilter.child(0).child(0) instanceof UnboundTVFRelation); + } else { + Assertions.assertTrue(tvfProject.child(0) instanceof LogicalCheckPolicy); + Assertions.assertTrue(tvfProject.child(0).child(0) instanceof UnboundTVFRelation); + } + } + + @Test + public void testParseLoadStmtPartitions() throws Exception { + String loadSql1 = "LOAD LABEL customer_j23( " + + " DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " PARTITION (c_name, dt) " + + " COLUMNS TERMINATED BY \"|\"" + + " LINES TERMINATED BY \"\n\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, dt) " + + " COLUMNS FROM PATH AS (dt)" + + " ORDER BY c_custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1); + Assertions.assertFalse(statements.isEmpty()); + + List<String> expectedSinkColumns = new ArrayList<>(sinkCols1); + expectedSinkColumns.add(Column.SEQUENCE_COL); + expectedSinkColumns.add("dt"); + List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() { + { + add(new UnboundAlias(new UnboundSlot("c_custkey"), "custkey")); + add(new UnboundAlias(new UnboundSlot("c_name"), "c_name")); + add(new UnboundAlias(new UnboundSlot("c_address"), "c_address")); + add(new UnboundAlias(new 
UnboundSlot("c_nationkey"), "c_nationkey")); + add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone")); + add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal")); + add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment")); + add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment")); + add(new UnboundSlot("dt")); + } + }; + List<Expression> expectedConjuncts = new ArrayList<>(); + assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, false); + } + + @Test + public void testParseLoadStmtColumFromPath() throws Exception { + String loadSql1 = "LOAD LABEL customer_j23( " + + " DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " PARTITION (c_name, dt) " + + " COLUMNS TERMINATED BY \"|\"" + + " LINES TERMINATED BY \"\n\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, dt) " + + " COLUMNS FROM PATH AS (pt) " + + " SET ( custkey=c_custkey+1 ) " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey > 100" + + " ORDER BY c_custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1); + Assertions.assertFalse(statements.isEmpty()); + + List<String> expectedSinkColumns = new ArrayList<>(sinkCols1); + expectedSinkColumns.add(Column.SEQUENCE_COL); + expectedSinkColumns.add("pt"); + List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() { + { + add(new UnboundAlias(new Add( + new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), "custkey")); + add(new UnboundAlias(new UnboundSlot("c_name"), "c_name")); + add(new UnboundAlias(new UnboundSlot("c_address"), "c_address")); + add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey")); + add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone")); + add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal")); + add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment")); + add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment")); + add(new UnboundSlot("pt")); + } + }; + List<Expression> expectedConjuncts = new ArrayList<Expression>() { + { + add(new GreaterThan(new Add(new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), + new IntegerLiteral(100))); + } + }; + assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true); + } + + @Test + public void testParseLoadStmtNoColumn() throws Exception { + String loadSql1 = "LOAD LABEL customer_no_col( " + + " DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " FORMAT AS CSV" + + " ORDER BY custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + + List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1); + Assertions.assertFalse(statements.isEmpty()); + List<String> expectedSinkColumns = new ArrayList<>(sinkCols1); + expectedSinkColumns.add(Column.SEQUENCE_COL); + List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() { + { + // when no specified 
tvf columns match the olap table columns + add(new UnboundAlias(new UnboundSlot("custkey"), "custkey")); + add(new UnboundAlias(new UnboundSlot("c_name"), "c_name")); + add(new UnboundAlias(new UnboundSlot("c_address"), "c_address")); + add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey")); + add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone")); + add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal")); + add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment")); + add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment")); + } + }; + List<Expression> expectedConjuncts = new ArrayList<>(); + assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, false); + + // csv_schema value format is name:type pairs separated by semicolons, e.g. k1:int;k2:bigint;k3:varchar(20);k4:datetime(6) + String loadSql2 = "LOAD LABEL customer_no_col2( " + + " DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " FORMAT AS CSV" + + " ORDER BY custkey " + + " PROPERTIES( " + + " \"csv_schema\" = \"" + + " custkey:INT;" + + " c_name:STRING;" + + " c_address:STRING;" + + " c_nationkey:INT;" + + " c_phone:STRING;" + + " c_acctbal:DECIMAL(15, 2);" + + " c_mktsegment:STRING;" + + " c_comment:STRING;\"" + + " ) " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + + List<Pair<LogicalPlan, StatementContext>> statements2 = new NereidsParser().parseMultiple(loadSql2); + Assertions.assertFalse(statements2.isEmpty()); + List<String> expectedSinkColumns2 = new ArrayList<>(sinkCols1); + expectedSinkColumns2.add(Column.SEQUENCE_COL); + List<NamedExpression> expectedProjects2 = new ArrayList<NamedExpression>() { + { + add(new UnboundAlias(new UnboundSlot("custkey"), "custkey")); + add(new UnboundAlias(new UnboundSlot("c_name"), "c_name")); + add(new UnboundAlias(new UnboundSlot("c_address"), "c_address")); + add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey")); + add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone")); + add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal")); + add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment")); + add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment")); + } + }; + List<Expression> expectedConjuncts2 = new ArrayList<>(); + assertInsertIntoPlan(statements2, expectedSinkColumns2, expectedProjects2, expectedConjuncts2, false); + } + + @Test + public void testParseLoadStmtWithParquetMappingFilter() throws Exception { + String loadSql1 = "LOAD LABEL customer_dup_mapping( " + + " DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer_dup" + + " FORMAT AS PARQUET" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " SET ( custkey=c_custkey+1, address=c_address+'_base') " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey = 100" + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSql1); + Assertions.assertFalse(statements.isEmpty()); + List<String> expectedSinkColumns = new ArrayList<>(sinkCols2); + 
List<NamedExpression> expectedProjects = new ArrayList<NamedExpression>() { + { + add(new UnboundAlias(new Add( + new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), "custkey")); + add(new UnboundAlias(new UnboundSlot("c_name"), "c_name")); + add(new UnboundAlias(new Add( + new UnboundSlot("c_address"), new StringLiteral("_base")), "address")); + add(new UnboundAlias(new UnboundSlot("c_nationkey"), "c_nationkey")); + add(new UnboundAlias(new UnboundSlot("c_phone"), "c_phone")); + add(new UnboundAlias(new UnboundSlot("c_acctbal"), "c_acctbal")); + add(new UnboundAlias(new UnboundSlot("c_mktsegment"), "c_mktsegment")); + add(new UnboundAlias(new UnboundSlot("c_comment"), "c_comment")); + } + }; + List<Expression> expectedConjuncts = new ArrayList<Expression>() { + { + add(new EqualTo(new Add( + new UnboundSlot("c_custkey"), new TinyIntLiteral((byte) 1)), new IntegerLiteral(100))); + } + }; + assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true); + } + + @Test + public void testParseLoadStmtWithDeleteOn() throws Exception { + String loadSqlWithDeleteOnErr1 = "LOAD LABEL customer_label1( " + + " APPEND DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " COLUMNS TERMINATED BY \"|\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " SET ( custkey=c_custkey+1 ) " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey > 100" + + " DELETE ON c_custkey < 120 " + + " ORDER BY custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + try { + List<Pair<LogicalPlan, StatementContext>> statements = + new NereidsParser().parseMultiple(loadSqlWithDeleteOnErr1); + Assertions.assertFalse(statements.isEmpty()); + Assertions.assertTrue(statements.get(0).first instanceof LoadCommand); + ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext); + // DELETE ON without MERGE must be rejected; fail if no exception was thrown + Assertions.fail("expected AnalysisException was not thrown"); + } catch (AnalysisException e) { + Assertions.assertTrue(e.getMessage().contains(BulkLoadDataDesc.EXPECT_MERGE_DELETE_ON)); + } + + String loadSqlWithDeleteOnErr2 = "LOAD LABEL customer_label1( " + + " MERGE DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " COLUMNS TERMINATED BY \"|\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " SET ( custkey=c_custkey+1 ) " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey > 100" + + " ORDER BY custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + try { + List<Pair<LogicalPlan, StatementContext>> statements = + new NereidsParser().parseMultiple(loadSqlWithDeleteOnErr2); + Assertions.assertFalse(statements.isEmpty()); + Assertions.assertTrue(statements.get(0).first instanceof LoadCommand); + ((LoadCommand) statements.get(0).first).parseToInsertIntoPlan(connectContext); + // MERGE without DELETE ON must be rejected; fail if no exception was thrown + Assertions.fail("expected AnalysisException was not thrown"); + } catch (AnalysisException e) { + Assertions.assertTrue(e.getMessage().contains(BulkLoadDataDesc.EXPECT_DELETE_ON)); + } + + String loadSqlWithDeleteOnOk = "LOAD LABEL customer_label2( " + + " MERGE DATA INFILE(\"s3://bucket/customer\") " + + " INTO TABLE customer" + + " 
COLUMNS TERMINATED BY \"|\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " SET ( custkey=c_custkey+1 ) " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey > 100" + + " DELETE ON custkey < 120 " + + " ORDER BY custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + + FeConstants.unitTestConstant = new ArrayList<Column>() { + { + add(new Column("c_custkey", PrimitiveType.INT, true)); + add(new Column("c_name", PrimitiveType.VARCHAR, true)); + add(new Column("c_address", PrimitiveType.VARCHAR, true)); + add(new Column("c_nationkey", PrimitiveType.INT, true)); + add(new Column("c_phone", PrimitiveType.VARCHAR, true)); + add(new Column("c_acctbal", PrimitiveType.DECIMALV2, true)); + add(new Column("c_mktsegment", PrimitiveType.VARCHAR, true)); + add(new Column("c_comment", PrimitiveType.VARCHAR, true)); + } + }; + new MockUp<ExpressionEstimation>(ExpressionEstimation.class) { + @Mock + public ColumnStatistic visitCast(Cast cast, Statistics context) { + return ColumnStatistic.UNKNOWN; + } + + @Mock + public ColumnStatistic visitBinaryArithmetic(BinaryArithmetic binaryArithmetic, Statistics context) { + return ColumnStatistic.UNKNOWN; + } + }; + + List<Pair<LogicalPlan, StatementContext>> statements = new NereidsParser().parseMultiple(loadSqlWithDeleteOnOk); + Assertions.assertFalse(statements.isEmpty()); + + List<String> expectedSinkColumns = new ArrayList<>(sinkCols1); + expectedSinkColumns.add(Column.SEQUENCE_COL); + expectedSinkColumns.add(Column.DELETE_SIGN); + List<NamedExpression> expectedProjects = new ArrayList<>(); + List<Expression> expectedConjuncts = new ArrayList<>(); + assertInsertIntoPlan(statements, expectedSinkColumns, expectedProjects, expectedConjuncts, true); + // new StmtExecutor(connectContext, loadSqlWithDeleteOnOk).execute(); + } + + @Test + public void testParseLoadStmtPatternPath() throws Exception { + String path1 = "part*"; + String path2 = "*/part_000"; + String path3 = "*part_000*"; + String path4 = "*/*part_000*"; + String loadTemplate = "LOAD LABEL customer_j23( " + + " DATA INFILE(\"s3://bucket/customer/PATTERN\") " + + " INTO TABLE customer" + + " PARTITION (c_name, dt) " + + " COLUMNS TERMINATED BY \"|\"" + + " LINES TERMINATED BY \"\n\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, dt) " + + " ORDER BY c_custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + Assertions.assertFalse(new NereidsParser() + .parseMultiple(loadTemplate.replace("PATTERN", path1)).isEmpty()); + Assertions.assertFalse(new NereidsParser() + .parseMultiple(loadTemplate.replace("PATTERN", path2)).isEmpty()); + Assertions.assertFalse(new NereidsParser() + .parseMultiple(loadTemplate.replace("PATTERN", path3)).isEmpty()); + Assertions.assertFalse(new NereidsParser() + .parseMultiple(loadTemplate.replace("PATTERN", path4)).isEmpty()); + } + + @Test + public void testParseLoadStmtMultiLocations() throws Exception { + String loadMultiLocations = "LOAD LABEL customer_j23( " + + " DATA INFILE(" + + " 
\"s3://bucket/customer/path1\", " + + " \"s3://bucket/customer/path2\", " + + " \"s3://bucket/customer/path3\") " + + " INTO TABLE customer" + + " PARTITION (c_name, dt) " + + " COLUMNS TERMINATED BY \"|\"" + + " LINES TERMINATED BY \"\n\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, dt) " + + " ORDER BY c_custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + Assertions.assertFalse(new NereidsParser() + .parseMultiple(loadMultiLocations).isEmpty()); + } + + @Test + public void testParseLoadStmtMultiBulkDesc() throws Exception { + String loadMultiLocations = "LOAD LABEL customer_j23( " + + " DATA INFILE(" + + " \"s3://bucket/customer/path1\", " + + " \"s3://bucket/customer/path2\", " + + " \"s3://bucket/customer/path3\") " + + " INTO TABLE customer" + + " PARTITION (c_name) " + + " COLUMNS TERMINATED BY \"|\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " ORDER BY c_custkey " + + " ," + + " DATA INFILE(\"s3://bucket/customer/par_a*\") " + + " INTO TABLE customer_dup" + + " FORMAT AS PARQUET" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment) " + + " SET ( custkey=c_custkey+1, address=c_address+'_base') " + + " WHERE custkey < 50" + + " ," + + " DATA INFILE(" + + " \"s3://bucket/customer/p\") " + + " INTO TABLE customer" + + " PARTITION (c_name, dt) " + + " COLUMNS TERMINATED BY \"|\"" + + " LINES TERMINATED BY \"\n\"" + + " (c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, dt)" + + " SET ( custkey=c_custkey+1 ) " + + " PRECEDING FILTER c_nationkey=\"CHINA\" " + + " WHERE custkey > 100" + + " ORDER BY c_custkey " + + " ) " + + " WITH S3( " + + " \"s3.access_key\" = \"AK\", " + + " \"s3.secret_key\" = \"SK\", " + + " \"s3.endpoint\" = \"cos.ap-beijing.myqcloud.com\", " + + " \"s3.region\" = \"ap-beijing\") " + + "PROPERTIES( \"exec_mem_limit\" = \"8589934592\") COMMENT \"test\";"; + Assertions.assertFalse(new NereidsParser() + .parseMultiple(loadMultiLocations).isEmpty()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java index 64a0dc5cad..a6eb67b061 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java @@ -32,15 +32,16 @@ import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.datasource.property.constants.S3Properties.Env; import org.apache.doris.load.loadv2.LoadTask.MergeType; import org.apache.doris.tablefunction.S3TableValuedFunction; +import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Injectable; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.io.StringReader; import java.util.Collections; @@ -48,7 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class 
S3TvfLoadStmtTest { +public class S3TvfLoadStmtTest extends TestWithFeService { private static final String ACCESS_KEY_VALUE = "ak"; @@ -70,7 +71,7 @@ public class S3TvfLoadStmtTest { private Set<String> colNames; - @Before + @BeforeAll public void setUp() throws AnalysisException { FeConstants.runningUnitTest = true; @@ -101,7 +102,7 @@ public class S3TvfLoadStmtTest { Maps.newHashMap(), "comment"); final SelectStmt selectStmt = (SelectStmt) s3TvfLoadStmt.getQueryStmt(); final Expr whereClause = Deencapsulation.getField(selectStmt, "whereClause"); - Assert.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less)); + Assertions.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less)); } @Test @@ -115,15 +116,15 @@ public class S3TvfLoadStmtTest { final TableRef tvfRef = Deencapsulation.invoke(S3TvfLoadStmt.class, "buildTvfRef", dataDescription, brokerDesc); - Assert.assertTrue(tvfRef instanceof TableValuedFunctionRef); + Assertions.assertTrue(tvfRef instanceof TableValuedFunctionRef); final S3TableValuedFunction tableFunction = (S3TableValuedFunction) ((TableValuedFunctionRef) tvfRef).getTableFunction(); final Map<String, String> locationProperties = tableFunction.getLocationProperties(); - Assert.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE); - Assert.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE); - Assert.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE); - Assert.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE); - Assert.assertEquals(tableFunction.getFilePath(), DATA_URI); + Assertions.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE); + Assertions.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE); + Assertions.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE); + Assertions.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE); + Assertions.assertEquals(tableFunction.getFilePath(), DATA_URI); } @Injectable @@ -176,13 +177,13 @@ public class S3TvfLoadStmtTest { Deencapsulation.setField(s3TvfLoadStmt, "functionGenTableColNames", Sets.newHashSet("c1", "c2", "c3")); Deencapsulation.invoke(s3TvfLoadStmt, "rewriteExpr", columnsDescList); - Assert.assertEquals(columnsDescList.size(), 5); + Assertions.assertEquals(columnsDescList.size(), 5); final String orig4 = "((upper(`c1`) + 1) + 1)"; - Assert.assertEquals(orig4, columnsDescList.get(4).getExpr().toString()); + Assertions.assertEquals(orig4, columnsDescList.get(4).getExpr().toString()); final List<ImportColumnDesc> filterColumns = Deencapsulation.invoke(s3TvfLoadStmt, "filterColumns", columnsDescList); - Assert.assertEquals(filterColumns.size(), 4); + Assertions.assertEquals(filterColumns.size(), 4); } private static DataDescription buildDataDesc(Iterable<String> columns, Expr fileFilter, Expr wherePredicate, @@ -210,11 +211,12 @@ public class S3TvfLoadStmtTest { private static List<ImportColumnDesc> getColumnsDescList(String columns) throws Exception { String columnsSQL = "COLUMNS (" + columns + ")"; return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt( - new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns(); + new org.apache.doris.analysis.SqlParser( + new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL))))).getColumns(); } private static List<Column> getBaseSchema() { - List<Column> columns = 
com.google.common.collect.Lists.newArrayList(); + List<Column> columns = Lists.newArrayList(); Column c1 = new Column("c1", PrimitiveType.BIGINT); c1.setIsKey(true);
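For context on what assertInsertIntoPlan verifies above: each LOAD label is expected to parse into an UnboundOlapTableSink over a LogicalProject (the SET/column mapping), a LogicalFilter (the WHERE), and, when a PRECEDING FILTER is present, a second LogicalFilter directly over the s3 TVF relation. The sketch below shows roughly what that rewrite corresponds to in SQL for the statement in testParseLoadStmt; it is illustrative only, and the s3() argument names ("uri", "format", "column_separator") are assumptions for the sketch, not taken from this patch:

    -- rough, hypothetical rewrite of the LOAD in testParseLoadStmt
    INSERT INTO customer (custkey, c_name, c_address, c_nationkey,
                          c_phone, c_acctbal, c_mktsegment, c_comment)
    SELECT CASE WHEN c_custkey = -8 THEN -3
                WHEN c_custkey = -1 THEN 11
                ELSE c_custkey END AS custkey,
           c_name, c_address, c_nationkey, c_phone,
           c_acctbal, c_mktsegment, c_comment
    FROM s3("uri" = "s3://bucket/customer",          -- assumed parameter names
            "format" = "csv",
            "column_separator" = "|",
            "s3.access_key" = "AK",
            "s3.secret_key" = "SK",
            "s3.endpoint" = "cos.ap-beijing.myqcloud.com",
            "s3.region" = "ap-beijing")
    WHERE c_nationkey = "CHINA"   -- from PRECEDING FILTER
      AND custkey > 100;          -- from WHERE

The two predicates are collapsed into a single WHERE here only for readability; in the logical plan the tests assert, the PRECEDING FILTER sits below the column-mapping project (over source slots such as c_nationkey) while the WHERE sits above it (over mapped expressions such as custkey).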