This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 8bfc0a05f4f3732e6d1f1179f3f858509f825838
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Fri Nov 5 09:43:28 2021 +0800

    Fix hadoop load failed when enable batch delete in unique table (#6996)
---
 .../src/main/java/org/apache/doris/load/Load.java  | 23 ++++++++++++++++++++++
 .../apache/doris/task/HadoopLoadPendingTask.java   |  4 +---
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 9469e4c..4e0d4d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -83,6 +83,7 @@ import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.LoadJob.JobState;
+import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.ReplicaPersistInfo;
@@ -581,6 +582,11 @@ public class Load {
                     // set default timeout
                     job.setTimeoutSecond(Config.hadoop_load_default_timeout_second);
                 }
+                for (DataDescription dataDescription : dataDescriptions) {
+                    if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND) {
+                        throw new DdlException("MERGE OR DELETE is not supported in hadoop load.");
+                    }
+                }
             } else if (etlJobType == EtlJobType.BROKER) {
                 if (job.getTimeoutSecond() == 0) {
                     // set default timeout
@@ -758,6 +764,23 @@ public class Load {
                             // do nothing
                         }
 
+                    } else if (!column.isVisible()) {
+                        /*
+                         * For batch delete table add hidden column __DORIS_DELETE_SIGN__ to columns
+                         * eg:
+                         * (A, B, C)
+                         * ->
+                         * (A, B, C) SET (__DORIS_DELETE_SIGN__ = 0)
+                         */
+                        columnToHadoopFunction.put(column.getName(), Pair.create("default_value", Lists.newArrayList(column.getDefaultValue())));
+                        ImportColumnDesc importColumnDesc = null;
+                        try {
+                            importColumnDesc = new ImportColumnDesc(column.getName(),
+                                    new FunctionCallExpr("default_value", Arrays.asList(column.getDefaultValueExpr())));
+                        } catch (AnalysisException e) {
+                            throw new DdlException(e.getMessage());
+                        }
+                        parsedColumnExprList.add(importColumnDesc);
                     }
                 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java
index cffd772..e1944eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/HadoopLoadPendingTask.java
@@ -169,7 +169,7 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
 
     private Map<String, EtlColumn> createEtlColumns(OlapTable table) {
         Map<String, EtlColumn> etlColumns = Maps.newHashMap();
-        for (Column column : table.getBaseSchema()) {
+        for (Column column : table.getBaseSchema(true)) {
             etlColumns.put(column.getName(), new EtlColumn(column));
         }
         return etlColumns;
@@ -225,7 +225,6 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
                 }
                 columnRefs.add(dppColumn);
             }
-
             // distribution infos
             DistributionInfo distributionInfo = partition.getDistributionInfo();
             List<String> distributionColumnRefs = Lists.newArrayList();
@@ -266,7 +265,6 @@ public class HadoopLoadPendingTask extends LoadPendingTask {
                 LOG.warn("unknown distribution type. type: {}", distributionInfo.getType().name());
                 throw new LoadException("unknown distribution type. type: " + distributionInfo.getType().name());
             }
-
             etlIndex.setPidKeyCount(keySize);
             etlIndex.setColumnRefs(columnRefs);
             etlIndices.put(String.valueOf(indexId), etlIndex);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org