This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d60268d30c8  [fix](cloud) fix job status is empty when label reused in cloud mode (#42878)
d60268d30c8 is described below

commit d60268d30c88da41e42450170e497279b0e284bc
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 31 16:57:26 2024 +0800

     [fix](cloud) fix job status is empty when label reused in cloud mode (#42878)
    
    Cherry-picked from #42700
---
 cloud/src/meta-service/meta_service_txn.cpp        |  1 +
 .../transaction/CloudGlobalTransactionMgr.java     |  3 +-
 .../doris/common/LabelAlreadyUsedException.java    | 21 +++++
 gensrc/proto/cloud.proto                           |  1 +
 .../stream_load/test_stream_load_job_status.groovy | 90 ++++++++++++++++++++++
 5 files changed, 115 insertions(+), 1 deletion(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 03cb7866e1a..ddd1f8cdf79 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -266,6 +266,7 @@ void MetaServiceImpl::begin_txn(::google::protobuf::RpcController* controller,
                 }
                 // clang-format on
             }
+            response->set_txn_status(cur_txn_info.status());
             code = MetaServiceCode::TXN_LABEL_ALREADY_USED;
             ss << "Label [" << label << "] has already been used, relate to txn ["
                << cur_txn_info.txn_id() << "], status=[" << TxnStatusPB_Name(cur_txn_info.status())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 81c5a3bfdba..131473470ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -273,7 +273,8 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
                     throw new DuplicatedRequestException(DebugUtil.printId(requestId),
                             beginTxnResponse.getDupTxnId(), beginTxnResponse.getStatus().getMsg());
                 case TXN_LABEL_ALREADY_USED:
-                    throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false);
+                    throw new LabelAlreadyUsedException(beginTxnResponse.getStatus().getMsg(), false,
+                            beginTxnResponse.getTxnStatus());
                 default:
                     if (MetricRepo.isInit) {
                         MetricRepo.COUNTER_TXN_REJECT.increase(1L);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
index 8c508809d59..f1789881cdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/LabelAlreadyUsedException.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common;
 
+import org.apache.doris.cloud.proto.Cloud.TxnStatusPB;
 import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.base.Preconditions;
@@ -37,6 +38,26 @@ public class LabelAlreadyUsedException extends DdlException {
         super(isLabel ? "Label [" + msg + "] has already been used." : msg);
     }
 
+    public LabelAlreadyUsedException(String msg, boolean isLabel, TxnStatusPB txnStatus) {
+        super(isLabel ? "Label [" + msg + "] has already been used." : msg);
+        switch (txnStatus) {
+            case TXN_STATUS_UNKNOWN:
+            case TXN_STATUS_PREPARED:
+                jobStatus = "RUNNING";
+                break;
+            case TXN_STATUS_PRECOMMITTED:
+                jobStatus = "PRECOMMITTED";
+                break;
+            case TXN_STATUS_COMMITTED:
+            case TXN_STATUS_VISIBLE:
+                jobStatus = "FINISHED";
+                break;
+            default:
+                Preconditions.checkState(false, txnStatus);
+                break;
+        }
+    }
+
     public LabelAlreadyUsedException(TransactionState txn) {
         super("Label [" + txn.getLabel() + "] has already been used, relate to txn [" + txn.getTransactionId()
                 + "], status [" + txn.getTransactionStatus() + "].");
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 93420bddbf6..8ae48851601 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -642,6 +642,7 @@ message BeginTxnResponse {
     optional MetaServiceResponseStatus status = 1;
     optional int64 txn_id = 2;
     optional int64 dup_txn_id = 3;
+    optional TxnStatusPB txn_status = 4;
     // TODO: There may be more fields TBD
 }
 
diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy
new file mode 100644
index 00000000000..9cb38747e22
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_stream_load_job_status.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_stream_load_job_status", "p0") {
+    def tableName = "test_stream_load_job_status"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+            `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+            `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+            `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+            `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+            `v8` datetime REPLACE_IF_NOT_NULL NULL,
+            `v9` date REPLACE_IF_NOT_NULL NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+        PARTITION partition_b VALUES [("100000"), ("1000000000")),
+        PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+        PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '\t'
+        set 'label', 'test_stream_load_job_status'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+        set 'strict_mode', 'true'
+
+        file 'test_strict_mode.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '\t'
+        set 'label', 'test_stream_load_job_status'
+        set 'columns', 'k1, k2, v2, v10, v11'
+        set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+        set 'strict_mode', 'true'
+
+        file 'test_strict_mode.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("label already exists", json.Status.toLowerCase())
+            assertEquals("finished", json.ExistingJobStatus.toLowerCase())
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to