This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-3.0 by this push: new d60268d30c8 [fix](cloud) fix job status is empty when label reused in cloud mode (#42878) d60268d30c8 is described below commit d60268d30c88da41e42450170e497279b0e284bc Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Oct 31 16:57:26 2024 +0800 [fix](cloud) fix job status is empty when label reused in cloud mode (#42878) Cherry-picked from #42700 --- cloud/src/meta-service/meta_service_txn.cpp | 1 + .../transaction/CloudGlobalTransactionMgr.java | 3 +- .../doris/common/LabelAlreadyUsedException.java | 21 +++++ gensrc/proto/cloud.proto | 1 + .../stream_load/test_stream_load_job_status.groovy | 90 ++++++++++++++++++++++ 5 files changed, 115 insertions(+), 1 deletion(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 03cb7866e1a..ddd1f8cdf79 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -266,6 +266,7 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller, } // clang-format on } + response->set_txn_status(cur_txn_info.status()); code = MetaServiceCode::TXN_LABEL_ALREADY_USED; ss << "Label [" << label << "] has already been used, relate to txn [" << cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 81c5a3bfdba..131473470ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -273,7 +273,8 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { throw new 
DuplicatedRequestException(DebugUtil.printId(requestId), beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg()); case TXN_LABEL_ALREADY_USED: - throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false); + throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false, + beginTxnResponse.getTxnStatus()); default: if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_REJECT.increase(1L); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java index 8c508809d59..f1789881cdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java @@ -17,6 +17,7 @@ package org.apache.doris.common; +import org.apache.doris.cloud.proto.Cloud.TxnStatusPB; import org.apache.doris.transaction.TransactionState; import com.google.common.base.Preconditions; @@ -37,6 +38,26 @@ public class LabelAlreadyUsedException extends DdlException { super(isLabel ? "Label [" + msg + "] has already been used." : msg); } + public LabelAlreadyUsedException(String msg, boolean isLabel, TxnStatusPB txnStatus) { + super(isLabel ? "Label [" + msg + "] has already been used." 
: msg); + switch (txnStatus) { + case TXN_STATUS_UNKNOWN: + case TXN_STATUS_PREPARED: + jobStatus = "RUNNING"; + break; + case TXN_STATUS_PRECOMMITTED: + jobStatus = "PRECOMMITTED"; + break; + case TXN_STATUS_COMMITTED: + case TXN_STATUS_VISIBLE: + jobStatus = "FINISHED"; + break; + default: + Preconditions.checkState(false, txnStatus); + break; + } + } + public LabelAlreadyUsedException(TransactionState txn) { super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId() + "], status [" + txn.getTransactionStatus() + "]."); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 93420bddbf6..8ae48851601 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -642,6 +642,7 @@ message BeginTxnResponse { optional MetaServiceResponseStatus status = 1; optional int64 txn_id = 2; optional int64 dup_txn_id = 3; + optional TxnStatusPB txn_status = 4; // TODO: There may be more fields TBD } diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy new file mode 100644 index 00000000000..9cb38747e22 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_job_status", "p0") { + def tableName = "test_stream_load_job_status" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL, + `v4` smallint(6) REPLACE_IF_NOT_NULL NULL, + `v5` int(11) REPLACE_IF_NOT_NULL NULL, + `v6` bigint(20) REPLACE_IF_NOT_NULL NULL, + `v7` largeint(40) REPLACE_IF_NOT_NULL NULL, + `v8` datetime REPLACE_IF_NOT_NULL NULL, + `v9` date REPLACE_IF_NOT_NULL NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, + `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")), + PARTITION partition_b VALUES [("100000"), ("1000000000")), + PARTITION partition_c VALUES [("1000000000"), ("10000000000")), + PARTITION partition_d VALUES [("10000000000"), (MAXVALUE))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'label', 'test_stream_load_job_status' + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + check { result, exception, 
startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + + streamLoad { + table "${tableName}" + set 'column_separator', '\t' + set 'label', 'test_stream_load_job_status' + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("label already exists", json.Status.toLowerCase()) + assertEquals("finished", json.ExistingJobStatus.toLowerCase()) + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org