This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4c00b1760b [feature](partial update) Support partial update for broker load (#22970) 4c00b1760b is described below commit 4c00b1760b7f2805c4e18865fb6f41785ce6fcca Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Tue Aug 29 14:41:01 2023 +0800 [feature](partial update) Support partial update for broker load (#22970) --- .../data_case/partial_update/update.csv | 2 + .../data_case/partial_update/update2.csv | 2 + .../Load/BROKER-LOAD.md | 4 + .../Load/BROKER-LOAD.md | 4 + .../java/org/apache/doris/analysis/LoadStmt.java | 16 ++++ .../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 5 ++ .../apache/doris/load/loadv2/LoadLoadingTask.java | 6 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 42 +++++++++- .../doris/load/loadv2/BrokerLoadJobTest.java | 2 +- .../hive/test_partial_update_broker_load.out | 16 ++++ .../hive/test_partial_update_broker_load.groovy | 92 ++++++++++++++++++++++ 12 files changed, 188 insertions(+), 5 deletions(-) diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/data_case/partial_update/update.csv b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/data_case/partial_update/update.csv new file mode 100644 index 0000000000..7abdf2f85d --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/data_case/partial_update/update.csv @@ -0,0 +1,2 @@ +2,400 +1,200 \ No newline at end of file diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/data_case/partial_update/update2.csv b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/data_case/partial_update/update2.csv new file mode 100644 index 0000000000..f7f6aabd57 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/data_case/partial_update/update2.csv @@ -0,0 +1,2 @@ +1,999 +3,888 diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 59956abc76..0cbfc8fd54 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -175,6 +175,10 @@ WITH BROKER broker_name Whether to impose strict restrictions on data. Defaults to false. + - `partial_columns` + + Boolean type. True means that partial column update is used; the default value is false. This parameter may only be set when the table model is Unique and Merge on Write is used. + - `timezone` Specify the time zone for some functions that are affected by time zones, such as `strftime/alignment_timestamp/from_unixtime`, etc. Please refer to the [timezone](../../../../advanced/time-zone.md) documentation for details. If not specified, the "Asia/Shanghai" timezone is used diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md index 36dfa68c23..eef30f32a1 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/BROKER-LOAD.md @@ -175,6 +175,10 @@ WITH BROKER broker_name 是否对数据进行严格限制。默认为 false。 + - `partial_columns` + + 布尔类型,为 true 表示使用部分列更新,默认值为 false,该参数只允许在表模型为 Unique 且采用 Merge on Write 时设置。 + - `timezone` 指定某些受时区影响的函数的时区,如 `strftime/alignment_timestamp/from_unixtime` 等等,具体请查阅 [时区](../../../../advanced/time-zone.md) 文档。如果不指定,则使用 "Asia/Shanghai" 时区 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index d4401626cf..eb19620fc8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -121,6 +121,7 @@ public class LoadStmt extends DdlStmt { public static final String KEY_IN_PARAM_BACKEND_ID = "backend_id"; public static final String KEY_SKIP_LINES = "skip_lines"; public static final String KEY_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; + public static final String PARTIAL_COLUMNS = "partial_columns"; public static final String KEY_COMMENT = "comment"; @@ -162,6 +163,12 @@ public class LoadStmt extends DdlStmt { return Boolean.valueOf(s); } }) + .put(PARTIAL_COLUMNS, new Function<String, Boolean>() { + @Override + public @Nullable Boolean apply(@Nullable String s) { + return Boolean.valueOf(s); + } + }) .put(TIMEZONE, new Function<String, String>() { @Override public @Nullable String apply(@Nullable String s) { @@ -346,6 +353,15 @@ public class LoadStmt extends DdlStmt { } } + // partial update + final String partialColumnsProperty = properties.get(PARTIAL_COLUMNS); + if (partialColumnsProperty != null) { + if (!partialColumnsProperty.equalsIgnoreCase("true") + && !partialColumnsProperty.equalsIgnoreCase("false")) { + throw new DdlException(PARTIAL_COLUMNS + " is not a boolean"); + } + } + // time zone final String timezone = properties.get(TIMEZONE); if (timezone != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 1f84d96bf3..6dd8675b3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -208,7 +208,7 @@ public class BrokerLoadJob extends BulkLoadJob { // Generate loading task and init the plan of task LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, brokerFileGroups, getDeadlineMs(), getExecMemLimit(), - isStrictMode(), transactionId, this, 
getTimeZone(), getTimeout(), + isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(), getLoadParallelism(), getSendBatchParallelism(), getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(), useNewLoadScanNode(), getPriority()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 4c8528abd1..2532b0d80e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -430,6 +430,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 2 * 1024 * 1024 * 1024L); jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, 0.0); jobProperties.put(LoadStmt.STRICT_MODE, false); + jobProperties.put(LoadStmt.PARTIAL_COLUMNS, false); jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE); jobProperties.put(LoadStmt.LOAD_PARALLELISM, Config.default_load_parallelism); jobProperties.put(LoadStmt.SEND_BATCH_PARALLELISM, 1); @@ -1217,6 +1218,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements return (boolean) jobProperties.get(LoadStmt.STRICT_MODE); } + protected boolean isPartialUpdate() { + return (boolean) jobProperties.get(LoadStmt.PARTIAL_COLUMNS); + } + protected String getTimeZone() { return (String) jobProperties.get(LoadStmt.TIMEZONE); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 450972900f..9a2fdb647f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -61,6 +61,7 @@ public class LoadLoadingTask extends LoadTask { private final long jobDeadlineMs; private final long 
execMemLimit; private final boolean strictMode; + private final boolean isPartialUpdate; private final long txnId; private final String timezone; // timeout of load job, in seconds @@ -78,7 +79,7 @@ public class LoadLoadingTask extends LoadTask { public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups, - long jobDeadlineMs, long execMemLimit, boolean strictMode, + long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, @@ -91,6 +92,7 @@ public class LoadLoadingTask extends LoadTask { this.jobDeadlineMs = jobDeadlineMs; this.execMemLimit = execMemLimit; this.strictMode = strictMode; + this.isPartialUpdate = isPartialUpdate; this.txnId = txnId; this.failMsg = new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL); this.retryTime = 2; // 2 times is enough @@ -108,7 +110,7 @@ public class LoadLoadingTask extends LoadTask { int fileNum, UserIdentity userInfo) throws UserException { this.loadId = loadId; planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, - strictMode, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, + strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism, this.useNewLoadScanNode, userInfo); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 14b2c31990..17ee58bd63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -51,6 +51,7 @@ import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -65,6 +66,7 @@ public class LoadingTaskPlanner { private final BrokerDesc brokerDesc; private final List<BrokerFileGroup> fileGroups; private final boolean strictMode; + private final boolean isPartialUpdate; private final long timeoutS; // timeout of load job, in second private final int loadParallelism; private final int sendBatchParallelism; @@ -83,7 +85,7 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups, - boolean strictMode, String timezone, long timeoutS, int loadParallelism, + boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo) { this.loadJobId = loadJobId; this.txnId = txnId; @@ -92,6 +94,7 @@ public class LoadingTaskPlanner { this.brokerDesc = brokerDesc; this.fileGroups = brokerFileGroups; this.strictMode = strictMode; + this.isPartialUpdate = isPartialUpdate; this.analyzer.setTimezone(timezone); this.timeoutS = timeoutS; this.loadParallelism = loadParallelism; @@ -113,8 +116,37 @@ public class LoadingTaskPlanner { TupleDescriptor destTupleDesc = descTable.createTupleDescriptor(); TupleDescriptor scanTupleDesc = destTupleDesc; scanTupleDesc = descTable.createTupleDescriptor("ScanTuple"); + if (isPartialUpdate && !table.getEnableUniqueKeyMergeOnWrite()) { + throw new UserException("Only unique key merge on write support partial update"); + } + + HashSet<String> partialUpdateInputColumns = new HashSet<>(); + if (isPartialUpdate) { + for (Column col : table.getFullSchema()) { + boolean existInExpr = false; + for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) { + if (importColumnDesc.getColumnName() != 
null + && importColumnDesc.getColumnName().equals(col.getName())) { + if (!col.isVisible() && !Column.DELETE_SIGN.equals(col.getName())) { + throw new UserException("Partial update should not include invisible column except" + + " delete sign column: " + col.getName()); + } + partialUpdateInputColumns.add(col.getName()); + existInExpr = true; + break; + } + } + if (col.isKey() && !existInExpr) { + throw new UserException("Partial update should include all key columns, missing: " + col.getName()); + } + } + } + // use full schema to fill the descriptor table for (Column col : table.getFullSchema()) { + if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) { + continue; + } SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc); slotDesc.setIsMaterialized(true); slotDesc.setColumn(col); @@ -123,6 +155,13 @@ public class LoadingTaskPlanner { scanSlotDesc.setIsMaterialized(true); scanSlotDesc.setColumn(col); scanSlotDesc.setIsNullable(col.isAllowNull()); + scanSlotDesc.setAutoInc(col.isAutoInc()); + if (col.isAutoInc()) { + // auto-increment column should be non-nullable + // however, here we use `NullLiteral` to indicate that a cell should + // be filled with generated value in `VOlapTableSink::_fill_auto_inc_cols()` + scanSlotDesc.setIsNullable(true); + } if (fileGroups.size() > 0) { for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) { try { @@ -155,6 +194,7 @@ public class LoadingTaskPlanner { OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds, Config.enable_single_replica_load); olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode); + olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); // 3. 
Plan fragment diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index be377f094e..14cd4772db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -365,7 +365,7 @@ public class BrokerLoadJobTest { UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); Profile jobProfile = new Profile("test", false); - LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, 100, + LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, false, 100, callback, "", 100, 1, 1, true, jobProfile, false, false, LoadTask.Priority.NORMAL); try { UserIdentity userInfo = new UserIdentity("root", "localhost"); diff --git a/regression-test/data/external_table_p0/hive/test_partial_update_broker_load.out b/regression-test/data/external_table_p0/hive/test_partial_update_broker_load.out new file mode 100644 index 0000000000..c12447f46b --- /dev/null +++ b/regression-test/data/external_table_p0/hive/test_partial_update_broker_load.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 alice 1000 123 1 +2 bob 2000 223 2 +3 tom 3000 323 3 + +-- !sql -- +1 alice 200 123 1 +2 bob 400 223 2 +3 tom 3000 323 3 + +-- !sql -- +1 alice 999 123 1 +2 bob 400 223 2 +3 tom 888 323 3 + diff --git a/regression-test/suites/external_table_p0/hive/test_partial_update_broker_load.groovy b/regression-test/suites/external_table_p0/hive/test_partial_update_broker_load.groovy new file mode 100644 index 0000000000..cf7243e5dd --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_partial_update_broker_load.groovy @@ -0,0 +1,92 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_primary_key_partial_update_broker_load", "p0,external,hive,external_docker,external_docker_hive") { + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + brokerName = getBrokerName() + hdfsUser = getHdfsUser() + hdfsPasswd = getHdfsPasswd() + hdfs_port = context.config.otherConfigs.get("hdfs_port") + externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def load_from_hdfs = {testTable, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd -> + def result1= sql """ + LOAD LABEL ${label} ( + DATA INFILE("${hdfsFilePath}") + INTO TABLE ${testTable} + COLUMNS TERMINATED BY "," + (id, score) + ) + with HDFS ( + "fs.defaultFS"="hdfs://${externalEnvIp}:${hdfs_port}", + "hadoop.username"="${hdfsUser}") + PROPERTIES ( + "timeout"="1200", + "max_filter_ratio"="0", + "partial_columns"="true"); + """ + assertTrue(result1.size() == 1) + assertTrue(result1[0].size() == 1) + assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected") + } + + def wait_for_load_result = {checklabel, testTable -> + max_try_milli_secs = 10000 + while(max_try_milli_secs) { + result = sql "show load where label = '${checklabel}'" + if(result[0][2] == "FINISHED") { + break + } else { + sleep(1000) // wait 1 second every time + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertEquals(1, 2) + } + } + } + } + + def tableName = "test_primary_key_partial_update_broker_load" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) NULL COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321") + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true") + """ + sql """insert into ${tableName} values(2, "bob", 2000, 223, 2),(1, "alice", 
1000, 123, 1),(3, "tom", 3000, 323, 3);""" + qt_sql """ select * from ${tableName} order by id; """ + def test_load_label = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs(tableName, test_load_label, "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/data_case/partial_update/update.csv", "csv", brokerName, hdfsUser, hdfsPasswd) + wait_for_load_result(test_load_label, tableName) + qt_sql """select * from ${tableName} order by id;""" + + sql "set enable_unified_load=true;" + sql "sync;" + def test_load_label2 = UUID.randomUUID().toString().replaceAll("-", "") + load_from_hdfs(tableName, test_load_label2, "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/data_case/partial_update/update2.csv", "csv", brokerName, hdfsUser, hdfsPasswd) + wait_for_load_result(test_load_label2, tableName) + qt_sql """select * from ${tableName} order by id;""" + sql "drop table if exists ${tableName};" + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org