This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f925b6ea70541311faa35dcb5c96e92324debcff
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Wed Aug 28 23:14:26 2024 +0800

    [Enhancement](txn) Block new insert into if schema change happens during 
transaction (#39483)
---
 be/src/olap/schema_change.cpp                      |   1 +
 .../insert/BatchInsertIntoTableCommand.java        |  11 ++
 .../trees/plans/physical/PhysicalResultSink.java   |   4 -
 .../java/org/apache/doris/qe/StmtExecutor.java     |   1 +
 .../apache/doris/transaction/TransactionEntry.java |  18 +++
 .../data/insert_p0/{ => transaction}/test_txn.out  |   0
 .../insert_p0/{ => transaction}/txn_insert.out     |   0
 .../{ => transaction}/txn_insert_inject_case.out   |   0
 .../{ => transaction}/txn_insert_restart_fe.out    |   0
 .../txn_insert_restart_fe_with_schema_change.out   |   0
 .../txn_insert_values_with_schema_change.out       |   0
 .../{ => transaction}/txn_insert_with_drop.out     |   0
 .../txn_insert_with_schema_change.out              |   0
 .../txn_insert_with_specify_columns.out            |   0
 ...pecify_columns_schema_change_add_key_column.out |  15 +++
 ...cify_columns_schema_change_add_value_column.out |  15 +++
 ...pecify_columns_schema_change_reorder_column.out |  19 +++
 .../insert_p0/{ => transaction}/test_txn.groovy    |   0
 .../insert_p0/{ => transaction}/txn_insert.groovy  |   6 +-
 .../txn_insert_concurrent_insert.groovy            |   4 +-
 .../txn_insert_inject_case.groovy                  |   2 +-
 .../{ => transaction}/txn_insert_restart_fe.groovy |   0
 ...txn_insert_restart_fe_with_schema_change.groovy |   0
 .../txn_insert_values_with_schema_change.groovy    |   2 +-
 .../{ => transaction}/txn_insert_with_drop.groovy  |   0
 .../txn_insert_with_schema_change.groovy           |   2 +-
 .../txn_insert_with_specify_columns.groovy         |   0
 ...ify_columns_schema_change_add_key_column.groovy | 127 ++++++++++++++++++++
 ...y_columns_schema_change_add_value_column.groovy | 129 +++++++++++++++++++++
 ...ify_columns_schema_change_reorder_column.groovy | 129 +++++++++++++++++++++
 30 files changed, 473 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 04409e37d84..4418d02ecdf 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -777,6 +777,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& 
local_storage_engine,
 // The admin should upgrade all BE and then upgrade FE.
 // Should delete the old code after upgrade finished.
 Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& 
request) {
+    DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { 
sleep(10); })
     Status res;
     signal::tablet_id = _base_tablet->get_table_id();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
index a1b834fe904..04d43e8cc33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
 import org.apache.doris.analysis.StmtType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.ErrorCode;
@@ -120,6 +121,16 @@ public class BatchInsertIntoTableCommand extends Command 
implements NoForward, E
             Preconditions.checkArgument(plan.isPresent(), "insert into command 
must contain OlapTableSinkNode");
             sink = ((PhysicalOlapTableSink<?>) plan.get());
             Table targetTable = sink.getTargetTable();
+            if (ctx.getTxnEntry().isFirstTxnInsert()) {
+                ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable) 
targetTable).getBaseSchemaVersion());
+                ctx.getTxnEntry().setFirstTxnInsert(false);
+            } else {
+                if (((OlapTable) targetTable).getBaseSchemaVersion() != 
ctx.getTxnEntry().getTxnSchemaVersion()) {
+                    throw new AnalysisException("There are schema changes in 
one transaction, "
+                            + "you can commit this transaction with formal 
data or rollback "
+                            + "this whole transaction.");
+                }
+            }
             // should set columns of sink since we maybe generate some 
invisible columns
             List<Column> fullSchema = sink.getTargetTable().getFullSchema();
             List<Column> targetSchema = Lists.newArrayList();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
index aceb1f13774..8fb6dfb286e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
@@ -58,10 +58,6 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> 
extends PhysicalSink<CH
                 logicalProperties, physicalProperties, statistics, child);
     }
 
