This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new 23c6fd771be Revert "[revert](streaming job) revert "implement offset 
persistence and replay in cloud mode"" (#56156)
23c6fd771be is described below

commit 23c6fd771be575aa3cb1138f352387c8d32d0ca4
Author: wudi <w...@selectdb.com>
AuthorDate: Wed Sep 17 17:27:32 2025 +0800

    Revert "[revert](streaming job) revert "implement offset persistence and 
replay in cloud mode"" (#56156)
    
    Reverts apache/doris#56149
---
 cloud/src/common/bvars.cpp                         |   5 +
 cloud/src/common/bvars.h                           |   3 +
 cloud/src/meta-service/meta_service.h              |  13 ++
 cloud/src/meta-service/meta_service_txn.cpp        | 137 +++++++++++++++++++++
 cloud/src/meta-store/keys.cpp                      |  13 +-
 cloud/src/meta-store/keys.h                        |   5 +
 .../apache/doris/cloud/rpc/MetaServiceClient.java  |   6 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |   5 +
 .../transaction/CloudGlobalTransactionMgr.java     |  14 +++
 .../apache/doris/cloud/transaction/TxnUtil.java    |  38 ++++++
 .../insert/streaming/StreamingInsertJob.java       |  44 +++++++
 .../StreamingTaskTxnCommitAttachment.java          |  10 ++
 .../java/org/apache/doris/job/offset/Offset.java   |   4 +
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  10 ++
 gensrc/proto/cloud.proto                           |  31 ++++-
 15 files changed, 335 insertions(+), 3 deletions(-)

diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index e495b5ea95b..8ea24c4a1ba 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -85,6 +85,7 @@ BvarLatencyRecorderWithTag 
g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_b
 BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms", 
"remove_delete_bitmap_update_lock");
 BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
 BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", 
"get_rl_task_commit_attach");
+BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", 
"get_streaming_task_commit_attach");
 BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", 
"reset_rl_progress");
 BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
 BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", 
"start_tablet_job");
@@ -364,6 +365,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_get_counter("rpc_kv_precommit_txn_ge
 mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_put_counter",{"instance_id"});
 // get_rl_task_commit_attach
 mBvarInt64Adder 
g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"});
+// get_streaming_task_commit_attach
+mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"});
@@ -527,6 +530,8 @@ mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_get_bytes("rpc_kv_precommit_txn_get_
 mBvarInt64Adder 
g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_bytes",{"instance_id"});
 // get_rl_task_commit_attach
 mBvarInt64Adder 
g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"});
+// get_streaming_task_commit_attach
+mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"});
 // reset_rl_progress
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"});
 mBvarInt64Adder 
g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"});
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 8fb5973249f..e72d0fcd376 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -251,6 +251,7 @@ extern BvarLatencyRecorderWithTag 
g_bvar_ms_get_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach;
+extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach;
 extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress;
 extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
 extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
@@ -470,6 +471,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter;
+extern mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter;
@@ -582,6 +584,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes;
+extern mBvarInt64Adder 
g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes;
diff --git a/cloud/src/meta-service/meta_service.h 
b/cloud/src/meta-service/meta_service.h
index 38cc77aebb0..c7a37277fdc 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -319,6 +319,11 @@ public:
                                    GetRLTaskCommitAttachResponse* response,
                                    ::google::protobuf::Closure* done) override;
 
+    void get_streaming_task_commit_attach(::google::protobuf::RpcController* 
controller,
+                                          const 
GetStreamingTaskCommitAttachRequest* request,
+                                          
GetStreamingTaskCommitAttachResponse* response,
+                                          ::google::protobuf::Closure* done) 
override;
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request, 
ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override;
@@ -820,6 +825,14 @@ public:
                   done);
     }
 
