This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 805a33f7686a459acf16a5503e83d4ac517b6276 Author: bobhan1 <bh2444151...@outlook.com> AuthorDate: Wed Aug 28 18:09:52 2024 +0800 [Fix](partial update) Fix wrongly update autoinc column in partial update (#39996) ## Proposed changes https://github.com/apache/doris/pull/38229 converts a partial update to an upsert in some situations. However, when the insert stmt is missing an autoinc value column, we can't do this transformation; otherwise the value in the autoinc column will be wrongly overwritten with newly generated values instead of being read from the old rows. --- .../apache/doris/analysis/NativeInsertStmt.java | 12 +- .../trees/plans/commands/insert/InsertUtils.java | 8 +- .../apache/doris/planner/StreamLoadPlanner.java | 23 +++- .../partial_update/partial_update_autoinc1.csv | 2 + .../partial_update/partial_update_autoinc2.csv | 2 + .../partial_update/partial_update_autoinc3.csv | 2 + .../partial_update/partial_update_autoinc4.csv | 2 + .../test_partial_update_auto_inc.out | 70 ++++++------ .../test_delete_from_timeout.groovy | 2 +- .../test_partial_update_auto_inc.groovy | 121 +++++++++++++++------ 10 files changed, 165 insertions(+), 79 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 64ab872ea8b..db08e699793 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1364,7 +1364,7 @@ public class NativeInsertStmt extends InsertStmt { if (hasEmptyTargetColumns) { return; } - boolean hasMissingColExceptAutoInc = false; + boolean hasMissingColExceptAutoIncKey = false; for (Column col : olapTable.getFullSchema()) { boolean exists = false; for (Column insertCol : targetColumns) { @@ -1377,16 +1377,16 @@ public class NativeInsertStmt extends InsertStmt { break; } } - if (!exists && !col.isAutoInc()) { - if (col.isKey()) { + if (!exists) { + if 
(col.isKey() && !col.isAutoInc()) { throw new UserException("Partial update should include all key columns, missing: " + col.getName()); } - if (col.isVisible()) { - hasMissingColExceptAutoInc = true; + if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; } } } - if (!hasMissingColExceptAutoInc) { + if (!hasMissingColExceptAutoIncKey) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 6c4763afcfb..a14539d148d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -282,7 +282,7 @@ public class InsertUtils { if (unboundLogicalSink.getColNames().isEmpty()) { ((UnboundTableSink<? extends Plan>) unboundLogicalSink).setPartialUpdate(false); } else { - boolean hasMissingColExceptAutoInc = false; + boolean hasMissingColExceptAutoIncKey = false; for (Column col : olapTable.getFullSchema()) { Optional<String> insertCol = unboundLogicalSink.getColNames().stream() .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); @@ -296,11 +296,11 @@ public class InsertUtils { + " all ordinary columns referenced" + " by generated columns, missing: " + col.getName()); } - if (!col.isAutoInc() && !insertCol.isPresent() && col.isVisible()) { - hasMissingColExceptAutoInc = true; + if (!(col.isAutoInc() && col.isKey()) && !insertCol.isPresent() && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; } } - if (!hasMissingColExceptAutoInc) { + if (!hasMissingColExceptAutoIncKey) { ((UnboundTableSink<? 
extends Plan>) unboundLogicalSink).setPartialUpdate(false); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 1a74d016cef..8988d3220c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -147,6 +147,13 @@ public class StreamLoadPlanner { if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) { throw new UserException("Only unique key merge on write support partial update"); } + + // try to convert to upsert if only has missing auto-increment key column + boolean hasMissingColExceptAutoIncKey = false; + if (taskInfo.getColumnExprDescs().descs.isEmpty()) { + isPartialUpdate = false; + } + HashSet<String> partialUpdateInputColumns = new HashSet<>(); if (isPartialUpdate) { for (Column col : destTable.getFullSchema()) { @@ -171,9 +178,16 @@ public class StreamLoadPlanner { break; } } - if (col.isKey() && !existInExpr) { - throw new UserException("Partial update should include all key columns, missing: " + col.getName()); + if (!existInExpr) { + if (col.isKey() && !col.isAutoInc()) { + throw new UserException("Partial update should include all key columns, missing: " + + col.getName()); + } + if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; + } } + if (!col.getGeneratedColumnsThatReferToThis().isEmpty() && col.getGeneratedColumnInfo() == null && !existInExpr) { throw new UserException("Partial update should include" @@ -185,6 +199,9 @@ public class StreamLoadPlanner { partialUpdateInputColumns.add(Column.DELETE_SIGN); } } + if (isPartialUpdate && !hasMissingColExceptAutoIncKey) { + isPartialUpdate = false; + } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { if (isPartialUpdate && 
!partialUpdateInputColumns.contains(col.getName())) { @@ -252,7 +269,7 @@ public class StreamLoadPlanner { // The load id will pass to csv reader to find the stream load context from new load stream manager fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), - taskInfo.isPartialUpdate()); + isPartialUpdate); scanNode = fileScanNode; scanNode.init(analyzer); diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv new file mode 100644 index 00000000000..63b02b8a306 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv @@ -0,0 +1,2 @@ +doris3 +doris4 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv new file mode 100644 index 00000000000..737c9a056f0 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv @@ -0,0 +1,2 @@ +102,doris8 +103,doris9 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv new file mode 100644 index 00000000000..c06e9e00d07 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv @@ -0,0 +1,2 @@ +104,"doris10" +105,"doris11" \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv new file mode 100644 index 00000000000..3a227dba5f5 --- /dev/null +++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv @@ -0,0 +1,2 @@ +2,888,888 +3,888,888 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out index 380575499e2..d157f501a8b 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out @@ -2,64 +2,72 @@ -- !select_1 -- doris1 doris2 - --- !select_2 -- -2 - --- !select_3 -- -doris1 -doris2 doris3 doris4 --- !select_4 -- -4 - --- !select_1 -- -doris1 -doris2 - -- !select_2 -- -2 +4 -- !select_3 -- +"doris10" +"doris11" doris1 doris2 doris3 doris4 +doris5 +doris7 +doris8 +doris9 -- !select_4 -- -4 +10 --- !select_1 -- -doris1 -doris2 +-- !select_5 -- +1 10 10 10 +2 20 20 20 +3 30 30 30 +4 40 40 40 --- !select_2 -- -2 +-- !select_6 -- +1 99 99 10 +2 888 888 20 +3 888 888 30 +4 40 40 40 --- !select_3 -- +-- !select_1 -- doris1 doris2 doris3 doris4 --- !select_4 -- -4 - --- !select_1 -- -doris1 -doris2 - -- !select_2 -- -2 +4 -- !select_3 -- +"doris10" +"doris11" doris1 doris2 doris3 doris4 +doris5 +doris7 +doris8 +doris9 -- !select_4 -- -4 +10 + +-- !select_5 -- +1 10 10 10 +2 20 20 20 +3 30 30 30 +4 40 40 40 + +-- !select_6 -- +1 99 99 10 +2 888 888 20 +3 888 888 30 +4 40 40 40 diff --git a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy index 7d1efdc9782..8598d791e01 100644 --- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy @@ -53,7 +53,7 @@ suite("test_delete_from_timeout","nonConcurrent") { 
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail") } - sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + sql """delete from ${tableName} where col1 = "false" and col3 = "-25"; """ t1.join() qt_sql "select * from ${tableName} order by col1, col2, col3;" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy index 88bdb8ae2e0..ec46939b2f5 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -20,46 +19,100 @@ suite("test_partial_update_auto_inc") { String db = context.config.getDbNameByFile(context.file) sql "select 1;" // to create database - for (def use_mow : [false, true]) { - for (def use_nereids_planner : [false, true]) { - logger.info("current params: use_mow: ${use_mow}, use_nereids_planner: ${use_nereids_planner}") - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql "use ${db};" + for (def use_nereids_planner : [false, true]) { + logger.info("current params: use_nereids_planner: ${use_nereids_planner}") + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql "use ${db};" - if (use_nereids_planner) { - sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ - } else { - sql """ set enable_nereids_planner = false; """ - } + if (use_nereids_planner) { + sql """ set 
enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_planner = false; """ + } - // create table - sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ - sql """ CREATE TABLE test_primary_key_partial_update_auto_inc ( - `id` BIGINT NOT NULL AUTO_INCREMENT, - `name` varchar(65533) NOT NULL COMMENT "用户姓名" ) - UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "${use_mow}"); """ + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ + sql """ CREATE TABLE test_primary_key_partial_update_auto_inc ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `name` varchar(65533) NOT NULL COMMENT "用户姓名" ) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ - sql """ set enable_unique_key_partial_update=true; """ - sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris1"); """ - sql """ set enable_unique_key_partial_update=false; """ - sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris2"); """ - sql "sync" + sql """ set enable_unique_key_partial_update=true; """ + sql "sync" + // insert stmt only misses auto-inc key column + sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris1"); """ + sql """ set enable_unique_key_partial_update=false; """ + sql "sync" + sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris2"); """ + // stream load only misses auto-inc key column + streamLoad { + table "test_primary_key_partial_update_auto_inc" + set 'partial_columns', 'true' + set 'column_separator', ',' + set 'columns', 'name' + file 'partial_update_autoinc1.csv' + time 10000 + } + qt_select_1 """ select name from test_primary_key_partial_update_auto_inc order by name; """ + qt_select_2 """ select 
count(distinct id) from test_primary_key_partial_update_auto_inc; """ - qt_select_1 """ select name from test_primary_key_partial_update_auto_inc order by name; """ - qt_select_2 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """ + sql """ set enable_unique_key_partial_update=true; """ + sql "sync" + // insert stmt withou column list + sql """ insert into test_primary_key_partial_update_auto_inc values(100,"doris5"); """ + // insert stmt, column list include all visible columns + sql """ insert into test_primary_key_partial_update_auto_inc(id,name) values(102,"doris6"); """ + sql """ set enable_unique_key_partial_update=false; """ + sql "sync" + sql """ insert into test_primary_key_partial_update_auto_inc values(101, "doris7"); """ + // stream load withou column list + streamLoad { + table "test_primary_key_partial_update_auto_inc" + set 'partial_columns', 'true' + set 'column_separator', ',' + file 'partial_update_autoinc2.csv' + time 10000 + } + // stream load, column list include all visible columns + streamLoad { + table "test_primary_key_partial_update_auto_inc" + set 'partial_columns', 'true' + set 'column_separator', ',' + set 'columns', 'id,name' + file 'partial_update_autoinc3.csv' + time 10000 + } + qt_select_3 """ select name from test_primary_key_partial_update_auto_inc order by name; """ + qt_select_4 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """ + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ - sql """ set enable_unique_key_partial_update=true; """ - sql """ insert into test_primary_key_partial_update_auto_inc values(100,"doris3"); """ - sql """ set enable_unique_key_partial_update=false; """ - sql """ insert into test_primary_key_partial_update_auto_inc values(101, "doris4"); """ - sql "sync" - qt_select_3 """ select name from test_primary_key_partial_update_auto_inc order by name; """ - qt_select_4 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """ - sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc2 """ + sql """ CREATE TABLE test_primary_key_partial_update_auto_inc2 ( + `id` BIGINT NOT NULL, + `c1` int, + `c2` int, + `cid` BIGINT NOT NULL AUTO_INCREMENT) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ + sql "insert into test_primary_key_partial_update_auto_inc2 values(1,10,10,10),(2,20,20,20),(3,30,30,30),(4,40,40,40);" + order_qt_select_5 "select * from test_primary_key_partial_update_auto_inc2" + sql """ set enable_unique_key_partial_update=true; """ + sql "sync;" + // insert stmt only misses auto-inc value column, its value should not change when do partial update + sql "insert into test_primary_key_partial_update_auto_inc2(id,c1,c2) values(1,99,99),(2,99,99);" + // stream load only misses auto-inc value column, its value should not change when do partial update + streamLoad { + table "test_primary_key_partial_update_auto_inc2" + set 'partial_columns', 'true' + set 'column_separator', ',' + set 'columns', 'id,c1,c2' + file 'partial_update_autoinc4.csv' + time 10000 } + order_qt_select_6 "select * from test_primary_key_partial_update_auto_inc2" + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc2 """ } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org