This is an automated email from the ASF dual-hosted git repository.
starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bd90fc6f70b [feat](Nereids) support copy into command (#47194)
bd90fc6f70b is described below
commit bd90fc6f70b9a24911642cdc55da842fd59b2a84
Author: LiBinfeng <[email protected]>
AuthorDate: Thu Apr 24 18:25:07 2025 +0800
[feat](Nereids) support copy into command (#47194)
---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 8 +-
.../java/org/apache/doris/analysis/CastExpr.java | 3 +
.../org/apache/doris/analysis/CopyFromParam.java | 10 +
.../apache/doris/analysis/CopyIntoProperties.java | 2 +-
.../org/apache/doris/analysis/CopyProperties.java | 7 +-
.../java/org/apache/doris/analysis/CopyStmt.java | 35 +-
.../org/apache/doris/analysis/DataDescription.java | 5 +-
.../doris/nereids/parser/LogicalPlanBuilder.java | 80 +++++
.../apache/doris/nereids/trees/plans/PlanType.java | 1 +
.../trees/plans/commands/CopyIntoCommand.java | 62 ++++
.../trees/plans/commands/info/CopyFromDesc.java} | 139 ++++----
.../trees/plans/commands/info/CopyIntoInfo.java | 358 +++++++++++++++++++++
.../trees/plans/visitor/CommandVisitor.java | 5 +
.../main/java/org/apache/doris/qe/DdlExecutor.java | 8 +-
.../suites/load_p0/copy_into/test_copy_into.groovy | 15 +-
15 files changed, 661 insertions(+), 77 deletions(-)
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 18f61acc89a..f2eada27362 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -162,6 +162,10 @@ supportedDmlStatement
(propertyClause)?
(withRemoteStorageSystem)? #export
| replayCommand #replay
+ | COPY INTO selectHint? name=multipartIdentifier columns=identifierList?
FROM
+ (stageAndPattern | (LEFT_PAREN SELECT selectColumnClause
+ FROM stageAndPattern whereClause? RIGHT_PAREN))
+ properties=propertyClause?
#copyInto
;
supportedCreateStatement
@@ -895,10 +899,6 @@ unsupportedUseStatement
unsupportedDmlStatement
: TRUNCATE TABLE multipartIdentifier specifiedPartition? FORCE?
#truncateTable
- | COPY INTO name=multipartIdentifier columns=identifierList? FROM
- (stageAndPattern | (LEFT_PAREN SELECT selectColumnClause
- FROM stageAndPattern whereClause? RIGHT_PAREN))
- properties=propertyClause?
#copyInto
;
stageAndPattern
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
index aab6a9dbec0..f83cae85b7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
@@ -140,6 +140,9 @@ public class CastExpr extends Expr {
if (type.isStructType() && e.type.isStructType()) {
getChild(0).setType(type);
}
+ if (type.isScalarType()) {
+ targetTypeDef = new TypeDef(type);
+ }
analysisDone();
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
index b2a57cd1f5d..2158220fde0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
@@ -67,6 +67,16 @@ public class CopyFromParam {
this.fileFilterExpr = whereExpr;
}
+ public CopyFromParam(StageAndPattern stageAndPattern, List<Expr> exprList,
Expr fileFilterExpr,
+ List<String> fileColumns, List<Expr>
columnMappingList, List<String> targetColumns) {
+ this.stageAndPattern = stageAndPattern;
+ this.exprList = exprList;
+ this.fileFilterExpr = fileFilterExpr;
+ this.fileColumns = fileColumns;
+ this.columnMappingList = columnMappingList;
+ this.targetColumns = targetColumns;
+ }
+
public void analyze(String fullDbName, TableName tableName, boolean
useDeleteSign, String fileType)
throws AnalysisException {
if (exprList == null && fileFilterExpr == null && !useDeleteSign) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
index 4291fda7581..792fc6be901 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyIntoProperties.java
@@ -84,7 +84,7 @@ public class CopyIntoProperties extends CopyProperties {
return result;
}
- protected void mergeProperties(StageProperties stageProperties) {
+ public void mergeProperties(StageProperties stageProperties) {
Map<String, String> properties =
stageProperties.getDefaultPropertiesWithoutPrefix();
for (Entry<String, String> entry : properties.entrySet()) {
if (!this.properties.containsKey(entry.getKey())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
index f6807b1bf79..d70976d3f3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyProperties.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.commons.lang3.StringUtils;
+import java.util.HashMap;
import java.util.Map;
public class CopyProperties {
@@ -57,7 +58,11 @@ public class CopyProperties {
public static final String USE_DELETE_SIGN = COPY_PREFIX +
"use_delete_sign";
public CopyProperties(Map<String, String> properties, String prefix) {
- this.properties = properties;
+ Map<String, String> newProperties = new HashMap<>();
+ for (String key : properties.keySet()) {
+ newProperties.put(key, properties.get(key));
+ }
+ this.properties = newProperties;
this.prefix = prefix;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
index 6bd4d3506d1..3a916f18640 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyStmt.java
@@ -36,6 +36,7 @@ import
org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.load.loadv2.LoadTask.MergeType;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.ShowResultSetMetaData;
@@ -80,8 +81,8 @@ public class CopyStmt extends DdlStmt implements
NotFallbackInParser {
private LabelName label = null;
private BrokerDesc brokerDesc = null;
private DataDescription dataDescription = null;
- private final Map<String, String> brokerProperties = new HashMap<>();
- private final Map<String, String> properties = new HashMap<>();
+ private Map<String, String> brokerProperties = new HashMap<>();
+ private Map<String, String> properties = new HashMap<>();
@Getter
private String stage;
@@ -110,6 +111,36 @@ public class CopyStmt extends DdlStmt implements
NotFallbackInParser {
}
}
+ /**
+ * Used by the Nereids planner to build a fully populated statement.
+ */
+ public CopyStmt(TableName tableName, CopyFromParam copyFromParam,
+ CopyIntoProperties copyProperties, Map<String, Map<String,
String>> optHints, LabelName label,
+ String stageId, StageType stageType, String stagePrefix,
ObjectInfo objectInfo, String userName,
+ Map<String, String> brokerProperties, Map<String, String>
properties,
+ DataDescription dataDescription, BrokerDesc brokerDesc,
OriginStatement originStmt) {
+ this.tableName = tableName;
+ this.copyFromParam = copyFromParam;
+ this.stage = copyFromParam.getStageAndPattern().getStageName();
+ this.copyIntoProperties = copyProperties;
+ if (optHints != null) {
+ this.optHints = optHints.get(SET_VAR_KEY);
+ }
+
+ this.label = label;
+ this.brokerDesc = brokerDesc;
+ this.brokerProperties = brokerProperties;
+ this.properties = properties;
+
+ this.stageId = stageId;
+ this.stageType = stageType;
+ this.stagePrefix = stagePrefix;
+ this.objectInfo = objectInfo;
+ this.userName = userName;
+ this.dataDescription = dataDescription;
+ this.setOrigStmt(originStmt);
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 6776eecc1c1..bdfed0814a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -845,7 +845,10 @@ public class DataDescription implements
InsertStmt.DataDesc {
+ "The mapping operator error, op: " +
predicate.getOp());
}
Expr child0 = predicate.getChild(0);
- if (!(child0 instanceof SlotRef)) {
+ if (child0 instanceof CastExpr && child0.getChild(0) instanceof
SlotRef) {
+ predicate.setChild(0, child0.getChild(0));
+ child0 = predicate.getChild(0);
+ } else if (!(child0 instanceof SlotRef)) {
throw new AnalysisException("Mapping function expr only
support the column or eq binary predicate. "
+ "The mapping column error. column: " +
child0.toSql());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index d54ee98b0cf..85a443ae344 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.PassVar;
import org.apache.doris.analysis.PasswordOptions;
import org.apache.doris.analysis.SetType;
+import org.apache.doris.analysis.StageAndPattern;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableScanParams;
@@ -561,6 +562,7 @@ import
org.apache.doris.nereids.trees.plans.commands.CancelWarmUpJobCommand;
import org.apache.doris.nereids.trees.plans.commands.CleanAllProfileCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.Constraint;
+import org.apache.doris.nereids.trees.plans.commands.CopyIntoCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateEncryptkeyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateFileCommand;
@@ -740,6 +742,8 @@ import
org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc;
import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc;
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
+import org.apache.doris.nereids.trees.plans.commands.info.CopyFromDesc;
+import org.apache.doris.nereids.trees.plans.commands.info.CopyIntoInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateIndexOp;
import org.apache.doris.nereids.trees.plans.commands.info.CreateJobInfo;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
@@ -1012,6 +1016,82 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
return new CancelJobTaskCommand(jobName, taskId);
}
+ private StageAndPattern
getStageAndPattern(DorisParser.StageAndPatternContext ctx) {
+ if (ctx.pattern != null) {
+ return new StageAndPattern(stripQuotes(ctx.stage.getText()),
stripQuotes(ctx.pattern.getText()));
+ } else {
+ return new StageAndPattern(stripQuotes(ctx.stage.getText()), null);
+ }
+ }
+
+ @Override
+ public LogicalPlan visitCopyInto(DorisParser.CopyIntoContext ctx) {
+ ImmutableList.Builder<String> tableName = ImmutableList.builder();
+ if (null != ctx.name) {
+ List<String> nameParts = visitMultipartIdentifier(ctx.name);
+ tableName.addAll(nameParts);
+ }
+ List<String> columns = (null != ctx.columns) ?
visitIdentifierList(ctx.columns) : null;
+ StageAndPattern stageAndPattern =
getStageAndPattern(ctx.stageAndPattern());
+ CopyFromDesc copyFromDesc = null;
+ if (null != ctx.SELECT()) {
+ List<NamedExpression> projects =
getNamedExpressions(ctx.selectColumnClause().namedExpressionSeq());
+ Optional<Expression> where = Optional.empty();
+ if (ctx.whereClause() != null) {
+ where =
Optional.of(getExpression(ctx.whereClause().booleanExpression()));
+ }
+ copyFromDesc = new CopyFromDesc(stageAndPattern, projects, where);
+ } else {
+ copyFromDesc = new CopyFromDesc(stageAndPattern);
+ }
+ Map<String, String> properties = visitPropertyClause(ctx.properties);
+ copyFromDesc.setTargetColumns(columns);
+ CopyIntoInfo copyInfoInfo = null;
+ if (null != ctx.selectHint()) {
+ if ((selectHintMap == null) || selectHintMap.isEmpty()) {
+ throw new AnalysisException("hint should be in right place: " +
ctx.getText());
+ }
+ List<ParserRuleContext> selectHintContexts = Lists.newArrayList();
+ for (Integer key : selectHintMap.keySet()) {
+ if (key > ctx.getStart().getStopIndex() && key <
ctx.getStop().getStartIndex()) {
+ selectHintContexts.add(selectHintMap.get(key));
+ }
+ }
+ if (selectHintContexts.size() != 1) {
+ throw new AnalysisException("only one hint is allowed in: " +
ctx.getText());
+ }
+ SelectHintContext selectHintContext = (SelectHintContext)
selectHintContexts.get(0);
+ Map<String, String> parameterNames = Maps.newLinkedHashMap();
+ for (HintStatementContext hintStatement :
selectHintContext.hintStatements) {
+ String hintName =
hintStatement.hintName.getText().toLowerCase(Locale.ROOT);
+ if (!hintName.equalsIgnoreCase("set_var")) {
+ throw new AnalysisException("only set_var hint is allowed
in: " + ctx.getText());
+ }
+ for (HintAssignmentContext kv : hintStatement.parameters) {
+ if (kv.key != null) {
+ String parameterName = visitIdentifierOrText(kv.key);
+ Optional<String> value = Optional.empty();
+ if (kv.constantValue != null) {
+ Literal literal = (Literal)
visit(kv.constantValue);
+ value =
Optional.ofNullable(literal.toLegacyLiteral().getStringValue());
+ } else if (kv.identifierValue != null) {
+ // maybe we should throw exception when the
identifierValue is quoted identifier
+ value =
Optional.ofNullable(kv.identifierValue.getText());
+ }
+ parameterNames.put(parameterName, value.get());
+ }
+ }
+ }
+ Map<String, Map<String, String>> setVarHint =
Maps.newLinkedHashMap();
+ setVarHint.put("set_var", parameterNames);
+ copyInfoInfo = new CopyIntoInfo(tableName.build(), copyFromDesc,
properties, setVarHint);
+ } else {
+ copyInfoInfo = new CopyIntoInfo(tableName.build(), copyFromDesc,
properties, null);
+ }
+
+ return new CopyIntoCommand(copyInfoInfo);
+ }
+
@Override
public String visitCommentSpec(DorisParser.CommentSpecContext ctx) {
String commentSpec = ctx == null ? "''" :
ctx.STRING_LITERAL().getText();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 1ab6d5fd1e1..0d60298421a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -135,6 +135,7 @@ public enum PlanType {
// commands
ADMIN_CHECK_TABLETS_COMMAND,
+ COPY_INTO_COMMAND,
CREATE_POLICY_COMMAND,
CREATE_TABLE_COMMAND,
CREATE_SQL_BLOCK_RULE_COMMAND,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CopyIntoCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CopyIntoCommand.java
new file mode 100644
index 00000000000..9fac32c7ad9
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CopyIntoCommand.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.CopyStmt;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.info.CopyIntoInfo;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.DdlExecutor;
+import org.apache.doris.qe.StmtExecutor;
+
+/**
+ * copy into command
+ */
+public class CopyIntoCommand extends Command implements ForwardWithSync {
+
+ CopyIntoInfo copyIntoInfo;
+
+ /**
+ * Creates the command from the parsed COPY INTO information.
+ */
+ public CopyIntoCommand(CopyIntoInfo info) {
+ super(PlanType.COPY_INTO_COMMAND);
+ this.copyIntoInfo = info;
+ }
+
+ @Override
+ public void run(ConnectContext ctx, StmtExecutor executor) throws
Exception {
+ copyIntoInfo.validate(ctx);
+ CopyStmt copyStmt =
copyIntoInfo.toLegacyStatement(executor.getOriginStmt());
+ DdlExecutor.executeCopyStmt(ctx.getEnv(), copyStmt);
+ // COPY INTO may produce a result set: hand it to the proxy, or send it to the client
+ if (executor.getContext().getState().getResultSet() != null) {
+ if (executor.isProxy()) {
+
executor.setProxyShowResultSet(executor.getContext().getState().getResultSet());
+ return;
+ }
+
executor.sendResultSet(executor.getContext().getState().getResultSet());
+ }
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitCopyIntoCommand(this, context);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
similarity index 70%
copy from fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
index b2a57cd1f5d..3ba1ec67b06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CopyFromParam.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyFromDesc.java
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.analysis;
+package org.apache.doris.nereids.trees.plans.commands.info;
-import org.apache.doris.analysis.BinaryPredicate.Operator;
+import org.apache.doris.analysis.CopyFromParam;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StageAndPattern;
+import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@@ -26,48 +29,81 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.analyzer.UnboundStar;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.util.ExpressionUtils;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import lombok.Getter;
-import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
-public class CopyFromParam {
+/**
+ * Nereids description of the COPY INTO source; counterpart of the legacy {@link CopyFromParam}.
+ */
+public class CopyFromDesc {
private static final Logger LOG =
LogManager.getLogger(CopyFromParam.class);
private static final String DOLLAR = "$";
-
- @Getter
private StageAndPattern stageAndPattern;
- @Getter
- private List<Expr> exprList;
- @Getter
- private Expr fileFilterExpr;
- @Getter
+ private List<NamedExpression> exprList;
+ private Optional<Expression> fileFilterExpr;
private List<String> fileColumns;
- @Getter
- private List<Expr> columnMappingList;
- @Setter
+ private List<Expression> columnMappingList;
private List<String> targetColumns;
- public CopyFromParam(StageAndPattern stageAndPattern) {
+ public CopyFromDesc(StageAndPattern stageAndPattern) {
this.stageAndPattern = stageAndPattern;
}
- public CopyFromParam(StageAndPattern stageAndPattern, List<Expr> exprList,
Expr whereExpr) {
+ public CopyFromDesc(StageAndPattern stageAndPattern, List<NamedExpression>
exprList,
+ Optional<Expression> whereExpr) {
this.stageAndPattern = stageAndPattern;
this.exprList = exprList;
this.fileFilterExpr = whereExpr;
}
- public void analyze(String fullDbName, TableName tableName, boolean
useDeleteSign, String fileType)
+ public void setTargetColumns(List<String> targetColumns) {
+ this.targetColumns = targetColumns;
+ }
+
+ public StageAndPattern getStageAndPattern() {
+ return stageAndPattern;
+ }
+
+ public List<Expression> getColumnMappingList() {
+ return columnMappingList;
+ }
+
+ public List<NamedExpression> getExprList() {
+ return exprList;
+ }
+
+ public List<String> getFileColumns() {
+ return fileColumns;
+ }
+
+ public Optional<Expression> getFileFilterExpr() {
+ return fileFilterExpr;
+ }
+
+ public List<String> getTargetColumns() {
+ return targetColumns;
+ }
+
+ /**
+ * Validates this copy-from description (Nereids counterpart of CopyFromParam#analyze).
+ */
+ public void validate(String fullDbName, TableName tableName, boolean
useDeleteSign, String fileType)
throws AnalysisException {
if (exprList == null && fileFilterExpr == null && !useDeleteSign) {
return;
@@ -136,14 +172,15 @@ public class CopyFromParam {
parseColumnNames(fileType, fileColumns);
if (exprList != null) {
- if (targetColumns.size() != exprList.size()) {
-
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT);
- }
- for (int i = 0; i < targetColumns.size(); i++) {
- Expr expr = exprList.get(i);
- BinaryPredicate binaryPredicate = new
BinaryPredicate(Operator.EQ,
- new SlotRef(null, targetColumns.get(i)), expr);
- columnMappingList.add(binaryPredicate);
+ if (!(exprList.size() == 1 && exprList.get(0) instanceof
UnboundStar)) {
+ if (targetColumns.size() != exprList.size()) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT);
+ }
+ for (int i = 0; i < targetColumns.size(); i++) {
+ Expression expr = exprList.get(i);
+ EqualTo binaryPredicate = new EqualTo(new
UnboundSlot(targetColumns.get(i)), expr);
+ columnMappingList.add(binaryPredicate);
+ }
}
} else {
for (int i = 0; i < targetColumns.size(); i++) {
@@ -152,9 +189,8 @@ public class CopyFromParam {
// mode. Because if the src data is an expr, strict mode
judgment will
// not be performed.
if
(!fileColumns.get(i).equalsIgnoreCase(targetColumns.get(i))) {
- BinaryPredicate binaryPredicate = new
BinaryPredicate(Operator.EQ,
- new SlotRef(null, targetColumns.get(i)),
- new SlotRef(null, fileColumns.get(i)));
+ EqualTo binaryPredicate = new EqualTo(new
UnboundSlot(targetColumns.get(i)),
+ new UnboundSlot(fileColumns.get(i)));
columnMappingList.add(binaryPredicate);
}
}
@@ -167,7 +203,7 @@ public class CopyFromParam {
return false;
}
List<SlotRef> slotRefs = Lists.newArrayList();
- Expr.collectList(exprList, SlotRef.class, slotRefs);
+ // Expr.collectList(exprList, SlotRef.class, slotRefs);
Set<String> columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (SlotRef slotRef : slotRefs) {
String columnName = slotRef.getColumnName();
@@ -182,7 +218,7 @@ public class CopyFromParam {
}
}
if (addDeleteSign) {
- exprList.add(new SlotRef(null, Column.DELETE_SIGN));
+ // exprList.add(new SlotRef(null, Column.DELETE_SIGN));
fileColumns.add(Column.DELETE_SIGN);
}
return true;
@@ -200,29 +236,29 @@ public class CopyFromParam {
private int getMaxFileColumnId() throws AnalysisException {
int maxId = 0;
if (exprList != null) {
- int maxFileColumnId = getMaxFileColumnId(exprList);
+ int maxFileColumnId = getMaxFileFilterColumnId(
+ exprList.stream().map(expr -> (Expression)
expr).collect(Collectors.toList()));
maxId = maxId > maxFileColumnId ? maxId : maxFileColumnId;
}
if (fileFilterExpr != null) {
- int maxFileColumnId =
getMaxFileColumnId(Lists.newArrayList(fileFilterExpr));
+ int maxFileColumnId =
getMaxFileFilterColumnId(Lists.newArrayList(fileFilterExpr.get()));
maxId = maxId > maxFileColumnId ? maxId : maxFileColumnId;
}
return maxId;
}
- private int getMaxFileColumnId(List<Expr> exprList) throws
AnalysisException {
- List<SlotRef> slotRefs = Lists.newArrayList();
- Expr.collectList(exprList, SlotRef.class, slotRefs);
+ private int getMaxFileFilterColumnId(List<Expression> exprList) throws
AnalysisException {
+ Set<Slot> slots = ExpressionUtils.getInputSlotSet(exprList);
int maxId = 0;
- for (SlotRef slotRef : slotRefs) {
- int fileColumnId = getFileColumnIdOfSlotRef(slotRef);
+ for (Slot slot : slots) {
+ int fileColumnId = getFileColumnIdOfSlotRef((UnboundSlot) slot);
maxId = fileColumnId < maxId ? maxId : fileColumnId;
}
return maxId;
}
- private int getFileColumnIdOfSlotRef(SlotRef slotRef) throws
AnalysisException {
- String columnName = slotRef.getColumnName();
+ private int getFileColumnIdOfSlotRef(UnboundSlot unboundSlot) throws
AnalysisException {
+ String columnName = unboundSlot.getName();
try {
if (!columnName.startsWith(DOLLAR)) {
throw new AnalysisException("can not mix column name and
dollar sign");
@@ -245,23 +281,4 @@ public class CopyFromParam {
}
}
}
-
- public String toSql() {
- StringBuilder sb = new StringBuilder();
- if (columnMappingList != null || fileFilterExpr != null) {
- sb.append("(SELECT ");
- if (columnMappingList != null) {
- Joiner.on(", ").appendTo(sb,
- Lists.transform(columnMappingList, (Function<Expr,
Object>) expr -> expr.toSql()));
- }
- sb.append(" FROM ").append(stageAndPattern.toSql());
- if (fileFilterExpr != null) {
- sb.append(" WHERE ").append(fileFilterExpr.toSql());
- }
- sb.append(")");
- } else {
- sb.append(stageAndPattern.toSql());
- }
- return sb.toString();
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
new file mode 100644
index 00000000000..bb4dde7143e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CopyIntoInfo.java
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.CastExpr;
+import org.apache.doris.analysis.CopyFromParam;
+import org.apache.doris.analysis.CopyIntoProperties;
+import org.apache.doris.analysis.CopyStmt;
+import org.apache.doris.analysis.DataDescription;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.LabelName;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StageAndPattern;
+import org.apache.doris.analysis.StageProperties;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.catalog.CloudEnv;
+import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
+import org.apache.doris.cloud.proto.Cloud.StagePB;
+import org.apache.doris.cloud.proto.Cloud.StagePB.StageType;
+import org.apache.doris.cloud.stage.StageUtil;
+import org.apache.doris.cloud.storage.RemoteBase;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.property.constants.BosProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.analyzer.Scope;
+import org.apache.doris.nereids.analyzer.UnboundRelation;
+import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.jobs.executor.Rewriter;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.rules.analysis.BindRelation;
+import org.apache.doris.nereids.rules.analysis.ExpressionAnalyzer;
+import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
+import org.apache.doris.nereids.trees.expressions.Cast;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.SessionVariable;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Information parsed from a COPY INTO statement, together with the legacy objects that validation derives from it.
+ */
+public class CopyIntoInfo {
+ private static final Logger LOG = LogManager.getLogger(CopyIntoInfo.class);
+
+ private static final String S3_BUCKET = "bucket";
+ private static final String S3_PREFIX = "prefix";
+
+ private final List<String> nameParts;
+ private CopyFromDesc copyFromDesc;
+ private CopyFromParam legacyCopyFromParam;
+ private CopyIntoProperties copyIntoProperties;
+ private Map<String, Map<String, String>> optHints;
+
+ private LabelName label = null;
+ private BrokerDesc brokerDesc = null;
+ private DataDescription dataDescription = null;
+ private final Map<String, String> brokerProperties = new HashMap<>();
+ private Map<String, String> properties = new HashMap<>();
+
+ private String stage;
+ private String stageId;
+ private StageType stageType;
+ private String stagePrefix;
+ private RemoteBase.ObjectInfo objectInfo;
+ private String userName;
+ private TableName tableName;
+
+    /**
+     * Creates the description of a COPY INTO statement.
+     *
+     * @param nameParts name parts of the target table as written in the statement
+     * @param copyFromDesc source description; its stage name is cached in {@code stage}
+     * @param properties statement properties, copied into a new map before being wrapped
+     * @param optHints hints attached to the statement, stored as given
+     */
+    public CopyIntoInfo(List<String> nameParts, CopyFromDesc copyFromDesc,
+                        Map<String, String> properties, Map<String, Map<String, String>> optHints) {
+        this.nameParts = nameParts;
+        this.copyFromDesc = copyFromDesc;
+        // CopyIntoProperties receives a fresh map rather than the caller's instance
+        Map<String, String> copiedProperties = new HashMap<>(properties);
+        this.copyIntoProperties = new CopyIntoProperties(copiedProperties);
+        this.optHints = optHints;
+        this.stage = copyFromDesc.getStageAndPattern().getStageName();
+    }
+
+ /**
+ * validate copy into information
+ */
+ public void validate(ConnectContext ctx) throws DdlException,
AnalysisException {
+ if (this.optHints != null &&
this.optHints.containsKey(SessionVariable.CLOUD_CLUSTER)) {
+ ((CloudEnv) Env.getCurrentEnv())
+
.checkCloudClusterPriv(this.optHints.get("set_var").get(SessionVariable.CLOUD_CLUSTER));
+ }
+ // generate a label
+ String labelName = "copy_" +
DebugUtil.printId(ctx.queryId()).replace("-", "_");
+ String ctl = null;
+ String db = null;
+ String table = null;
+ switch (nameParts.size()) {
+ case 1: { // table
+ ctl = ctx.getDefaultCatalog();
+ if (Strings.isNullOrEmpty(ctl)) {
+ ctl = InternalCatalog.INTERNAL_CATALOG_NAME;
+ }
+ db = ctx.getDatabase();
+ if (Strings.isNullOrEmpty(db)) {
+ throw new AnalysisException("Please specify a database
name.");
+ }
+ table = nameParts.get(0);
+ break;
+ }
+ case 2:
+ // db.table
+ // Use database name from table name parts.
+ break;
+ case 3: {
+ // catalog.db.table
+ ctl = nameParts.get(0);
+ db = nameParts.get(1);
+ table = nameParts.get(2);
+ break;
+ }
+ default:
+ throw new IllegalStateException("Table name [" + nameParts +
"] is invalid.");
+ }
+ tableName = new TableName(ctl, db, table);
+ label = new LabelName(tableName.getDb(), labelName);
+ if (stage.isEmpty()) {
+ throw new AnalysisException("Stage name can not be empty");
+ }
+ this.userName =
ClusterNamespace.getNameFromFullName(ctx.getCurrentUserIdentity().getQualifiedUser());
+ doValidate(userName, db, true);
+ }
+
+ /**
+ * do validate
+ */
+ public void doValidate(String user, String db, boolean checkAuth) throws
AnalysisException, DdlException {
+ // get stage from meta service
+ StagePB stagePB = StageUtil.getStage(stage, userName, true);
+ validateStagePB(stagePB);
+ // generate broker desc
+ brokerDesc = new BrokerDesc("S3", StorageBackend.StorageType.S3,
brokerProperties);
+ // generate data description
+ String filePath = "s3://" + brokerProperties.get(S3_BUCKET) + "/" +
brokerProperties.get(S3_PREFIX);
+ Separator separator = copyIntoProperties.getColumnSeparator() != null
? new Separator(
+ copyIntoProperties.getColumnSeparator()) : null;
+ String fileFormatStr = copyIntoProperties.getFileType();
+ Map<String, String> dataDescProperties =
copyIntoProperties.getDataDescriptionProperties();
+ copyFromDesc.validate(db, tableName,
this.copyIntoProperties.useDeleteSign(),
+ copyIntoProperties.getFileTypeIgnoreCompression());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("copy into params. sql: {}, fileColumns: {},
columnMappingList: {}, filter: {}",
+ copyFromDesc.getFileColumns().toString(),
copyFromDesc.getColumnMappingList().toString(),
+ copyFromDesc.getFileFilterExpr().toString());
+ }
+
+ List<String> nameParts = Lists.newArrayList();
+ nameParts.add(db);
+ nameParts.add(tableName.getTbl());
+ Plan unboundRelation = new
UnboundRelation(StatementScopeIdGenerator.newRelationId(), nameParts);
+ CascadesContext cascadesContext =
CascadesContext.initContext(ConnectContext.get().getStatementContext(),
+ unboundRelation, PhysicalProperties.ANY);
+ Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext,
+ ImmutableList.of(Rewriter.bottomUp(new BindRelation()))).execute();
+ Plan boundRelation = cascadesContext.getRewritePlan();
+ // table could have delete sign in LogicalFilter above
+ if (cascadesContext.getRewritePlan() instanceof LogicalFilter) {
+ boundRelation = (Plan) ((LogicalFilter)
cascadesContext.getRewritePlan()).child();
+ }
+ PlanTranslatorContext context = new
PlanTranslatorContext(cascadesContext);
+ List<Slot> slots = boundRelation.getOutput();
+ Scope scope = new Scope(slots);
+ ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope,
cascadesContext, false, false);
+
+ Map<SlotReference, SlotRef> translateMap = Maps.newHashMap();
+
+ TupleDescriptor tupleDescriptor = context.generateTupleDesc();
+ tupleDescriptor.setTable(((OlapScan) boundRelation).getTable());
+ for (int i = 0; i < boundRelation.getOutput().size(); i++) {
+ SlotReference slotReference = (SlotReference)
boundRelation.getOutput().get(i);
+ SlotRef slotRef = new SlotRef(null, slotReference.getName());
+ translateMap.put(slotReference, slotRef);
+ context.createSlotDesc(tupleDescriptor, slotReference, ((OlapScan)
boundRelation).getTable());
+ }
+
+ List<Expr> legacyColumnMappingList = null;
+ if (copyFromDesc.getColumnMappingList() != null &&
!copyFromDesc.getColumnMappingList().isEmpty()) {
+ legacyColumnMappingList = new ArrayList<>();
+ for (Expression expression : copyFromDesc.getColumnMappingList()) {
+ legacyColumnMappingList.add(translateToLegacyExpr(expression,
analyzer, context, cascadesContext));
+ }
+ }
+ Expr legacyFileFilterExpr = null;
+ if (copyFromDesc.getFileFilterExpr().isPresent()) {
+ legacyFileFilterExpr =
translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(),
+ analyzer, context, cascadesContext);
+ }
+
+ dataDescription = new DataDescription(tableName.getTbl(), null,
Lists.newArrayList(filePath),
+ copyFromDesc.getFileColumns(), separator, fileFormatStr, null,
false,
+ legacyColumnMappingList, legacyFileFilterExpr, null,
LoadTask.MergeType.APPEND, null,
+ null, dataDescProperties);
+
dataDescription.setCompressType(StageUtil.parseCompressType(copyIntoProperties.getCompression()));
+ if (!(copyFromDesc.getColumnMappingList() == null
+ || copyFromDesc.getColumnMappingList().isEmpty())) {
+ dataDescription.setIgnoreCsvRedundantCol(true);
+ }
+ // analyze data description
+ if (checkAuth) {
+ dataDescription.analyze(db);
+ } else {
+ dataDescription.analyzeWithoutCheckPriv(db);
+ }
+ String path;
+ for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
+ path = dataDescription.getFilePaths().get(i);
+ dataDescription.getFilePaths().set(i,
BosProperties.convertPathToS3(path));
+ StorageBackend.checkPath(path, brokerDesc.getStorageType(), null);
+ dataDescription.getFilePaths().set(i, path);
+ }
+
+ try {
+ properties.putAll(copyIntoProperties.getExecProperties());
+ // TODO support exec params as LoadStmt
+ LoadStmt.checkProperties(properties);
+ } catch (DdlException e) {
+ throw new AnalysisException(e.getMessage());
+ }
+
+ // translate copy from description to copy from param
+ legacyCopyFromParam = toLegacyParam(copyFromDesc, analyzer, context,
cascadesContext);
+ }
+
+    /**
+     * Builds the legacy {@link CopyFromParam} equivalent of {@code copyFromDesc}.
+     *
+     * <p>The select list, the optional file filter and the column mappings are translated in that order;
+     * a list that is {@code null} on the description stays {@code null} on the legacy parameter.
+     */
+    private CopyFromParam toLegacyParam(CopyFromDesc copyFromDesc, ExpressionAnalyzer analyzer,
+                                        PlanTranslatorContext context, CascadesContext cascadesContext) {
+        StageAndPattern source = copyFromDesc.getStageAndPattern();
+        List<Expr> legacySelectList =
+                translateAllToLegacy(copyFromDesc.getExprList(), analyzer, context, cascadesContext);
+        Expr legacyFilter = copyFromDesc.getFileFilterExpr().isPresent()
+                ? translateToLegacyExpr(copyFromDesc.getFileFilterExpr().get(), analyzer, context, cascadesContext)
+                : null;
+        List<String> fileColumnNames = copyFromDesc.getFileColumns();
+        List<Expr> legacyMappings =
+                translateAllToLegacy(copyFromDesc.getColumnMappingList(), analyzer, context, cascadesContext);
+        return new CopyFromParam(source, legacySelectList, legacyFilter, fileColumnNames, legacyMappings,
+                copyFromDesc.getTargetColumns());
+    }
+
+    /**
+     * Translates each expression to a legacy {@link Expr}, or returns {@code null} when the list is {@code null}.
+     */
+    private List<Expr> translateAllToLegacy(List<? extends Expression> expressions, ExpressionAnalyzer analyzer,
+                                            PlanTranslatorContext context, CascadesContext cascadesContext) {
+        if (expressions == null) {
+            return null;
+        }
+        List<Expr> translated = new ArrayList<>();
+        for (Expression expression : expressions) {
+            translated.add(translateToLegacyExpr(expression, analyzer, context, cascadesContext));
+        }
+        return translated;
+    }
+
+ private Expr translateToLegacyExpr(Expression expr, ExpressionAnalyzer
analyzer, PlanTranslatorContext context,
+ CascadesContext cascadesContext) {
+ Expression expression;
+ try {
+ expression = analyzer.analyze(expr, new
ExpressionRewriteContext(cascadesContext));
+ } catch (org.apache.doris.nereids.exceptions.AnalysisException e) {
+ throw new
org.apache.doris.nereids.exceptions.AnalysisException("In where clause '"
+ + expr.toSql() + "', "
+ + Utils.convertFirstChar(e.getMessage()));
+ }
+ ExpressionToExpr translator = new ExpressionToExpr();
+ return expression.accept(translator, context);
+ }
+
+ private static class ExpressionToExpr extends ExpressionTranslator { // overrides only the CAST translation
+ @Override
+ public Expr visitCast(Cast cast, PlanTranslatorContext context) {
+ // CastExpr is built from the cast's target data type and its single translated child
expression
+ return new CastExpr(cast.getDataType().toCatalogDataType(),
+ cast.child().accept(this, context), null);
+ }
+ }
+
+    /**
+     * Records the stage's type, id, prefix and object info, fills {@code brokerProperties} from the stage's
+     * object store settings, then merges the stage properties into {@code copyIntoProperties} and analyzes them.
+     *
+     * <p>NOTE(review): the original comment stated that fileFormat and copyOption are not null after this call;
+     * that cannot be confirmed from this method alone.
+     */
+    private void validateStagePB(StagePB stagePB) throws AnalysisException {
+        stageType = stagePB.getType();
+        stageId = stagePB.getStageId();
+        ObjectStoreInfoPB storeInfo = stagePB.getObjInfo();
+        stagePrefix = storeInfo.getPrefix();
+        objectInfo = RemoteBase.analyzeStageObjectStoreInfo(stagePB);
+
+        // location of the object store
+        brokerProperties.put(S3Properties.Env.ENDPOINT, storeInfo.getEndpoint());
+        brokerProperties.put(S3Properties.Env.REGION, storeInfo.getRegion());
+
+        // credentials come from the analyzed object info; the token is optional
+        brokerProperties.put(S3Properties.Env.ACCESS_KEY, objectInfo.getAk());
+        brokerProperties.put(S3Properties.Env.SECRET_KEY, objectInfo.getSk());
+        String token = objectInfo.getToken();
+        if (token != null) {
+            brokerProperties.put(S3Properties.Env.TOKEN, token);
+        }
+
+        brokerProperties.put(S3_BUCKET, storeInfo.getBucket());
+        brokerProperties.put(S3_PREFIX, storeInfo.getPrefix());
+        // S3 Provider properties should be case insensitive.
+        String provider = storeInfo.getProvider().toString().toUpperCase();
+        brokerProperties.put(S3Properties.PROVIDER, provider);
+
+        this.copyIntoProperties.mergeProperties(new StageProperties(stagePB.getPropertiesMap()));
+        this.copyIntoProperties.analyze();
+    }
+
+ public CopyStmt toLegacyStatement(OriginStatement originStmt) {
+ return new CopyStmt(tableName, legacyCopyFromParam,
copyIntoProperties, optHints, label, stageId, stageType,
+ stagePrefix, objectInfo, userName, brokerProperties, properties,
dataDescription, brokerDesc, originStmt);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index bbb197b613e..a0d55d59777 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -48,6 +48,7 @@ import
org.apache.doris.nereids.trees.plans.commands.CancelMTMVTaskCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelWarmUpJobCommand;
import org.apache.doris.nereids.trees.plans.commands.CleanAllProfileCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
+import org.apache.doris.nereids.trees.plans.commands.CopyIntoCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateEncryptkeyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateFileCommand;
@@ -263,6 +264,10 @@ public interface CommandVisitor<R, C> {
return visitCommand(exportCommand, context);
}
+ default R visitCopyIntoCommand(CopyIntoCommand copyIntoCommand, C context)
{
+ return visitCommand(copyIntoCommand, context); // default: delegate to the generic command visit
+ }
+
default R visitCreateEncryptKeyCommand(CreateEncryptkeyCommand
createEncryptKeyCommand, C context) {
return visitCommand(createEncryptKeyCommand, context);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 68fa34071f6..6d3475dbebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -454,7 +454,7 @@ public class DdlExecutor {
}
}
- private static void executeCopyStmt(Env env, CopyStmt copyStmt) throws
Exception {
+ public static void executeCopyStmt(Env env, CopyStmt copyStmt) throws
Exception {
CopyJob job = (CopyJob) (((CloudLoadManager)
env.getLoadManager()).createLoadJobFromStmt(copyStmt));
if (!copyStmt.isAsync()) {
// wait for execute finished
@@ -475,7 +475,7 @@ public class DdlExecutor {
entry.add(loadingStatus.getTrackingUrl());
result.add(entry);
queryState.setResultSet(new
ShowResultSet(copyStmt.getMetaData(), result));
- copyStmt.getAnalyzer().getContext().setState(queryState);
+ ConnectContext.get().setState(queryState);
return;
} else if (job.getState() == JobState.FINISHED) {
EtlStatus loadingStatus = job.getLoadingStatus();
@@ -493,7 +493,7 @@ public class DdlExecutor {
entry.add(loadingStatus.getTrackingUrl());
result.add(entry);
queryState.setResultSet(new
ShowResultSet(copyStmt.getMetaData(), result));
- copyStmt.getAnalyzer().getContext().setState(queryState);
+ ConnectContext.get().setState(queryState);
return;
}
}
@@ -510,7 +510,7 @@ public class DdlExecutor {
entry.add("");
result.add(entry);
queryState.setResultSet(new ShowResultSet(copyStmt.getMetaData(),
result));
- copyStmt.getAnalyzer().getContext().setState(queryState);
+ ConnectContext.get().setState(queryState);
}
private static void waitJobCompleted(CopyJob job) throws
InterruptedException {
diff --git a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
index bd477d99bf6..ff1cb7bdb83 100644
--- a/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
+++ b/regression-test/suites/load_p0/copy_into/test_copy_into.groovy
@@ -103,10 +103,10 @@ suite("test_copy_into", "p0") {
def errorMsgs = [
"",
- "quality not good enough to cancel",
+ "errCode = 2, detailMessage = In where clause '(p_type
= not_exist)', unknown column 'not_exist' in 'table list",
"",
"",
- "quality not good enough to cancel",
+ "errCode = 2, detailMessage = In where clause '(p_type
= not_exist)', unknown column 'not_exist' in 'table list",
"",
"",
"",
@@ -127,8 +127,17 @@ suite("test_copy_into", "p0") {
for (int i = 0; i < tartgetColumnsList.size(); i++) {
sql "$dropTable"
sql "$createTable"
+ if (i == 1 || i == 4) {
+ try {
+ result = do_copy_into.call(tableName,
tartgetColumnsList[i], selectColumnsList[i],
+ externalStageName, filePrefix, whereExprs[i])
+ } catch (Exception e) {
+ assertEquals(errorMsgs[i], e.getMessage())
+ }
+ continue;
+ }
result = do_copy_into.call(tableName, tartgetColumnsList[i],
selectColumnsList[i],
- externalStageName, filePrefix,
whereExprs[i])
+ externalStageName, filePrefix, whereExprs[i])
logger.info("i: " + i + ", copy result: " + result)
assertTrue(result.size() == 1)
if (result[0][1].equals("FINISHED")) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]