+    // Forwards the get_streaming_task_commit_attach RPC to call_impl, passing the
+    // cloud::MetaService::get_streaming_task_commit_attach member pointer together with
+    // the controller, request, response and done closure unchanged.
+    void get_streaming_task_commit_attach(::google::protobuf::RpcController* 
controller,
+                                          const 
GetStreamingTaskCommitAttachRequest* request,
+                                          
GetStreamingTaskCommitAttachResponse* response,
+                                          ::google::protobuf::Closure* done) 
override {
+        call_impl(&cloud::MetaService::get_streaming_task_commit_attach, 
controller, request,
+                  response, done);
+    }
+
     void reset_rl_progress(::google::protobuf::RpcController* controller,
                            const ResetRLProgressRequest* request, 
ResetRLProgressResponse* response,
                            ::google::protobuf::Closure* done) override {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index ce128f6d30e..bd4cf6fb036 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -610,6 +610,75 @@ void put_routine_load_progress(MetaServiceCode& code, 
std::string& msg,
               << " routine load new progress: " << 
new_progress_info.ShortDebugString();
 }
 
+// Persists the accumulated statistics and latest offset of a streaming job through `txn`.
+//
+// The value stored under the streaming job meta key (instance_id, db_id, job_id) is a
+// StreamingTaskCommitAttachmentPB. If a previous value exists, the counters carried by the
+// request's commit attachment (scanned_rows, load_bytes, file_number, file_size) are added
+// to it; otherwise the attachment's job_id and counters become the initial value. The
+// offset is overwritten whenever the attachment carries one.
+//
+// On failure `code` and `msg` are set and nothing is put into `txn`. This function only
+// issues get/put on `txn`; it does not commit it.
+// NOTE(review): callers should inspect `code` after this returns before committing `txn`.
+void put_streaming_job_meta(MetaServiceCode& code, std::string& msg, const std::string& instance_id,
+                            const CommitTxnRequest* request, Transaction* txn, int64_t db_id) {
+    std::stringstream ss;
+    int64_t txn_id = request->txn_id();
+    if (!request->has_commit_attachment()) {
+        // Report the failure through `code` too; with only `msg` set, a streaming-job
+        // commit that lost its attachment would be indistinguishable from success.
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        ss << "failed to get commit attachment from req, db_id=" << db_id << " txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+    // References instead of copies: `request` outlives this function.
+    const StreamingTaskCommitAttachmentPB& commit_attachment =
+            request->commit_attachment().streaming_task_txn_commit_attachment();
+    int64_t job_id = commit_attachment.job_id();
+
+    std::string streaming_meta_key;
+    std::string streaming_meta_val;
+    StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, job_id};
+    streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
+    TxnErrorCode err = txn->get(streaming_meta_key, &streaming_meta_val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get streaming job meta, db_id=" << db_id << " txn_id=" << txn_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+    // A missing key is not an error: it means this is the first commit of the job.
+    const bool prev_meta_existed = (err == TxnErrorCode::TXN_OK);
+
+    StreamingTaskCommitAttachmentPB new_meta_info;
+    if (prev_meta_existed) {
+        if (!new_meta_info.ParseFromString(streaming_meta_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "failed to parse streaming job meta, db_id=" << db_id << " txn_id=" << txn_id;
+            msg = ss.str();
+            return;
+        }
+        // Accumulate this task's counters on top of the stored totals.
+        new_meta_info.set_scanned_rows(new_meta_info.scanned_rows() +
+                                       commit_attachment.scanned_rows());
+        new_meta_info.set_load_bytes(new_meta_info.load_bytes() + commit_attachment.load_bytes());
+        new_meta_info.set_file_number(new_meta_info.file_number() +
+                                      commit_attachment.file_number());
+        new_meta_info.set_file_size(new_meta_info.file_size() + commit_attachment.file_size());
+    } else {
+        new_meta_info.set_job_id(commit_attachment.job_id());
+        new_meta_info.set_scanned_rows(commit_attachment.scanned_rows());
+        new_meta_info.set_load_bytes(commit_attachment.load_bytes());
+        new_meta_info.set_file_number(commit_attachment.file_number());
+        new_meta_info.set_file_size(commit_attachment.file_size());
+    }
+    // The offset is a position, not a counter: the latest one replaces the stored one.
+    if (commit_attachment.has_offset()) {
+        new_meta_info.set_offset(commit_attachment.offset());
+    }
+    std::string new_meta_val;
+    if (!new_meta_info.SerializeToString(&new_meta_val)) {
+        code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+        ss << "failed to serialize new streaming meta val, txn_id=" << txn_id;
+        msg = ss.str();
+        return;
+    }
+
+    txn->put(streaming_meta_key, new_meta_val);
+    LOG(INFO) << "put streaming_meta_key key=" << hex(streaming_meta_key)
+              << " streaming job new meta: " << new_meta_info.ShortDebugString();
+}
+
 void 
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* 
controller,
                                                 const 
GetRLTaskCommitAttachRequest* request,
                                                 GetRLTaskCommitAttachResponse* 
response,
@@ -678,6 +747,64 @@ void 
MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle
     }
 }
 
+// RPC handler: returns the persisted StreamingTaskCommitAttachmentPB of a streaming job in
+// response->commit_attach.
+//
+// The request must carry cloud_unique_id (resolved to an instance id), db_id and job_id.
+// Status codes set by this handler:
+//   INVALID_ARGUMENT                 - instance id cannot be resolved, or db_id/job_id missing
+//   STREAMING_JOB_PROGRESS_NOT_FOUND - nothing is stored under the (db_id, job_id) meta key
+//   PROTOBUF_PARSE_ERR               - the stored value cannot be parsed
+//   cast_as<CREATE>/cast_as<READ>    - derived from the underlying TxnErrorCode
+void MetaServiceImpl::get_streaming_task_commit_attach(
+        ::google::protobuf::RpcController* controller,
+        const GetStreamingTaskCommitAttachRequest* request,
+        GetStreamingTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(get_streaming_task_commit_attach, get);
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+    RPC_RATE_LIMIT(get_streaming_task_commit_attach)
+
+    // Validate the request before touching the KV store, so that a malformed request
+    // does not cost a transaction.
+    if (!request->has_db_id() || !request->has_job_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty db_id or job_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        ss << "failed to create txn, err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    int64_t db_id = request->db_id();
+    int64_t job_id = request->job_id();
+    std::string streaming_meta_key;
+    std::string streaming_meta_val;
+    StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, job_id};
+    streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key);
+    err = txn->get(streaming_meta_key, &streaming_meta_val);
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        // Reported with a dedicated code so the caller can tell "no progress yet"
+        // apart from a read failure.
+        code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND;
+        ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err;
+        msg = ss.str();
+        return;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id
+           << " err=" << err;
+        msg = ss.str();
+        return;
+    }
+
+    StreamingTaskCommitAttachmentPB* commit_attach = response->mutable_commit_attach();
+    if (!commit_attach->ParseFromString(streaming_meta_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << job_id;
+        msg = ss.str();
+        return;
+    }
+}
+
 void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* 
