This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 805a33f7686a459acf16a5503e83d4ac517b6276
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Wed Aug 28 18:09:52 2024 +0800

    [Fix](partial update) Fix wrongly update autoinc column in partial update 
(#39996)
    
    ## Proposed changes
    
    https://github.com/apache/doris/pull/38229 converts partial update to
    upsert in some situations. But when the insert stmt is missing the
    auto-increment value column, we can't do this transformation; otherwise
    the value in the auto-increment column would be wrongly overwritten with
    newly generated values instead of being read from the old rows.
---
 .../apache/doris/analysis/NativeInsertStmt.java    |  12 +-
 .../trees/plans/commands/insert/InsertUtils.java   |   8 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |  23 +++-
 .../partial_update/partial_update_autoinc1.csv     |   2 +
 .../partial_update/partial_update_autoinc2.csv     |   2 +
 .../partial_update/partial_update_autoinc3.csv     |   2 +
 .../partial_update/partial_update_autoinc4.csv     |   2 +
 .../test_partial_update_auto_inc.out               |  70 ++++++------
 .../test_delete_from_timeout.groovy                |   2 +-
 .../test_partial_update_auto_inc.groovy            | 121 +++++++++++++++------
 10 files changed, 165 insertions(+), 79 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 64ab872ea8b..db08e699793 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1364,7 +1364,7 @@ public class NativeInsertStmt extends InsertStmt {
         if (hasEmptyTargetColumns) {
             return;
         }
-        boolean hasMissingColExceptAutoInc = false;
+        boolean hasMissingColExceptAutoIncKey = false;
         for (Column col : olapTable.getFullSchema()) {
             boolean exists = false;
             for (Column insertCol : targetColumns) {
@@ -1377,16 +1377,16 @@ public class NativeInsertStmt extends InsertStmt {
                     break;
                 }
             }
-            if (!exists && !col.isAutoInc()) {
-                if (col.isKey()) {
+            if (!exists) {
+                if (col.isKey() && !col.isAutoInc()) {
                     throw new UserException("Partial update should include all 
key columns, missing: " + col.getName());
                 }
-                if (col.isVisible()) {
-                    hasMissingColExceptAutoInc = true;
+                if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+                    hasMissingColExceptAutoIncKey = true;
                 }
             }
         }
-        if (!hasMissingColExceptAutoInc) {
+        if (!hasMissingColExceptAutoIncKey) {
             return;
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index 6c4763afcfb..a14539d148d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -282,7 +282,7 @@ public class InsertUtils {
                         if (unboundLogicalSink.getColNames().isEmpty()) {
                             ((UnboundTableSink<? extends Plan>) 
unboundLogicalSink).setPartialUpdate(false);
                         } else {
-                            boolean hasMissingColExceptAutoInc = false;
+                            boolean hasMissingColExceptAutoIncKey = false;
                             for (Column col : olapTable.getFullSchema()) {
                                 Optional<String> insertCol = 
unboundLogicalSink.getColNames().stream()
                                         .filter(c -> 
c.equalsIgnoreCase(col.getName())).findFirst();
@@ -296,11 +296,11 @@ public class InsertUtils {
                                             + " all ordinary columns 
referenced"
                                             + " by generated columns, missing: 
" + col.getName());
                                 }
-                                if (!col.isAutoInc() && !insertCol.isPresent() 
&& col.isVisible()) {
-                                    hasMissingColExceptAutoInc = true;
+                                if (!(col.isAutoInc() && col.isKey()) && 
!insertCol.isPresent() && col.isVisible()) {
+                                    hasMissingColExceptAutoIncKey = true;
                                 }
                             }
-                            if (!hasMissingColExceptAutoInc) {
+                            if (!hasMissingColExceptAutoIncKey) {
                                 ((UnboundTableSink<? extends Plan>) 
unboundLogicalSink).setPartialUpdate(false);
                             }
                         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 1a74d016cef..8988d3220c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -147,6 +147,13 @@ public class StreamLoadPlanner {
         if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
             throw new UserException("Only unique key merge on write support 
partial update");
         }
+
+        // try to convert to upsert if only has missing auto-increment key 
column
+        boolean hasMissingColExceptAutoIncKey = false;
+        if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
+            isPartialUpdate = false;
+        }
+
         HashSet<String> partialUpdateInputColumns = new HashSet<>();
         if (isPartialUpdate) {
             for (Column col : destTable.getFullSchema()) {
@@ -171,9 +178,16 @@ public class StreamLoadPlanner {
                         break;
                     }
                 }
-                if (col.isKey() && !existInExpr) {
-                    throw new UserException("Partial update should include all 
key columns, missing: " + col.getName());
+                if (!existInExpr) {
+                    if (col.isKey() && !col.isAutoInc()) {
+                        throw new UserException("Partial update should include 
all key columns, missing: "
+                                + col.getName());
+                    }
+                    if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
+                        hasMissingColExceptAutoIncKey = true;
+                    }
                 }
+
                 if (!col.getGeneratedColumnsThatReferToThis().isEmpty()
                         && col.getGeneratedColumnInfo() == null && 
!existInExpr) {
                     throw new UserException("Partial update should include"
@@ -185,6 +199,9 @@ public class StreamLoadPlanner {
                 partialUpdateInputColumns.add(Column.DELETE_SIGN);
             }
         }
+        if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
+            isPartialUpdate = false;
+        }
         // here we should be full schema to fill the descriptor table
         for (Column col : destTable.getFullSchema()) {
             if (isPartialUpdate && 
!partialUpdateInputColumns.contains(col.getName())) {
@@ -252,7 +269,7 @@ public class StreamLoadPlanner {
         // The load id will pass to csv reader to find the stream load context 
from new load stream manager
         fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, 
BrokerDesc.createForStreamLoad(),
                 fileGroup, fileStatus, taskInfo.isStrictMode(), 
taskInfo.getFileType(), taskInfo.getHiddenColumns(),
-                taskInfo.isPartialUpdate());
+                isPartialUpdate);
         scanNode = fileScanNode;
 
         scanNode.init(analyzer);
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
new file mode 100644
index 00000000000..63b02b8a306
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv
@@ -0,0 +1,2 @@
+doris3
+doris4
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
new file mode 100644
index 00000000000..737c9a056f0
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv
@@ -0,0 +1,2 @@
+102,doris8
+103,doris9
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
new file mode 100644
index 00000000000..c06e9e00d07
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv
@@ -0,0 +1,2 @@
+104,"doris10"
+105,"doris11"
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
new file mode 100644
index 00000000000..3a227dba5f5
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv
@@ -0,0 +1,2 @@
+2,888,888
+3,888,888
\ No newline at end of file
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
index 380575499e2..d157f501a8b 100644
--- 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out
@@ -2,64 +2,72 @@
 -- !select_1 --
 doris1
 doris2
-
--- !select_2 --
-2
-
--- !select_3 --
-doris1
-doris2
 doris3
 doris4
 
--- !select_4 --
-4
-
--- !select_1 --
-doris1
-doris2
-
 -- !select_2 --
-2
+4
 
 -- !select_3 --
+"doris10"
+"doris11"
 doris1
 doris2
 doris3
 doris4
+doris5
+doris7
+doris8
+doris9
 
 -- !select_4 --
-4
+10
 
--- !select_1 --
-doris1
-doris2
+-- !select_5 --
+1      10      10      10
+2      20      20      20
+3      30      30      30
+4      40      40      40
 
--- !select_2 --
-2
+-- !select_6 --
+1      99      99      10
+2      888     888     20
+3      888     888     30
+4      40      40      40
 
--- !select_3 --
+-- !select_1 --
 doris1
 doris2
 doris3
 doris4
 
--- !select_4 --
-4
-
--- !select_1 --
-doris1
-doris2
-
 -- !select_2 --
-2
+4
 
 -- !select_3 --
+"doris10"
+"doris11"
 doris1
 doris2
 doris3
 doris4
+doris5
+doris7
+doris8
+doris9
 
 -- !select_4 --
-4
+10
+
+-- !select_5 --
+1      10      10      10
+2      20      20      20
+3      30      30      30
+4      40      40      40
+
+-- !select_6 --
+1      99      99      10
+2      888     888     20
+3      888     888     30
+4      40      40      40
 
diff --git 
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy 
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
index 7d1efdc9782..8598d791e01 100644
--- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -53,7 +53,7 @@ suite("test_delete_from_timeout","nonConcurrent") {
             
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
         }
 
-        sql """delete from ${tableName} where col1 = "false" and col2 = 
"-9999782574499444.2" and col3 = "-25"; """
+        sql """delete from ${tableName} where col1 = "false" and col3 = "-25"; 
"""
         t1.join()
         qt_sql "select * from ${tableName} order by col1, col2, col3;"
 
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
index 88bdb8ae2e0..ec46939b2f5 100644
--- 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -20,46 +19,100 @@ suite("test_partial_update_auto_inc") {
     String db = context.config.getDbNameByFile(context.file)
     sql "select 1;" // to create database
 
-    for (def use_mow : [false, true]) {
-        for (def use_nereids_planner : [false, true]) {
-            logger.info("current params: use_mow: ${use_mow}, 
use_nereids_planner: ${use_nereids_planner}")
-            connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
-                sql "use ${db};"
+    for (def use_nereids_planner : [false, true]) {
+        logger.info("current params: use_nereids_planner: 
${use_nereids_planner}")
+        connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl) {
+            sql "use ${db};"
 
-                if (use_nereids_planner) {
-                    sql """ set enable_nereids_planner=true; """
-                    sql """ set enable_fallback_to_original_planner=false; """
-                } else {
-                    sql """ set enable_nereids_planner = false; """
-                }
+            if (use_nereids_planner) {
+                sql """ set enable_nereids_planner=true; """
+                sql """ set enable_fallback_to_original_planner=false; """
+            } else {
+                sql """ set enable_nereids_planner = false; """
+            }
 
-                // create table
-                sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
-                sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
-                            `id` BIGINT NOT NULL AUTO_INCREMENT,
-                            `name` varchar(65533) NOT NULL COMMENT "用户姓名" )
-                            UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 
1
-                            PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "${use_mow}"); """
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
+            sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
+                        `id` BIGINT NOT NULL AUTO_INCREMENT,
+                        `name` varchar(65533) NOT NULL COMMENT "用户姓名" )
+                        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                        PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true"); """
 
-                sql """ set enable_unique_key_partial_update=true; """
-                sql """ insert into 
test_primary_key_partial_update_auto_inc(name) values("doris1"); """
-                sql """ set enable_unique_key_partial_update=false; """
-                sql """ insert into 
test_primary_key_partial_update_auto_inc(name) values("doris2"); """
-                sql "sync"
+            sql """ set enable_unique_key_partial_update=true; """
+            sql "sync"
+            // insert stmt only misses auto-inc key column
+            sql """ insert into test_primary_key_partial_update_auto_inc(name) 
values("doris1"); """
+            sql """ set enable_unique_key_partial_update=false; """
+            sql "sync"
+            sql """ insert into test_primary_key_partial_update_auto_inc(name) 
values("doris2"); """
+            // stream load only misses auto-inc key column
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                set 'columns', 'name'
+                file 'partial_update_autoinc1.csv'
+                time 10000
+            }
+            qt_select_1 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
+            qt_select_2 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
 
-                qt_select_1 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
-                qt_select_2 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
+            sql """ set enable_unique_key_partial_update=true; """
+            sql "sync"
+            // insert stmt without column list
+            sql """ insert into test_primary_key_partial_update_auto_inc 
values(100,"doris5"); """
+            // insert stmt, column list include all visible columns
+            sql """ insert into 
test_primary_key_partial_update_auto_inc(id,name) values(102,"doris6"); """
+            sql """ set enable_unique_key_partial_update=false; """
+            sql "sync"
+            sql """ insert into test_primary_key_partial_update_auto_inc 
values(101, "doris7"); """
+            // stream load without column list
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                file 'partial_update_autoinc2.csv'
+                time 10000
+            }
+            // stream load, column list include all visible columns
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                set 'columns', 'id,name'
+                file 'partial_update_autoinc3.csv'
+                time 10000
+            }
+            qt_select_3 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
+            qt_select_4 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
 
-                sql """ set enable_unique_key_partial_update=true; """
-                sql """ insert into test_primary_key_partial_update_auto_inc 
values(100,"doris3"); """
-                sql """ set enable_unique_key_partial_update=false; """
-                sql """ insert into test_primary_key_partial_update_auto_inc 
values(101, "doris4"); """
-                sql "sync"
-                qt_select_3 """ select name from 
test_primary_key_partial_update_auto_inc order by name; """
-                qt_select_4 """ select count(distinct id) from 
test_primary_key_partial_update_auto_inc; """
 
-                sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc """
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc2 """
+            sql """ CREATE TABLE test_primary_key_partial_update_auto_inc2 (
+                        `id` BIGINT NOT NULL,
+                        `c1` int,
+                        `c2` int,
+                        `cid` BIGINT NOT NULL AUTO_INCREMENT)
+                        UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
+                        PROPERTIES("replication_num" = "1", 
"enable_unique_key_merge_on_write" = "true"); """
+            sql "insert into test_primary_key_partial_update_auto_inc2 
values(1,10,10,10),(2,20,20,20),(3,30,30,30),(4,40,40,40);"
+            order_qt_select_5 "select * from 
test_primary_key_partial_update_auto_inc2"
+            sql """ set enable_unique_key_partial_update=true; """
+            sql "sync;"
+            // insert stmt only misses auto-inc value column, its value should 
not change when do partial update
+            sql "insert into 
test_primary_key_partial_update_auto_inc2(id,c1,c2) values(1,99,99),(2,99,99);"
+            // stream load only misses auto-inc value column, its value should 
not change when do partial update
+            streamLoad {
+                table "test_primary_key_partial_update_auto_inc2"
+                set 'partial_columns', 'true'
+                set 'column_separator', ','
+                set 'columns', 'id,c1,c2'
+                file 'partial_update_autoinc4.csv'
+                time 10000
             }
+            order_qt_select_6 "select * from 
test_primary_key_partial_update_auto_inc2"
+            sql """ DROP TABLE IF EXISTS 
test_primary_key_partial_update_auto_inc2 """
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to