This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f925b6ea70541311faa35dcb5c96e92324debcff Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Wed Aug 28 23:14:26 2024 +0800 [Enhancement](txn) Block new insert into if schema change happens during transaction (#39483) --- be/src/olap/schema_change.cpp | 1 + .../insert/BatchInsertIntoTableCommand.java | 11 ++ .../trees/plans/physical/PhysicalResultSink.java | 4 - .../java/org/apache/doris/qe/StmtExecutor.java | 1 + .../apache/doris/transaction/TransactionEntry.java | 18 +++ .../data/insert_p0/{ => transaction}/test_txn.out | 0 .../insert_p0/{ => transaction}/txn_insert.out | 0 .../{ => transaction}/txn_insert_inject_case.out | 0 .../{ => transaction}/txn_insert_restart_fe.out | 0 .../txn_insert_restart_fe_with_schema_change.out | 0 .../txn_insert_values_with_schema_change.out | 0 .../{ => transaction}/txn_insert_with_drop.out | 0 .../txn_insert_with_schema_change.out | 0 .../txn_insert_with_specify_columns.out | 0 ...pecify_columns_schema_change_add_key_column.out | 15 +++ ...cify_columns_schema_change_add_value_column.out | 15 +++ ...pecify_columns_schema_change_reorder_column.out | 19 +++ .../insert_p0/{ => transaction}/test_txn.groovy | 0 .../insert_p0/{ => transaction}/txn_insert.groovy | 6 +- .../txn_insert_concurrent_insert.groovy | 4 +- .../txn_insert_inject_case.groovy | 2 +- .../{ => transaction}/txn_insert_restart_fe.groovy | 0 ...txn_insert_restart_fe_with_schema_change.groovy | 0 .../txn_insert_values_with_schema_change.groovy | 2 +- .../{ => transaction}/txn_insert_with_drop.groovy | 0 .../txn_insert_with_schema_change.groovy | 2 +- .../txn_insert_with_specify_columns.groovy | 0 ...ify_columns_schema_change_add_key_column.groovy | 127 ++++++++++++++++++++ ...y_columns_schema_change_add_value_column.groovy | 129 +++++++++++++++++++++ ...ify_columns_schema_change_reorder_column.groovy | 129 +++++++++++++++++++++ 30 files changed, 473 insertions(+), 12 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 04409e37d84..4418d02ecdf 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -777,6 +777,7 @@ SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine, // The admin should upgrade all BE and then upgrade FE. // Should delete the old code after upgrade finished. Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) { + DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); }) Status res; signal::tablet_id = _base_tablet->get_table_id(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java index a1b834fe904..04d43e8cc33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.analysis.StmtType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; @@ -120,6 +121,16 @@ public class BatchInsertIntoTableCommand extends Command implements NoForward, E Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode"); sink = ((PhysicalOlapTableSink<?>) plan.get()); Table targetTable = sink.getTargetTable(); + if (ctx.getTxnEntry().isFirstTxnInsert()) { + ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable) targetTable).getBaseSchemaVersion()); + ctx.getTxnEntry().setFirstTxnInsert(false); + } else { + if (((OlapTable) targetTable).getBaseSchemaVersion() != ctx.getTxnEntry().getTxnSchemaVersion()) { + throw new AnalysisException("There are schema changes in one transaction, " + + "you can commit this transaction with formal data or rollback " + + "this whole transaction."); + } + } // should set columns of sink since we maybe generate some invisible columns List<Column> fullSchema = sink.getTargetTable().getFullSchema(); List<Column> targetSchema = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java index aceb1f13774..8fb6dfb286e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java @@ -58,10 +58,6 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH logicalProperties, physicalProperties, statistics, child); } - public List<NamedExpression> getOutputExprs() { - return outputExprs; - } - @Override public PhysicalResultSink<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 64616420474..86e5603d94c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2044,6 +2044,7 @@ public class StmtExecutor { .setTxnConf(new TTxnParams().setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("") .setTbl("").setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : context.getSessionVariable().getInsertMaxFilterRatio())); + context.getTxnEntry().setFirstTxnInsert(true); StringBuilder sb = new StringBuilder(); sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") .append(TransactionStatus.PREPARE.name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 6771d9c3156..4a2beea2d0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -80,6 +80,8 @@ public class TransactionEntry { private List<InternalService.PDataRow> dataToSend = new ArrayList<>(); private long rowsInTransaction = 0; private Types.PUniqueId pLoadId; + private boolean isFirstTxnInsert = false; + private volatile int txnSchemaVersion = -1; // for insert into select for multi tables private boolean isTransactionBegan = false; @@ -181,6 +183,22 @@ public class TransactionEntry { this.pLoadId = pLoadId; } + public boolean isFirstTxnInsert() { + return isFirstTxnInsert; + } + + public void setFirstTxnInsert(boolean firstTxnInsert) { + isFirstTxnInsert = firstTxnInsert; + } + + public int getTxnSchemaVersion() { + return txnSchemaVersion; + } + + public void setTxnSchemaVersion(int txnSchemaVersion) { + this.txnSchemaVersion = txnSchemaVersion; + } + // Used for insert into select, return the sub_txn_id for this insert public long beginTransaction(TableIf table, SubTransactionType subTransactionType) throws Exception { if (isInsertValuesTxnBegan()) { diff --git a/regression-test/data/insert_p0/test_txn.out b/regression-test/data/insert_p0/transaction/test_txn.out similarity index 100% rename from regression-test/data/insert_p0/test_txn.out rename to regression-test/data/insert_p0/transaction/test_txn.out diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/transaction/txn_insert.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert.out rename to regression-test/data/insert_p0/transaction/txn_insert.out diff --git a/regression-test/data/insert_p0/txn_insert_inject_case.out b/regression-test/data/insert_p0/transaction/txn_insert_inject_case.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_inject_case.out rename to regression-test/data/insert_p0/transaction/txn_insert_inject_case.out diff --git a/regression-test/data/insert_p0/txn_insert_restart_fe.out b/regression-test/data/insert_p0/transaction/txn_insert_restart_fe.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_restart_fe.out rename to regression-test/data/insert_p0/transaction/txn_insert_restart_fe.out diff --git a/regression-test/data/insert_p0/txn_insert_restart_fe_with_schema_change.out b/regression-test/data/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_restart_fe_with_schema_change.out rename to regression-test/data/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.out diff --git a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out b/regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_values_with_schema_change.out rename to regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out diff --git a/regression-test/data/insert_p0/txn_insert_with_drop.out b/regression-test/data/insert_p0/transaction/txn_insert_with_drop.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_with_drop.out rename to regression-test/data/insert_p0/transaction/txn_insert_with_drop.out diff --git a/regression-test/data/insert_p0/txn_insert_with_schema_change.out b/regression-test/data/insert_p0/transaction/txn_insert_with_schema_change.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_with_schema_change.out rename to regression-test/data/insert_p0/transaction/txn_insert_with_schema_change.out diff --git a/regression-test/data/insert_p0/txn_insert_with_specify_columns.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_with_specify_columns.out rename to regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns.out diff --git a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out new file mode 100644 index 00000000000..b06ce07b4a8 --- /dev/null +++ b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_desc1 -- +c1 int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE + +-- !select_desc2 -- +c1 int Yes true \N +new_col int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE + +-- !select1 -- +1 \N 2 3 + diff --git a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out new file mode 100644 index 00000000000..560051c8800 --- /dev/null +++ b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_desc1 -- +c1 int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE + +-- !select_desc2 -- +c1 int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE +new_col int Yes false \N NONE + +-- !select1 -- +1 2 3 \N + diff --git a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out new file mode 100644 index 00000000000..2cac2aa11bc --- /dev/null +++ b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_desc1 -- +c1 int Yes true \N +c2 bigint Yes false \N NONE +c3 int Yes false \N NONE + +-- !select_desc2 -- +c1 int Yes true \N +c3 int Yes false \N NONE +c2 bigint Yes false \N NONE + +-- !select_desc3 -- +c1 int Yes true \N +c3 int Yes false \N NONE +c2 bigint Yes false \N NONE + +-- !select1 -- +1 3 2 + diff --git a/regression-test/suites/insert_p0/test_txn.groovy b/regression-test/suites/insert_p0/transaction/test_txn.groovy similarity index 100% rename from regression-test/suites/insert_p0/test_txn.groovy rename to regression-test/suites/insert_p0/transaction/test_txn.groovy diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/transaction/txn_insert.groovy similarity index 99% rename from regression-test/suites/insert_p0/txn_insert.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert.groovy index 14b056540b6..1f595d89173 100644 --- a/regression-test/suites/insert_p0/txn_insert.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy @@ -287,7 +287,7 @@ suite("txn_insert") { if (observer_fe_url != null) { logger.info("observer url: $observer_fe_url") connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = observer_fe_url) { - result = sql """ select count() from regression_test_insert_p0.${table}_0 """ + result = sql """ select count() from regression_test_insert_p0_transaction.${table}_0 """ logger.info("select from observer result: $result") assertEquals(79, result[0][0]) } @@ -403,7 +403,7 @@ suite("txn_insert") { } // 13. txn insert does not commit or rollback by user, and txn is aborted because connection is closed - def dbName = "regression_test_insert_p0" + def dbName = "regression_test_insert_p0_transaction" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") def get_txn_id_from_server_info = { serverInfo -> @@ -776,7 +776,7 @@ suite("txn_insert") { } } - def db_name = "regression_test_insert_p0" + def db_name = "regression_test_insert_p0_transaction" def tables = sql """ show tables from $db_name """ logger.info("tables: $tables") for (def table_info : tables) { diff --git a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert.groovy similarity index 97% rename from regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert.groovy index e520d0b9d95..6c19fc4ee34 100644 --- a/regression-test/suites/insert_p0/txn_insert_concurrent_insert.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert.groovy @@ -79,7 +79,7 @@ suite("txn_insert_concurrent_insert") { } sql """ sync """ - def dbName = "regression_test_insert_p0" + def dbName = "regression_test_insert_p0_transaction" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") @@ -119,7 +119,7 @@ suite("txn_insert_concurrent_insert") { logger.info("result: ${result}") assertEquals(2606192, result[0][0]) - def db_name = "regression_test_insert_p0" + def db_name = "regression_test_insert_p0_transaction" def tables = sql """ show tables from $db_name """ logger.info("tables: $tables") for (def table_info : tables) { diff --git a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy similarity index 99% rename from regression-test/suites/insert_p0/txn_insert_inject_case.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy index 50a7729656a..5478aeef69c 100644 --- a/regression-test/suites/insert_p0/txn_insert_inject_case.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_inject_case.groovy @@ -152,7 +152,7 @@ suite("txn_insert_inject_case", "nonConcurrent") { // 2. commit failed sql """ truncate table ${table}_0 """ - def dbName = "regression_test_insert_p0" + def dbName = "regression_test_insert_p0_transaction" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") def get_txn_id_from_server_info = { serverInfo -> diff --git a/regression-test/suites/insert_p0/txn_insert_restart_fe.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy similarity index 100% rename from regression-test/suites/insert_p0/txn_insert_restart_fe.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_restart_fe.groovy diff --git a/regression-test/suites/insert_p0/txn_insert_restart_fe_with_schema_change.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.groovy similarity index 100% rename from regression-test/suites/insert_p0/txn_insert_restart_fe_with_schema_change.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_restart_fe_with_schema_change.groovy diff --git a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy similarity index 98% rename from regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy index e408d418289..245f2d07e53 100644 --- a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit suite("txn_insert_values_with_schema_change") { def table = "txn_insert_values_with_schema_change" - def dbName = "regression_test_insert_p0" + def dbName = "regression_test_insert_p0_transaction" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") List<String> errors = new ArrayList<>() diff --git a/regression-test/suites/insert_p0/txn_insert_with_drop.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_drop.groovy similarity index 100% rename from regression-test/suites/insert_p0/txn_insert_with_drop.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_with_drop.groovy diff --git a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_schema_change.groovy similarity index 99% rename from regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_with_schema_change.groovy index 7f4eef8a92a..f2fc1a009f9 100644 --- a/regression-test/suites/insert_p0/txn_insert_with_schema_change.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_schema_change.groovy @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit suite("txn_insert_with_schema_change") { def table = "txn_insert_with_schema_change" - def dbName = "regression_test_insert_p0" + def dbName = "regression_test_insert_p0_transaction" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") List<String> errors = new ArrayList<>() diff --git a/regression-test/suites/insert_p0/txn_insert_with_specify_columns.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns.groovy similarity index 100% rename from regression-test/suites/insert_p0/txn_insert_with_specify_columns.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns.groovy diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy new file mode 100644 index 00000000000..de9162cabc7 --- /dev/null +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +suite("txn_insert_with_specify_columns_schema_change_add_key_column", "nonConcurrent") { + if(!isCloudMode()) { + def table = "txn_insert_with_specify_columns_schema_change_add_key_column" + + def dbName = "regression_test_insert_p0_transaction" + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" + logger.info("url: ${url}") + List<String> errors = new ArrayList<>() + CountDownLatch insertLatch = new CountDownLatch(1) + CountDownLatch insertLatch2 = new CountDownLatch(1) + + sql """ DROP TABLE IF EXISTS $table force """ + sql """ + create table $table ( + c1 INT NULL, + c2 INT NULL, + c3 INT NULL + ) ENGINE=OLAP + UNIQUE KEY(c1) + DISTRIBUTED BY HASH(c1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """ + + def getAlterTableState = { job_state -> + def retry = 0 + sql "use ${dbName};" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def txnInsert = { + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + try { + qt_select_desc1 """desc $table""" + + insertLatch.await(2, TimeUnit.MINUTES) + + statement.execute("begin") + statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);") + + insertLatch2.await(2, TimeUnit.MINUTES) + qt_select_desc2 """desc $table""" + statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);") + statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);") + statement.execute("commit") + } catch (Exception e) { + logger.info("txn insert failed", e) + assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction.")) + statement.execute("rollback") + } + } + } + + def schemaChange = { sql -> + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + statement.execute(sql) + getAlterTableState("RUNNING") + insertLatch.countDown() + getAlterTableState("FINISHED") + insertLatch2.countDown() + } catch (Throwable e) { + logger.error("schema change failed", e) + errors.add("schema change failed " + e.getMessage()) + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep") + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} add column new_col int key after c1;")) + Thread insert_thread = new Thread(() -> txnInsert()) + schema_change_thread.start() + insert_thread.start() + schema_change_thread.join() + insert_thread.join() + + logger.info("errors: " + errors) + assertEquals(0, errors.size()) + getAlterTableState("FINISHED") + order_qt_select1 """select * from ${table} order by c1, c2, c3""" + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +} diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy new file mode 100644 index 00000000000..1d072f572c6 --- /dev/null +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +suite("txn_insert_with_specify_columns_schema_change_add_value_column", "nonConcurrent") { + if(!isCloudMode()) { + def table = "txn_insert_with_specify_columns_schema_change_add_value_column" + + def dbName = "regression_test_insert_p0_transaction" + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" + logger.info("url: ${url}") + List<String> errors = new ArrayList<>() + CountDownLatch insertLatch = new CountDownLatch(1) + CountDownLatch schemaChangeLatch = new CountDownLatch(1) + + sql """ DROP TABLE IF EXISTS $table force """ + sql """ + create table $table ( + c1 INT NULL, + c2 INT NULL, + c3 INT NULL + ) ENGINE=OLAP + UNIQUE KEY(c1) + DISTRIBUTED BY HASH(c1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """ + + def getAlterTableState = { job_state -> + def retry = 0 + sql "use ${dbName};" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def txnInsert = { + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + try { + qt_select_desc1 """desc $table""" + + statement.execute("begin") + statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);") + + schemaChangeLatch.countDown() + + insertLatch.await(2, TimeUnit.MINUTES) + + qt_select_desc2 """desc $table""" + statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);") + statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);") + statement.execute("commit") + } catch (Exception e) { + logger.info("txn insert failed", e) + assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction.")) + statement.execute("rollback") + } + } + } + + def schemaChange = { sql -> + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + schemaChangeLatch.await(2, TimeUnit.MINUTES) + statement.execute(sql) + getAlterTableState("FINISHED") + insertLatch.countDown() + } catch (Throwable e) { + logger.error("schema change failed", e) + errors.add("schema change failed " + e.getMessage()) + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep") + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} add column new_col int after c3;")) + Thread insert_thread = new Thread(() -> txnInsert()) + schema_change_thread.start() + insert_thread.start() + + schema_change_thread.join() + insert_thread.join() + + logger.info("errors: " + errors) + assertEquals(0, errors.size()) + getAlterTableState("FINISHED") + order_qt_select1 """select * from ${table} order by c1, c2, c3""" + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } + +} diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy new file mode 100644 index 00000000000..e4c16fdf8c3 --- /dev/null +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +suite("txn_insert_with_specify_columns_schema_change_reorder_column", "nonConcurrent") { + if(!isCloudMode()){ + def table = "txn_insert_with_specify_columns_schema_change_reorder_column" + + def dbName = "regression_test_insert_p0_transaction" + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" + logger.info("url: ${url}") + List<String> errors = new ArrayList<>() + CountDownLatch insertLatch = new CountDownLatch(1) + CountDownLatch insertLatch2 = new CountDownLatch(1) + + sql """ DROP TABLE IF EXISTS $table force """ + sql """ + create table $table ( + c1 INT NULL, + c2 BIGINT NULL, + c3 INT NULL + ) ENGINE=OLAP + UNIQUE KEY(c1) + DISTRIBUTED BY HASH(c1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """ + + def getAlterTableState = { job_state -> + def retry = 0 + sql "use ${dbName};" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def txnInsert = { + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + try { + qt_select_desc1 """desc $table""" + + insertLatch.await(2, TimeUnit.MINUTES) + + statement.execute("begin") + statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);") + + insertLatch2.await(2, TimeUnit.MINUTES) + + qt_select_desc2 """desc $table""" + statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);") + statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);") + statement.execute("commit") + } catch (Throwable e) { + logger.error("txn insert failed", e) + assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction.")) + statement.execute("rollback") + } + } + } + + def schemaChange = { sql -> + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + statement.execute(sql) + getAlterTableState("RUNNING") + insertLatch.countDown() + getAlterTableState("FINISHED") + insertLatch2.countDown() + } catch (Throwable e) { + logger.error("schema change failed", e) + errors.add("schema change failed " + e.getMessage()) + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep") + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} order by (c1,c3,c2);")) + Thread insert_thread = new Thread(() -> txnInsert()) + schema_change_thread.start() + insert_thread.start() + schema_change_thread.join() + insert_thread.join() + + logger.info("errors: " + errors) + assertEquals(0, errors.size()) + getAlterTableState("FINISHED") + qt_select_desc3 """desc $table""" + order_qt_select1 """select * from ${table} order by c1, c2, c3""" + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org