-    public List<NamedExpression> getOutputExprs() {
-        return outputExprs;
-    }
-
     @Override
     public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 64616420474..86e5603d94c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2044,6 +2044,7 @@ public class StmtExecutor {
                     .setTxnConf(new 
TTxnParams().setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("")
                             
.setTbl("").setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict()
 ? 0
                                     : 
context.getSessionVariable().getInsertMaxFilterRatio()));
+            context.getTxnEntry().setFirstTxnInsert(true);
             StringBuilder sb = new StringBuilder();
             
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 
'status':'")
                     .append(TransactionStatus.PREPARE.name());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 6771d9c3156..4a2beea2d0b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -80,6 +80,8 @@ public class TransactionEntry {
     private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
     private long rowsInTransaction = 0;
     private Types.PUniqueId pLoadId;
+    private boolean isFirstTxnInsert = false;
+    private volatile int txnSchemaVersion = -1;
 
     // for insert into select for multi tables
     private boolean isTransactionBegan = false;
@@ -181,6 +183,22 @@ public class TransactionEntry {
         this.pLoadId = pLoadId;
     }
 
+    public boolean isFirstTxnInsert() {
+        return isFirstTxnInsert;
+    }
+
+    public void setFirstTxnInsert(boolean firstTxnInsert) {
+        isFirstTxnInsert = firstTxnInsert;
+    }
+
+    public int getTxnSchemaVersion() {
+        return txnSchemaVersion;
+    }
+
+    public void setTxnSchemaVersion(int txnSchemaVersion) {
+        this.txnSchemaVersion = txnSchemaVersion;
+    }
+
     // Used for insert into select, return the sub_txn_id for this insert
     public long beginTransaction(TableIf table, SubTransactionType 
subTransactionType) throws Exception {
         if (isInsertValuesTxnBegan()) {
diff --git a/regression-test/data/insert_p0/test_txn.out 
b/regression-test/data/insert_p0/transaction/test_txn.out
similarity index 100%
rename from regression-test/data/insert_p0/test_txn.out
rename to regression-test/data/insert_p0/transaction/test_txn.out
diff --git a/regression-test/data/insert_p0/txn_insert.out 
b/regression-test/data/insert_p0/transaction/txn_insert.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert.out
rename to regression-test/data/insert_p0/transaction/txn_insert.out
diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out 
b/regression-test/data/insert_p0/transaction/txn_insert_inject_case.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert_inject_case.out
rename to regression-test/data/insert_p0/transaction/txn_insert_inject_case.out
diff --git a/regression-test/data/insert_p0/txn_insert_restart_fe.out 
b/regression-test/data/insert_p0/transaction/txn_insert_restart_fe.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert_restart_fe.out
rename to regression-test/data/insert_p0/transaction/txn_insert_restart_fe.out
diff --git 
a/regression-test/data/insert_p0/txn_insert_restart_fe_with_schema_change.out 
b/regression-test/data/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.out
similarity index 100%
rename from 
regression-test/data/insert_p0/txn_insert_restart_fe_with_schema_change.out
rename to 
regression-test/data/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.out
diff --git 
a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out 
b/regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out
similarity index 100%
rename from 
regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
rename to 
regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out
diff --git a/regression-test/data/insert_p0/txn_insert_with_drop.out 
b/regression-test/data/insert_p0/transaction/txn_insert_with_drop.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert_with_drop.out
rename to regression-test/data/insert_p0/transaction/txn_insert_with_drop.out
diff --git a/regression-test/data/insert_p0/txn_insert_with_schema_change.out 
b/regression-test/data/insert_p0/transaction/txn_insert_with_schema_change.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert_with_schema_change.out
rename to 
regression-test/data/insert_p0/transaction/txn_insert_with_schema_change.out
diff --git a/regression-test/data/insert_p0/txn_insert_with_specify_columns.out 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert_with_specify_columns.out
rename to 
regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns.out
diff --git 
a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out
 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out
new file mode 100644
index 00000000000..b06ce07b4a8
--- /dev/null
+++ 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_desc1 --
+c1     int     Yes     true    \N      
+c2     int     Yes     false   \N      NONE
+c3     int     Yes     false   \N      NONE
+
+-- !select_desc2 --
+c1     int     Yes     true    \N      
+new_col        int     Yes     true    \N      
+c2     int     Yes     false   \N      NONE
+c3     int     Yes     false   \N      NONE
+
+-- !select1 --
+1      \N      2       3
+
diff --git 
a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out
 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out
new file mode 100644
index 00000000000..560051c8800
--- /dev/null
+++ 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_desc1 --
+c1     int     Yes     true    \N      
+c2     int     Yes     false   \N      NONE
+c3     int     Yes     false   \N      NONE
+
+-- !select_desc2 --
+c1     int     Yes     true    \N      
+c2     int     Yes     false   \N      NONE
+c3     int     Yes     false   \N      NONE
+new_col        int     Yes     false   \N      NONE
+
+-- !select1 --
+1      2       3       \N
+
diff --git 
a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out
 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out
new file mode 100644
index 00000000000..2cac2aa11bc
--- /dev/null
+++ 
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_desc1 --
+c1     int     Yes     true    \N      
+c2     bigint  Yes     false   \N      NONE
+c3     int     Yes     false   \N      NONE
+
+-- !select_desc2 --
+c1     int     Yes     true    \N      
+c3     int     Yes     false   \N      NONE
+c2     bigint  Yes     false   \N      NONE
+
+-- !select_desc3 --
+c1     int     Yes     true    \N      
+c3     int     Yes     false   \N      NONE
+c2     bigint  Yes     false   \N      NONE
+
+-- !select1 --
+1      3       2
+
diff --git a/regression-test/suites/insert_p0/test_txn.groovy 
b/regression-test/suites/insert_p0/transaction/test_txn.groovy
similarity index 100%
rename from regression-test/suites/insert_p0/test_txn.groovy
rename to regression-test/suites/insert_p0/transaction/test_txn.groovy
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
similarity index 99%
rename from regression-test/suites/insert_p0/txn_insert.groovy
rename to regression-test/suites/insert_p0/transaction/txn_insert.groovy
index 14b056540b6..1f595d89173 100644
--- a/regression-test/suites/insert_p0/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
@@ -287,7 +287,7 @@ suite("txn_insert") {
             if (observer_fe_url != null) {
                 logger.info("observer url: $observer_fe_url")
                 connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = observer_fe_url) {
-                    result = sql """ select count() from 
regression_test_insert_p0.${table}_0 """
+                    result = sql """ select count() from 
regression_test_insert_p0_transaction.${table}_0 """
                     logger.info("select from observer result: $result")
                     assertEquals(79, result[0][0])
                 }
@@ -403,7 +403,7 @@ suite("txn_insert") {
         }
 
         // 13. txn insert does not commit or rollback by user, and txn is 
aborted because connection is closed
-        def dbName = "regression_test_insert_p0"
+        def dbName = "regression_test_insert_p0_transaction"
         def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
         logger.info("url: ${url}")
         def get_txn_id_from_server_info = { serverInfo ->
@@ -776,7 +776,7 @@ suite("txn_insert") {
         }
     }
 
-    def db_name = "regression_test_insert_p0"
+    def db_name = "regression_test_insert_p0_transaction"
     def tables = sql """ show tables from $db_name """
     logger.info("tables: $tables")
     for (def table_info : tables) {
diff --git 
a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert.groovy
similarity index 97%
rename from regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert.groovy
index e520d0b9d95..6c19fc4ee34 100644
--- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert.groovy
@@ -79,7 +79,7 @@ suite("txn_insert_concurrent_insert") {
     }
     sql """ sync """
 
-    def dbName = "regression_test_insert_p0"
+    def dbName = "regression_test_insert_p0_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
@@ -119,7 +119,7 @@ suite("txn_insert_concurrent_insert") {
     logger.info("result: ${result}")
     assertEquals(2606192, result[0][0])
 
-    def db_name = "regression_test_insert_p0"
+    def db_name = "regression_test_insert_p0_transaction"
     def tables = sql """ show tables from $db_name """
     logger.info("tables: $tables")
     for (def table_info : tables) {
diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy
similarity index 99%
rename from regression-test/suites/insert_p0/txn_insert_inject_case.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy
index 50a7729656a..5478aeef69c 100644
--- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy
+++ b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy
@@ -152,7 +152,7 @@ suite("txn_insert_inject_case", "nonConcurrent") {
 
     // 2. commit failed
     sql """ truncate table ${table}_0 """
-    def dbName = "regression_test_insert_p0"
+    def dbName = "regression_test_insert_p0_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
     def get_txn_id_from_server_info = { serverInfo ->
diff --git a/regression-test/suites/insert_p0/txn_insert_restart_fe.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy
similarity index 100%
rename from regression-test/suites/insert_p0/txn_insert_restart_fe.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy
diff --git 
a/regression-test/suites/insert_p0/txn_insert_restart_fe_with_schema_change.groovy
 
b/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.groovy
similarity index 100%
rename from 
regression-test/suites/insert_p0/txn_insert_restart_fe_with_schema_change.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.groovy
diff --git 
a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy
similarity index 98%
rename from 
regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy
index e408d418289..245f2d07e53 100644
--- 
a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 suite("txn_insert_values_with_schema_change") {
     def table = "txn_insert_values_with_schema_change"
 
-    def dbName = "regression_test_insert_p0"
+    def dbName = "regression_test_insert_p0_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
     List<String> errors = new ArrayList<>()
diff --git a/regression-test/suites/insert_p0/txn_insert_with_drop.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_drop.groovy
similarity index 100%
rename from regression-test/suites/insert_p0/txn_insert_with_drop.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_with_drop.groovy
diff --git 
a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_schema_change.groovy
similarity index 99%
rename from 
regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_with_schema_change.groovy
index 7f4eef8a92a..f2fc1a009f9 100644
--- a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_schema_change.groovy
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 
 suite("txn_insert_with_schema_change") {
     def table = "txn_insert_with_schema_change"
-    def dbName = "regression_test_insert_p0"
+    def dbName = "regression_test_insert_p0_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
     List<String> errors = new ArrayList<>()
diff --git 
a/regression-test/suites/insert_p0/txn_insert_with_specify_columns.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns.groovy
similarity index 100%
rename from 
regression-test/suites/insert_p0/txn_insert_with_specify_columns.groovy
rename to 
regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns.groovy
diff --git 
a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy
 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy
new file mode 100644
index 00000000000..de9162cabc7
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_with_specify_columns_schema_change_add_key_column", 
"nonConcurrent") {
+    if(!isCloudMode()) {
+       def table = 
"txn_insert_with_specify_columns_schema_change_add_key_column"
+
+       def dbName = "regression_test_insert_p0_transaction"
+       def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+       logger.info("url: ${url}")
+       List<String> errors = new ArrayList<>()
+       CountDownLatch insertLatch = new CountDownLatch(1)
+       CountDownLatch insertLatch2 = new CountDownLatch(1)
+
+       sql """ DROP TABLE IF EXISTS $table force """
+       sql """
+           create table $table (
+               c1 INT NULL,
+               c2 INT NULL,
+               c3 INT NULL
+           ) ENGINE=OLAP
+           UNIQUE KEY(c1)
+           DISTRIBUTED BY HASH(c1) BUCKETS 1
+           PROPERTIES (
+       "replication_num" = "1"); 
+       """
+       sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """
+
+       def getAlterTableState = { job_state ->
+           def retry = 0
+           sql "use ${dbName};"
+           while (true) {
+               sleep(2000)
+               def state = sql " show alter table column where tablename = 
'${table}' order by CreateTime desc limit 1"
+               logger.info("alter table state: ${state}")
+               if (state.size() > 0 && state[0][9] == job_state) {
+                   return
+               }
+               retry++
+               if (retry >= 10) {
+                   break
+               }
+           }
+           assertTrue(false, "alter table job state is ${last_state}, not 
${job_state} after retry ${retry} times")
+       }
+
+       def txnInsert = {
+           try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+                Statement statement = conn.createStatement()) {
+               try {
+                   qt_select_desc1 """desc $table"""
+
+                   insertLatch.await(2, TimeUnit.MINUTES)
+
+                   statement.execute("begin")
+                   statement.execute("insert into ${table} (c3, c2, c1) values 
(33, 22, 11),(333, 222, 111);")
+
+                   insertLatch2.await(2, TimeUnit.MINUTES)
+                   qt_select_desc2 """desc $table"""
+                   statement.execute("insert into ${table} (c3, c2, c1) 
values(3333, 2222, 1111);")
+                   statement.execute("insert into ${table} (c3, c2, c1) 
values(33333, 22222, 11111),(333333, 222222, 111111);")
+                   statement.execute("commit")
+               } catch (Exception e) {
+                   logger.info("txn insert failed", e)
+                   assertTrue(e.getMessage().contains("There are schema 
changes in one transaction, you can commit this transaction with formal data or 
rollback this whole transaction."))
+                   statement.execute("rollback")
+               }
+           }
+       }
+
+       def schemaChange = { sql ->
+           try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+               Statement statement = conn.createStatement()) {
+               statement.execute(sql)
+               getAlterTableState("RUNNING")
+               insertLatch.countDown()
+               getAlterTableState("FINISHED")
+               insertLatch2.countDown()
+           } catch (Throwable e) {
+               logger.error("schema change failed", e)
+               errors.add("schema change failed " + e.getMessage())
+           }
+       }
+
+       GetDebugPoint().clearDebugPointsForAllBEs()
+       GetDebugPoint().clearDebugPointsForAllFEs()
+       try {
+           
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+           Thread schema_change_thread = new Thread(() -> schemaChange("alter 
table ${table} add column new_col int key after c1;"))
+           Thread insert_thread = new Thread(() -> txnInsert())
+           schema_change_thread.start()
+           insert_thread.start()
+           schema_change_thread.join()
+           insert_thread.join()
+
+           logger.info("errors: " + errors)
+           assertEquals(0, errors.size())
+           getAlterTableState("FINISHED")
+           order_qt_select1 """select * from ${table} order by c1, c2, c3"""
+       } catch (Exception e) {
+           logger.info("failed: " + e.getMessage())
+           assertTrue(false)
+       } finally {
+           GetDebugPoint().clearDebugPointsForAllBEs()
+       }
+    }
+}
diff --git 
a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy
 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy
new file mode 100644
index 00000000000..1d072f572c6
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_with_specify_columns_schema_change_add_value_column", 
"nonConcurrent") {
+    if(!isCloudMode()) {
+        def table = 
"txn_insert_with_specify_columns_schema_change_add_value_column"
+
+        def dbName = "regression_test_insert_p0_transaction"
+        def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+        logger.info("url: ${url}")
+        List<String> errors = new ArrayList<>()
+        CountDownLatch insertLatch = new CountDownLatch(1)
+        CountDownLatch schemaChangeLatch = new CountDownLatch(1)
+
+        sql """ DROP TABLE IF EXISTS $table force """
+        sql """
+            create table $table (
+                c1 INT NULL,
+                c2 INT NULL,
+                c3 INT NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(c1)
+            DISTRIBUTED BY HASH(c1) BUCKETS 1
+            PROPERTIES (
+        "replication_num" = "1"); 
+        """
+        sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """
+
+        def getAlterTableState = { job_state ->
+            def retry = 0
+            sql "use ${dbName};"
+            while (true) {
+                sleep(2000)
+                def state = sql " show alter table column where tablename = 
'${table}' order by CreateTime desc limit 1"
+                logger.info("alter table state: ${state}")
+                if (state.size() > 0 && state[0][9] == job_state) {
+                    return
+                }
+                retry++
+                if (retry >= 10) {
+                    break
+                }
+            }
+            assertTrue(false, "alter table job state is ${last_state}, not 
${job_state} after retry ${retry} times")
+        }
+
+        def txnInsert = {
+            try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+                Statement statement = conn.createStatement()) {
+                try {
+                    qt_select_desc1 """desc $table"""
+
+                    statement.execute("begin")
+                    statement.execute("insert into ${table} (c3, c2, c1) 
values (33, 22, 11),(333, 222, 111);")
+                
+                    schemaChangeLatch.countDown()
+
+                    insertLatch.await(2, TimeUnit.MINUTES)
+
+                    qt_select_desc2 """desc $table"""
+                    statement.execute("insert into ${table} (c3, c2, c1) 
values(3333, 2222, 1111);")
+                    statement.execute("insert into ${table} (c3, c2, c1) 
values(33333, 22222, 11111),(333333, 222222, 111111);")
+                    statement.execute("commit")
+                } catch (Exception e) {
+                    logger.info("txn insert failed", e)
+                    assertTrue(e.getMessage().contains("There are schema 
changes in one transaction, you can commit this transaction with formal data or 
rollback this whole transaction."))
+                    statement.execute("rollback")
+                }
+            }
+        }
+
+        def schemaChange = { sql ->
+            try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+                Statement statement = conn.createStatement()) {
+                schemaChangeLatch.await(2, TimeUnit.MINUTES)
+                statement.execute(sql)
+                getAlterTableState("FINISHED")
+                insertLatch.countDown()
+            } catch (Throwable e) {
+                logger.error("schema change failed", e)
+                errors.add("schema change failed " + e.getMessage())
+            }
+        }
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+            Thread schema_change_thread = new Thread(() -> schemaChange("alter 
table ${table} add column new_col int after c3;"))
+            Thread insert_thread = new Thread(() -> txnInsert())
+            schema_change_thread.start()
+            insert_thread.start()
+
+            schema_change_thread.join()
+            insert_thread.join()
+
+            logger.info("errors: " + errors)
+            assertEquals(0, errors.size())
+            getAlterTableState("FINISHED")
+            order_qt_select1 """select * from ${table} order by c1, c2, c3"""
+        } catch (Exception e) {
+            logger.info("failed: " + e.getMessage())
+            assertTrue(false)
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+
+}
diff --git 
a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy
 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy
new file mode 100644
index 00000000000..e4c16fdf8c3
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_with_specify_columns_schema_change_reorder_column", 
"nonConcurrent") {
+    if(!isCloudMode()){
+        def table = 
"txn_insert_with_specify_columns_schema_change_reorder_column"
+
+        def dbName = "regression_test_insert_p0_transaction"
+        def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+        logger.info("url: ${url}")
+        List<String> errors = new ArrayList<>()
+        CountDownLatch insertLatch = new CountDownLatch(1)
+        CountDownLatch insertLatch2 = new CountDownLatch(1)
+
+        sql """ DROP TABLE IF EXISTS $table force """
+        sql """
+            create table $table (
+                c1 INT NULL,
+                c2 BIGINT NULL,
+                c3 INT NULL
+            ) ENGINE=OLAP
+            UNIQUE KEY(c1)
+            DISTRIBUTED BY HASH(c1) BUCKETS 1
+            PROPERTIES (
+        "replication_num" = "1"); 
+        """
+        sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """
+
+        def getAlterTableState = { job_state ->
+            def retry = 0
+            sql "use ${dbName};"
+            while (true) {
+                sleep(2000)
+                def state = sql " show alter table column where tablename = 
'${table}' order by CreateTime desc limit 1"
+                logger.info("alter table state: ${state}")
+                if (state.size() > 0 && state[0][9] == job_state) {
+                    return
+                }
+                retry++
+                if (retry >= 10) {
+                    break
+                }
+            }
+            assertTrue(false, "alter table job state is ${last_state}, not 
${job_state} after retry ${retry} times")
+        }
+
+        def txnInsert = {
+            try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+                 Statement statement = conn.createStatement()) {
+                try {
+                    qt_select_desc1 """desc $table"""
+
+                    insertLatch.await(2, TimeUnit.MINUTES)
+
+                    statement.execute("begin")
+                    statement.execute("insert into ${table} (c3, c2, c1) 
values (33, 22, 11),(333, 222, 111);")
+
+                    insertLatch2.await(2, TimeUnit.MINUTES)
+
+                    qt_select_desc2 """desc $table"""
+                    statement.execute("insert into ${table} (c3, c2, c1) 
values(3333, 2222, 1111);")
+                    statement.execute("insert into ${table} (c3, c2, c1) 
values(33333, 22222, 11111),(333333, 222222, 111111);")
+                    statement.execute("commit")
+                } catch (Throwable e) {
+                    logger.error("txn insert failed", e)
+                    assertTrue(e.getMessage().contains("There are schema 
changes in one transaction, you can commit this transaction with formal data or 
rollback this whole transaction."))
+                    statement.execute("rollback")
+                }
+            }
+        }
+
+        def schemaChange = { sql ->
+            try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+                Statement statement = conn.createStatement()) {
+                statement.execute(sql)
+                getAlterTableState("RUNNING")
+                insertLatch.countDown()
+                getAlterTableState("FINISHED")
+                insertLatch2.countDown()
+            } catch (Throwable e) {
+                logger.error("schema change failed", e)
+                errors.add("schema change failed " + e.getMessage())
+            }
+        }
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+            Thread schema_change_thread = new Thread(() -> schemaChange("alter 
table ${table} order by (c1,c3,c2);"))
+            Thread insert_thread = new Thread(() -> txnInsert())
+            schema_change_thread.start()
+            insert_thread.start()
+            schema_change_thread.join()
+            insert_thread.join()
+
+            logger.info("errors: " + errors)
+            assertEquals(0, errors.size())
+            getAlterTableState("FINISHED")
+            qt_select_desc3 """desc $table"""
+            order_qt_select1 """select * from ${table} order by c1, c2, c3"""
+        } catch (Exception e) {
+            logger.info("failed: " + e.getMessage())
+            assertTrue(false)
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to