controller,
                                         const ResetRLProgressRequest* request,
                                         ResetRLProgressResponse* response,
@@ -1572,6 +1699,11 @@ void MetaServiceImpl::commit_txn_immediately(
             put_routine_load_progress(code, msg, instance_id, request, 
txn.get(), db_id);
         }
 
+        if (txn_info.load_job_source_type() ==
+            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
+            put_streaming_job_meta(code, msg, instance_id, request, txn.get(), 
db_id);
+        }
+
         LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key)
                   << " txn_id=" << txn_id;
         LOG(INFO) << "commit_txn put_size=" << txn->put_bytes()
@@ -1965,6 +2097,11 @@ void MetaServiceImpl::commit_txn_eventually(
             put_routine_load_progress(code, msg, instance_id, request, 
txn.get(), db_id);
         }
 
+        if (txn_info.load_job_source_type() ==
+            LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) {
+            put_streaming_job_meta(code, msg, instance_id, request, txn.get(), 
db_id);
+        }
+
         // save versions for partition
         int64_t version_update_time_ms =
                 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp
index e23f84771ab..7b2b75c4d55 100644
--- a/cloud/src/meta-store/keys.cpp
+++ b/cloud/src/meta-store/keys.cpp
@@ -64,6 +64,7 @@ static const char* STATS_KEY_INFIX_TABLET               = 
"tablet";
 
 static const char* JOB_KEY_INFIX_TABLET                 = "tablet";
 static const char* JOB_KEY_INFIX_RL_PROGRESS            = 
"routine_load_progress";
+static const char* JOB_KEY_INFIX_STREAMING_JOB_META     = "streaming_job_meta";
 static const char* JOB_KEY_INFIX_RESTORE_TABLET         = "restore_tablet";
 static const char* JOB_KEY_INFIX_RESTORE_ROWSET         = "restore_rowset";
 
@@ -144,7 +145,7 @@ static void encode_prefix(const T& t, std::string* key) {
         MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, 
MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
         RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, 
RecycleTxnKeyInfo, RecycleStageKeyInfo,
         StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo, 
JobRestoreRowsetKeyInfo,
-        JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
+        JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, 
StreamingJobMetaKeyInfo,
         CopyJobKeyInfo, CopyFileKeyInfo,  StorageVaultKeyInfo, 
MetaSchemaPBDictionaryInfo,
         MowTabletJobInfo>);
 
@@ -181,7 +182,8 @@ static void encode_prefix(const T& t, std::string* key) {
         encode_bytes(STATS_KEY_PREFIX, key);
     } else if constexpr (std::is_same_v<T, JobTabletKeyInfo>
                       || std::is_same_v<T, JobRecycleKeyInfo>
-                      || std::is_same_v<T, RLJobProgressKeyInfo>) {
+                      || std::is_same_v<T, RLJobProgressKeyInfo>
+                      || std::is_same_v<T, StreamingJobMetaKeyInfo>) {
         encode_bytes(JOB_KEY_PREFIX, key);
     } else if constexpr (std::is_same_v<T, CopyJobKeyInfo>
                       || std::is_same_v<T, CopyFileKeyInfo>) {
@@ -463,6 +465,13 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo& 
in, std::string* out)
     encode_int64(std::get<2>(in), out);           // job_id
 }
 
+// Builds the streaming job meta key into `out`. Layout, in order:
+//   0x01 "job" ${instance_id}  "streaming_job_meta"  ${db_id}  ${job_id}
+void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, std::string* out) {
+    const int64_t db_id = std::get<1>(in);
+    const int64_t job_id = std::get<2>(in);
+
+    encode_prefix(in, out);
+    encode_bytes(JOB_KEY_INFIX_STREAMING_JOB_META, out);
+    encode_int64(db_id, out);
+    encode_int64(job_id, out);
+}
+
 void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string* 
out) {
     encode_prefix(in, out);                          // 0x01 "job" 
${instance_id}
     encode_bytes(JOB_KEY_INFIX_RESTORE_TABLET, out); // "restore_tablet"
diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h
index 8ccd974e0b7..3b9b234c574 100644
--- a/cloud/src/meta-store/keys.h
+++ b/cloud/src/meta-store/keys.h
@@ -218,6 +218,9 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 , 
std::tuple<std::string, in
 //                                                      0:instance_id 1:db_id  
2:job_id
 using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t, 
int64_t>>;
 
+//                                                      0:instance_id 1:db_id  
2:job_id
+using StreamingJobMetaKeyInfo = BasicKeyInfo<52, std::tuple<std::string, 
int64_t, int64_t>>;
+
 //                                                      0:instance_id 
1:vault_id
 using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string, 
std::string>>;
 
@@ -407,6 +410,8 @@ void job_tablet_key(const JobTabletKeyInfo& in, 
std::string* out);
 static inline std::string job_tablet_key(const JobTabletKeyInfo& in) { 
std::string s; job_tablet_key(in, &s); return s; }
 void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* 
out);
 static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo& 
in) { std::string s; rl_job_progress_key_info(in, &s); return s; }
+// Streaming job meta key encoders: the first overload writes the key into `out`, the
+// second calls the first on a local string and returns that string by value.
+void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, 
std::string* out);
+static inline std::string streaming_job_meta_key_info(const 
StreamingJobMetaKeyInfo& in) { std::string s; streaming_job_meta_key_info(in, 
&s); return s; }
 
 std::string copy_key_prefix(std::string_view instance_id);
 void copy_job_key(const CopyJobKeyInfo& in, std::string* out);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
index f17625a89ea..2b0673d6453 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java
@@ -492,4 +492,10 @@ public class MetaServiceClient {
         return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
                 .createInstance(request);
     }
+
+    public Cloud.GetStreamingTaskCommitAttachResponse
+            
getStreamingTaskCommitAttach(Cloud.GetStreamingTaskCommitAttachRequest request) 
{
+        return 
blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, 
TimeUnit.MILLISECONDS)
+                .getStreamingTaskCommitAttach(request);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index b351942cbe2..8710209ff8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -432,4 +432,9 @@ public class MetaServiceProxy {
     public Cloud.CreateInstanceResponse 
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
         return w.executeRequest((client) -> client.createInstance(request));
     }
+
+    /**
+     * Runs {@code getStreamingTaskCommitAttach} on a client through {@code w.executeRequest}.
+     *
+     * @param request the request to send
+     * @return the response produced by the client call
+     * @throws RpcException propagated from {@code executeRequest}
+     */
+    public Cloud.GetStreamingTaskCommitAttachResponse getStreamingTaskCommitAttach(
+            Cloud.GetStreamingTaskCommitAttachRequest request) throws RpcException {
+        return w.executeRequest(client -> client.getStreamingTaskCommitAttach(request));
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index fddb6ed7208..c3b9c321fa9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -85,6 +85,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.event.DataChangeEvent;
+import 
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
 import org.apache.doris.metric.MetricRepo;
@@ -619,6 +620,19 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 }
                 builder.setCommitAttachment(TxnUtil
                         
.rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment));
+            } else if (txnCommitAttachment instanceof 
StreamingTaskTxnCommitAttachment) {
+                StreamingTaskTxnCommitAttachment 
streamingTaskTxnCommitAttachment =
+                            (StreamingTaskTxnCommitAttachment) 
txnCommitAttachment;
+                TxnStateChangeCallback cb = 
callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId());
+                if (cb != null) {
+                    // use a temporary transaction state to do before commit 
check,
+                    // what actually works is the transactionId
+                    TransactionState tmpTxnState = new TransactionState();
+                    tmpTxnState.setTransactionId(transactionId);
+                    cb.beforeCommitted(tmpTxnState);
+                }
+                builder.setCommitAttachment(TxnUtil
+                        
.streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment));
             } else {
                 throw new UserException("invalid txnCommitAttachment");
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
index 3aca54cd150..4155e6c5e67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.transaction;
 
 import org.apache.doris.cloud.proto.Cloud.RLTaskTxnCommitAttachmentPB;
 import org.apache.doris.cloud.proto.Cloud.RoutineLoadProgressPB;
+import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;
 import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB;
 import 
org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB;
 import 
org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB.EtlStatusPB;
@@ -28,6 +29,7 @@ import org.apache.doris.cloud.proto.Cloud.TxnCoordinatorPB;
 import org.apache.doris.cloud.proto.Cloud.TxnInfoPB;
 import org.apache.doris.cloud.proto.Cloud.TxnSourceTypePB;
 import org.apache.doris.cloud.proto.Cloud.UniqueIdPB;
+import 
org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.loadv2.JobState;
@@ -268,6 +270,42 @@ public class TxnUtil {
         return new 
RLTaskTxnCommitAttachment(txnCommitAttachmentPB.getRlTaskTxnCommitAttachment());
     }
 
+    public static TxnCommitAttachmentPB 
streamingTaskTxnCommitAttachmentToPb(StreamingTaskTxnCommitAttachment
+            streamingTaskTxnCommitAttachment) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("streamingTaskTxnCommitAttachment:{}", 
streamingTaskTxnCommitAttachment);
+        }
+        TxnCommitAttachmentPB.Builder attachementBuilder = 
TxnCommitAttachmentPB.newBuilder();
+        
attachementBuilder.setType(TxnCommitAttachmentPB.Type.STREAMING_TASK_TXN_COMMIT_ATTACHMENT);
+
+        StreamingTaskCommitAttachmentPB.Builder builder =
+                StreamingTaskCommitAttachmentPB.newBuilder();
+
+        builder.setJobId(streamingTaskTxnCommitAttachment.getJobId())
+                .setTaskId(streamingTaskTxnCommitAttachment.getTaskId())
+                
.setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows())
+                .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes())
+                
.setFileNumber(streamingTaskTxnCommitAttachment.getFileNumber())
+                .setFileSize(streamingTaskTxnCommitAttachment.getFileSize());
+
+        if (streamingTaskTxnCommitAttachment.getOffset() != null) {
+            
builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset());
+        }
+
+        
attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build());
+        return attachementBuilder.build();
+    }
+
+    /**
+     * Builds a {@code StreamingTaskTxnCommitAttachment} from the streaming task section of
+     * the given commit attachment message.
+     *
+     * @param txnCommitAttachmentPB message whose streaming task attachment is read
+     * @return a new attachment constructed from that section
+     */
+    public static StreamingTaskTxnCommitAttachment streamingTaskTxnCommitAttachmentFromPb(
+            TxnCommitAttachmentPB txnCommitAttachmentPB) {
+        StreamingTaskCommitAttachmentPB pb = txnCommitAttachmentPB.getStreamingTaskTxnCommitAttachment();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("StreamingTaskCommitAttachmentPB={}", pb);
+        }
+        return new StreamingTaskTxnCommitAttachment(pb);
+    }
+
     public static LoadJobFinalOperation 
loadJobFinalOperationFromPb(TxnCommitAttachmentPB txnCommitAttachmentPB) {
         LoadJobFinalOperationPB loadJobFinalOperationPB = 
txnCommitAttachmentPB.getLoadJobFinalOperation();
         if (LOG.isDebugEnabled()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f2ce1ad5e02..b872040c4c3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -19,7 +19,10 @@ package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
@@ -49,6 +52,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.rpc.RpcException;
 import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TRow;
 import org.apache.doris.transaction.TransactionException;
@@ -265,6 +269,17 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         offsetProvider.updateOffset(attachment.getOffset());
     }
 
+    /**
+     * Adds this job as a callback to the callback factory of the current global
+     * transaction manager.
+     */
+    @Override
+    public void onRegister() throws JobException {
+        
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this);
+    }
+
+    /**
+     * Replay hook for job creation: performs the same callback registration as
+     * {@link #onRegister()} and then delegates to the superclass implementation.
+     */
+    @Override
+    public void onReplayCreate() throws JobException {
+        // Register first, then let the superclass run its own replay logic.
+        onRegister();
+        super.onReplayCreate();
+    }
+
     @Override
     public ShowResultSetMetaData getTaskMetaData() {
         return InsertJob.TASK_META_DATA;
@@ -390,6 +405,35 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         updateJobStatisticAndOffset(attachment);
     }
 
+    /**
+     * Restores this job's statistics and offset from the commit attachment persisted in
+     * the meta service.
+     *
+     * <p>If the meta service answers {@code STREAMING_JOB_PROGRESS_NOT_FOUND}, the method
+     * returns without changing anything. Any other non-OK status, or an RPC failure, is
+     * reported as a {@link UserException}.
+     *
+     * @throws UserException if the RPC fails or the meta service returns an error status
+     */
+    public void replayOnCloudMode() throws UserException {
+        Cloud.GetStreamingTaskCommitAttachRequest request =
+                Cloud.GetStreamingTaskCommitAttachRequest.newBuilder()
+                        .setCloudUniqueId(Config.cloud_unique_id)
+                        .setDbId(dbId)
+                        .setJobId(getJobId())
+                        .build();
+
+        Cloud.GetStreamingTaskCommitAttachResponse response;
+        try {
+            response = MetaServiceProxy.getInstance().getStreamingTaskCommitAttach(request);
+        } catch (RpcException e) {
+            // Pass the exception as the trailing argument so the stack trace is logged,
+            // and keep it as the cause of the rethrown exception.
+            log.warn("failed to get streaming task commit attach, jobId={}", getJobId(), e);
+            throw new UserException(e.getMessage(), e);
+        }
+
+        Cloud.MetaServiceCode statusCode = response.getStatus().getCode();
+        if (statusCode == Cloud.MetaServiceCode.STREAMING_JOB_PROGRESS_NOT_FOUND) {
+            // Not treated as a failure: there is simply nothing to restore.
+            log.info("not found streaming job progress, response: {}", response);
+            return;
+        }
+        if (statusCode != Cloud.MetaServiceCode.OK) {
+            log.warn("failed to get streaming task commit attach, response: {}", response);
+            throw new UserException(response.getStatus().getMsg());
+        }
+
+        StreamingTaskTxnCommitAttachment commitAttach =
+                new StreamingTaskTxnCommitAttachment(response.getCommitAttach());
+        updateJobStatisticAndOffset(commitAttach);
+    }
+
     @Override
     public void afterAborted(TransactionState txnState, boolean txnOperated, 
String txnStatusChangeReason)
             throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
index 744f83080aa..8660ed94739 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.job.extensions.insert.streaming;
 
+import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TxnCommitAttachment;
@@ -38,6 +39,15 @@ public class StreamingTaskTxnCommitAttachment extends 
TxnCommitAttachment {
         this.offset = offset;
     }
 
+    public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB 
pb) { // Builds the attachment from its protobuf form; pb's job_id/task_id are not copied here.
+        super(TransactionState.LoadJobSourceType.STREAMING_JOB);
+        this.scannedRows = pb.getScannedRows();
+        this.loadBytes = pb.getLoadBytes();
+        this.fileNumber = pb.getFileNumber();
+        this.fileSize = pb.getFileSize();
+        this.offset.setEndOffset(pb.getOffset()); // NOTE(review): this constructor never assigns this.offset; NPE unless the field has an initializer (declaration not visible here) - confirm.
+    }
+
     @Getter
     private long jobId;
     @Getter
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
index 095f0a5e6bf..a3b0689bfc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java
@@ -19,4 +19,8 @@ package org.apache.doris.job.offset;
 
 public interface Offset {
     String toJson();
+
+    void setEndOffset(String endOffset); // Stores the end position given in string form (S3Offset keeps it in endFile).
+
+    String endOffset(); // String form of the end position (S3Offset returns its endFile).
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index f76707b2453..2ab2030fbbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -30,6 +30,16 @@ public class S3Offset implements Offset {
     String endFile;
     String fileLists;
 
+    @Override
+    public void setEndOffset(String endOffset) { // For S3 the generic end offset is stored in endFile.
+        this.endFile = endOffset;
+    }
+
+    @Override
+    public String endOffset() { // Exposes endFile as the generic end offset.
+        return endFile;
+    }
+
     @Override
     public String toJson() {
         return GsonUtils.GSON.toJson(this);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index e277c24f8d7..aa6bcd28359 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -322,6 +322,7 @@ enum LoadJobSourceTypePB {
     LOAD_JOB_SRC_TYPE_INSERT_STREAMING  = 3; // insert stmt (streaming type), 
update stmt use this type
     LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK = 4; // routine load task use this type
     LOAD_JOB_SRC_TYPE_BATCH_LOAD_JOB    = 5; // load job v2 for broker load
+    LOAD_JOB_SRC_TYPE_STREAMING_JOB     = 6; // streaming job use this type
 }
 
 enum TxnStatusPB {
@@ -365,10 +366,21 @@ message RoutineLoadJobStatisticPB {
     optional int64 task_execution_time_ms = 5;
 }
 
+message StreamingTaskCommitAttachmentPB { // Protobuf form of the FE's StreamingTaskTxnCommitAttachment.
+    optional int64 job_id = 1;
+    optional int64 task_id = 2;
+    optional string offset = 3; // FE passes this to Offset.setEndOffset(); for S3Offset it becomes endFile
+    optional int64 scanned_rows = 4;
+    optional int64 load_bytes = 5;
+    optional int64 file_number = 6;
+    optional int64 file_size = 7;
+}
+
 message TxnCommitAttachmentPB {
     enum Type {
         LODD_JOB_FINAL_OPERATION = 0;
         RT_TASK_TXN_COMMIT_ATTACHMENT = 1;
+        STREAMING_TASK_TXN_COMMIT_ATTACHMENT = 2;
     }
     message LoadJobFinalOperationPB {
         message EtlStatusPB {
@@ -425,6 +437,7 @@ message TxnCommitAttachmentPB {
     optional Type type = 1;
     optional LoadJobFinalOperationPB load_job_final_operation = 2;
     optional RLTaskTxnCommitAttachmentPB rl_task_txn_commit_attachment = 3;
+    optional StreamingTaskCommitAttachmentPB 
streaming_task_txn_commit_attachment = 4;
 }
 
 // For storing label -> txn_ids
@@ -1698,7 +1711,8 @@ enum MetaServiceCode {
     JOB_ALREADY_SUCCESS = 5002;
     ROUTINE_LOAD_DATA_INCONSISTENT = 5003;
     ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004;
-    JOB_CHECK_ALTER_VERSION = 5005;
+    STREAMING_JOB_PROGRESS_NOT_FOUND = 5005;
+    JOB_CHECK_ALTER_VERSION = 5006;
 
     // Rate limit
     MAX_QPS_LIMIT = 6001;
@@ -1876,6 +1890,18 @@ message ResetRLProgressResponse {
     optional MetaServiceResponseStatus status = 1;
 }
 
+message GetStreamingTaskCommitAttachRequest { // Request of the get_streaming_task_commit_attach RPC.
+    optional string cloud_unique_id = 1; // For auth
+    optional int64 db_id = 2;
+    optional int64 job_id = 3;
+    optional string request_ip = 4; // NOTE(review): not set by the FE request builder shown in this patch
+}
+
+message GetStreamingTaskCommitAttachResponse { // Response of the get_streaming_task_commit_attach RPC.
+    optional MetaServiceResponseStatus status = 1; // FE returns quietly on STREAMING_JOB_PROGRESS_NOT_FOUND and throws on any other non-OK code
+    optional StreamingTaskCommitAttachmentPB commit_attach = 2; // FE wraps this in a StreamingTaskTxnCommitAttachment
+}
+
 message CheckKeyInfos {
     repeated int64 db_ids = 1;
     repeated int64 table_ids = 2;
@@ -2069,6 +2095,9 @@ service MetaService {
     rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns 
(GetRLTaskCommitAttachResponse);
     rpc reset_rl_progress(ResetRLProgressRequest) returns 
(ResetRLProgressResponse);
 
+    // streaming job meta
+    rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest) 
returns (GetStreamingTaskCommitAttachResponse);
+
     // check KV
     rpc check_kv(CheckKVRequest) returns (CheckKVResponse);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to