This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch streaming-job-dev in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/streaming-job-dev by this push: new 23c6fd771be Revert "[revert](streaming job) revert "implement offset persistence and replay in cloud mode"" (#56156) 23c6fd771be is described below commit 23c6fd771be575aa3cb1138f352387c8d32d0ca4 Author: wudi <w...@selectdb.com> AuthorDate: Wed Sep 17 17:27:32 2025 +0800 Revert "[revert](streaming job) revert "implement offset persistence and replay in cloud mode"" (#56156) Reverts apache/doris#56149 --- cloud/src/common/bvars.cpp | 5 + cloud/src/common/bvars.h | 3 + cloud/src/meta-service/meta_service.h | 13 ++ cloud/src/meta-service/meta_service_txn.cpp | 137 +++++++++++++++++++++ cloud/src/meta-store/keys.cpp | 13 +- cloud/src/meta-store/keys.h | 5 + .../apache/doris/cloud/rpc/MetaServiceClient.java | 6 + .../apache/doris/cloud/rpc/MetaServiceProxy.java | 5 + .../transaction/CloudGlobalTransactionMgr.java | 14 +++ .../apache/doris/cloud/transaction/TxnUtil.java | 38 ++++++ .../insert/streaming/StreamingInsertJob.java | 44 +++++++ .../StreamingTaskTxnCommitAttachment.java | 10 ++ .../java/org/apache/doris/job/offset/Offset.java | 4 + .../org/apache/doris/job/offset/s3/S3Offset.java | 10 ++ gensrc/proto/cloud.proto | 31 ++++- 15 files changed, 335 insertions(+), 3 deletions(-) diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp index e495b5ea95b..8ea24c4a1ba 100644 --- a/cloud/src/common/bvars.cpp +++ b/cloud/src/common/bvars.cpp @@ -85,6 +85,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms", "remove_delete_b BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms", "remove_delete_bitmap_update_lock"); BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance"); BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms", "get_rl_task_commit_attach"); +BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach("ms", "get_streaming_task_commit_attach"); BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms", "reset_rl_progress"); BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id"); BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms", "start_tablet_job"); @@ -364,6 +365,8 @@ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter("rpc_kv_precommit_txn_ge mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter("rpc_kv_precommit_txn_put_counter",{"instance_id"}); // get_rl_task_commit_attach mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter("rpc_kv_get_rl_task_commit_attach_get_counter",{"instance_id"}); +// get_streaming_task_commit_attach +mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter("rpc_kv_get_streaming_task_commit_attach_get_counter",{"instance_id"}); // reset_rl_progress mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter("rpc_kv_reset_rl_progress_get_counter",{"instance_id"}); mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter("rpc_kv_reset_rl_progress_put_counter",{"instance_id"}); @@ -527,6 +530,8 @@ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes("rpc_kv_precommit_txn_get_ mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes("rpc_kv_precommit_txn_put_bytes",{"instance_id"}); // get_rl_task_commit_attach mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes("rpc_kv_get_rl_task_commit_attach_get_bytes",{"instance_id"}); +// get_streaming_task_commit_attach +mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes("rpc_kv_get_streaming_task_commit_attach_get_bytes",{"instance_id"}); // reset_rl_progress mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes("rpc_kv_reset_rl_progress_get_bytes",{"instance_id"}); mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes("rpc_kv_reset_rl_progress_put_bytes",{"instance_id"}); diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h index 8fb5973249f..e72d0fcd376 100644 --- a/cloud/src/common/bvars.h +++ b/cloud/src/common/bvars.h @@ -251,6 +251,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status; extern BvarLatencyRecorderWithTag g_bvar_ms_get_instance; extern BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach; +extern BvarLatencyRecorderWithTag g_bvar_ms_get_streaming_task_commit_attach; extern BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress; extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id; extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv; @@ -470,6 +471,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_counter; extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_counter; extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_counter; extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_counter; +extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_counter; extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_counter; extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_counter; extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_counter; @@ -582,6 +584,7 @@ extern mBvarInt64Adder g_bvar_rpc_kv_begin_txn_put_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_get_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_precommit_txn_put_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_get_rl_task_commit_attach_get_bytes; +extern mBvarInt64Adder g_bvar_rpc_kv_get_streaming_task_commit_attach_get_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_get_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_put_bytes; extern mBvarInt64Adder g_bvar_rpc_kv_reset_rl_progress_del_bytes; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 38cc77aebb0..c7a37277fdc 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -319,6 +319,11 @@ public: GetRLTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) override; + void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller, + const GetStreamingTaskCommitAttachRequest* request, + GetStreamingTaskCommitAttachResponse* response, + ::google::protobuf::Closure* done) override; + void reset_rl_progress(::google::protobuf::RpcController* controller, const ResetRLProgressRequest* request, ResetRLProgressResponse* response, ::google::protobuf::Closure* done) override; @@ -820,6 +825,14 @@ public: done); } + void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller, + const GetStreamingTaskCommitAttachRequest* request, + GetStreamingTaskCommitAttachResponse* response, + ::google::protobuf::Closure* done) override { + call_impl(&cloud::MetaService::get_streaming_task_commit_attach, controller, request, + response, done); + } + void reset_rl_progress(::google::protobuf::RpcController* controller, const ResetRLProgressRequest* request, ResetRLProgressResponse* response, ::google::protobuf::Closure* done) override { diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index ce128f6d30e..bd4cf6fb036 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -610,6 +610,75 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg, << " routine load new progress: " << new_progress_info.ShortDebugString(); } +void put_streaming_job_meta(MetaServiceCode& code, std::string& msg, const std::string& instance_id, + const CommitTxnRequest* request, Transaction* txn, int64_t db_id) { + std::stringstream ss; + int64_t txn_id = request->txn_id(); + if (!request->has_commit_attachment()) { + ss << "failed to get commit attachment from req, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + return; + } + TxnCommitAttachmentPB txn_commit_attachment = request->commit_attachment(); + StreamingTaskCommitAttachmentPB commit_attachment = + txn_commit_attachment.streaming_task_txn_commit_attachment(); + int64_t job_id = commit_attachment.job_id(); + + std::string streaming_meta_key; + std::string streaming_meta_val; + bool prev_meta_existed = true; + StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, job_id}; + streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key); + TxnErrorCode err = txn->get(streaming_meta_key, &streaming_meta_val); + if (err != TxnErrorCode::TXN_OK) { + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + prev_meta_existed = false; + } else { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get streaming job meta, db_id=" << db_id << " txn_id=" << txn_id + << " err=" << err; + msg = ss.str(); + return; + } + } + + StreamingTaskCommitAttachmentPB new_meta_info; + if (prev_meta_existed) { + if (!new_meta_info.ParseFromString(streaming_meta_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse streaming job meta, db_id=" << db_id << " txn_id=" << txn_id; + msg = ss.str(); + return; + } + new_meta_info.set_scanned_rows(new_meta_info.scanned_rows() + + commit_attachment.scanned_rows()); + new_meta_info.set_load_bytes(new_meta_info.load_bytes() + commit_attachment.load_bytes()); + new_meta_info.set_file_number(new_meta_info.file_number() + + commit_attachment.file_number()); + new_meta_info.set_file_size(new_meta_info.file_size() + commit_attachment.file_size()); + } else { + new_meta_info.set_job_id(commit_attachment.job_id()); + new_meta_info.set_scanned_rows(commit_attachment.scanned_rows()); + new_meta_info.set_load_bytes(commit_attachment.load_bytes()); + new_meta_info.set_file_number(commit_attachment.file_number()); + new_meta_info.set_file_size(commit_attachment.file_size()); + } + if (commit_attachment.has_offset()) { + new_meta_info.set_offset(commit_attachment.offset()); + } + std::string new_meta_val; + if (!new_meta_info.SerializeToString(&new_meta_val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + ss << "failed to serialize new streaming meta val, txn_id=" << txn_id; + msg = ss.str(); + return; + } + + txn->put(streaming_meta_key, new_meta_val); + LOG(INFO) << "put streaming_meta_key key=" << hex(streaming_meta_key) + << " streaming job new meta: " << new_meta_info.ShortDebugString(); +} + void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller, const GetRLTaskCommitAttachRequest* request, GetRLTaskCommitAttachResponse* response, @@ -678,6 +747,64 @@ void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcControlle } } +void MetaServiceImpl::get_streaming_task_commit_attach( + ::google::protobuf::RpcController* controller, + const GetStreamingTaskCommitAttachRequest* request, + GetStreamingTaskCommitAttachResponse* response, ::google::protobuf::Closure* done) { + RPC_PREPROCESS(get_streaming_task_commit_attach, get); + instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id()); + if (instance_id.empty()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty instance_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + RPC_RATE_LIMIT(get_streaming_task_commit_attach) + + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + ss << "filed to create txn, err=" << err; + msg = ss.str(); + return; + } + + if (!request->has_db_id() || !request->has_job_id()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "empty db_id or job_id"; + LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id(); + return; + } + + int64_t db_id = request->db_id(); + int64_t job_id = request->job_id(); + std::string streaming_meta_key; + std::string streaming_meta_val; + StreamingJobMetaKeyInfo streaming_meta_key_info {instance_id, db_id, job_id}; + streaming_job_meta_key_info(streaming_meta_key_info, &streaming_meta_key); + err = txn->get(streaming_meta_key, &streaming_meta_val); + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = MetaServiceCode::STREAMING_JOB_PROGRESS_NOT_FOUND; + ss << "progress info not found, db_id=" << db_id << " job_id=" << job_id << " err=" << err; + msg = ss.str(); + return; + } else if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get progress info, db_id=" << db_id << " job_id=" << job_id + << " err=" << err; + msg = ss.str(); + return; + } + + StreamingTaskCommitAttachmentPB* commit_attach = response->mutable_commit_attach(); + if (!commit_attach->ParseFromString(streaming_meta_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse meta info, db_id=" << db_id << " job_id=" << job_id; + msg = ss.str(); + return; + } +} + void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* controller, const ResetRLProgressRequest* request, ResetRLProgressResponse* response, @@ -1572,6 +1699,11 @@ void MetaServiceImpl::commit_txn_immediately( put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); } + if (txn_info.load_job_source_type() == + LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) { + put_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id); + } + LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id; LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() @@ -1965,6 +2097,11 @@ void MetaServiceImpl::commit_txn_eventually( put_routine_load_progress(code, msg, instance_id, request, txn.get(), db_id); } + if (txn_info.load_job_source_type() == + LoadJobSourceTypePB::LOAD_JOB_SRC_TYPE_STREAMING_JOB) { + put_streaming_job_meta(code, msg, instance_id, request, txn.get(), db_id); + } + // save versions for partition int64_t version_update_time_ms = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp index e23f84771ab..7b2b75c4d55 100644 --- a/cloud/src/meta-store/keys.cpp +++ b/cloud/src/meta-store/keys.cpp @@ -64,6 +64,7 @@ static const char* STATS_KEY_INFIX_TABLET = "tablet"; static const char* JOB_KEY_INFIX_TABLET = "tablet"; static const char* JOB_KEY_INFIX_RL_PROGRESS = "routine_load_progress"; +static const char* JOB_KEY_INFIX_STREAMING_JOB_META = "streaming_job_meta"; static const char* JOB_KEY_INFIX_RESTORE_TABLET = "restore_tablet"; static const char* JOB_KEY_INFIX_RESTORE_ROWSET = "restore_rowset"; @@ -144,7 +145,7 @@ static void encode_prefix(const T& t, std::string* key) { MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo, RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, RecycleTxnKeyInfo, RecycleStageKeyInfo, StatsTabletKeyInfo, TableVersionKeyInfo, JobRestoreTabletKeyInfo, JobRestoreRowsetKeyInfo, - JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, + JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, StreamingJobMetaKeyInfo, CopyJobKeyInfo, CopyFileKeyInfo, StorageVaultKeyInfo, MetaSchemaPBDictionaryInfo, MowTabletJobInfo>); @@ -181,7 +182,8 @@ static void encode_prefix(const T& t, std::string* key) { encode_bytes(STATS_KEY_PREFIX, key); } else if constexpr (std::is_same_v<T, JobTabletKeyInfo> || std::is_same_v<T, JobRecycleKeyInfo> - || std::is_same_v<T, RLJobProgressKeyInfo>) { + || std::is_same_v<T, RLJobProgressKeyInfo> + || std::is_same_v<T, StreamingJobMetaKeyInfo>) { encode_bytes(JOB_KEY_PREFIX, key); } else if constexpr (std::is_same_v<T, CopyJobKeyInfo> || std::is_same_v<T, CopyFileKeyInfo>) { @@ -463,6 +465,13 @@ void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* out) encode_int64(std::get<2>(in), out); // job_id } +void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, std::string* out) { + encode_prefix(in, out); // 0x01 "job" ${instance_id} + encode_bytes(JOB_KEY_INFIX_STREAMING_JOB_META, out); // "streaming_job_meta" + encode_int64(std::get<1>(in), out); // db_id + encode_int64(std::get<2>(in), out); // job_id +} + void job_restore_tablet_key(const JobRestoreTabletKeyInfo& in, std::string* out) { encode_prefix(in, out); // 0x01 "job" ${instance_id} encode_bytes(JOB_KEY_INFIX_RESTORE_TABLET, out); // "restore_tablet" diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h index 8ccd974e0b7..3b9b234c574 100644 --- a/cloud/src/meta-store/keys.h +++ b/cloud/src/meta-store/keys.h @@ -218,6 +218,9 @@ using MetaPendingDeleteBitmapInfo = BasicKeyInfo<24 , std::tuple<std::string, in // 0:instance_id 1:db_id 2:job_id using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t, int64_t>>; +// 0:instance_id 1:db_id 2:job_id +using StreamingJobMetaKeyInfo = BasicKeyInfo<52, std::tuple<std::string, int64_t, int64_t>>; + // 0:instance_id 1:vault_id using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string, std::string>>; @@ -407,6 +410,8 @@ void job_tablet_key(const JobTabletKeyInfo& in, std::string* out); static inline std::string job_tablet_key(const JobTabletKeyInfo& in) { std::string s; job_tablet_key(in, &s); return s; } void rl_job_progress_key_info(const RLJobProgressKeyInfo& in, std::string* out); static inline std::string rl_job_progress_key_info(const RLJobProgressKeyInfo& in) { std::string s; rl_job_progress_key_info(in, &s); return s; } +void streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in, std::string* out); +static inline std::string streaming_job_meta_key_info(const StreamingJobMetaKeyInfo& in) { std::string s; streaming_job_meta_key_info(in, &s); return s; } std::string copy_key_prefix(std::string_view instance_id); void copy_job_key(const CopyJobKeyInfo& in, std::string* out); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java index f17625a89ea..2b0673d6453 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceClient.java @@ -492,4 +492,10 @@ public class MetaServiceClient { return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS) .createInstance(request); } + + public Cloud.GetStreamingTaskCommitAttachResponse + getStreamingTaskCommitAttach(Cloud.GetStreamingTaskCommitAttachRequest request) { + return blockingStub.withDeadlineAfter(Config.meta_service_brpc_timeout_ms, TimeUnit.MILLISECONDS) + .getStreamingTaskCommitAttach(request); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java index b351942cbe2..8710209ff8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java @@ -432,4 +432,9 @@ public class MetaServiceProxy { public Cloud.CreateInstanceResponse createInstance(Cloud.CreateInstanceRequest request) throws RpcException { return w.executeRequest((client) -> client.createInstance(request)); } + + public Cloud.GetStreamingTaskCommitAttachResponse getStreamingTaskCommitAttach( + Cloud.GetStreamingTaskCommitAttachRequest request) throws RpcException { + return w.executeRequest((client) -> client.getStreamingTaskCommitAttach(request)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index fddb6ed7208..c3b9c321fa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -85,6 +85,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.event.DataChangeEvent; +import org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; import org.apache.doris.metric.MetricRepo; @@ -619,6 +620,19 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } builder.setCommitAttachment(TxnUtil .rlTaskTxnCommitAttachmentToPb(rlTaskTxnCommitAttachment)); + } else if (txnCommitAttachment instanceof StreamingTaskTxnCommitAttachment) { + StreamingTaskTxnCommitAttachment streamingTaskTxnCommitAttachment = + (StreamingTaskTxnCommitAttachment) txnCommitAttachment; + TxnStateChangeCallback cb = callbackFactory.getCallback(streamingTaskTxnCommitAttachment.getTaskId()); + if (cb != null) { + // use a temporary transaction state to do before commit check, + // what actually works is the transactionId + TransactionState tmpTxnState = new TransactionState(); + tmpTxnState.setTransactionId(transactionId); + cb.beforeCommitted(tmpTxnState); + } + builder.setCommitAttachment(TxnUtil + .streamingTaskTxnCommitAttachmentToPb(streamingTaskTxnCommitAttachment)); } else { throw new UserException("invalid txnCommitAttachment"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java index 3aca54cd150..4155e6c5e67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java @@ -19,6 +19,7 @@ package org.apache.doris.cloud.transaction; import org.apache.doris.cloud.proto.Cloud.RLTaskTxnCommitAttachmentPB; import org.apache.doris.cloud.proto.Cloud.RoutineLoadProgressPB; +import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB; import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB; import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB; import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB.LoadJobFinalOperationPB.EtlStatusPB; @@ -28,6 +29,7 @@ import org.apache.doris.cloud.proto.Cloud.TxnCoordinatorPB; import org.apache.doris.cloud.proto.Cloud.TxnInfoPB; import org.apache.doris.cloud.proto.Cloud.TxnSourceTypePB; import org.apache.doris.cloud.proto.Cloud.UniqueIdPB; +import org.apache.doris.job.extensions.insert.streaming.StreamingTaskTxnCommitAttachment; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.JobState; @@ -268,6 +270,42 @@ public class TxnUtil { return new RLTaskTxnCommitAttachment(txnCommitAttachmentPB.getRlTaskTxnCommitAttachment()); } + public static TxnCommitAttachmentPB streamingTaskTxnCommitAttachmentToPb(StreamingTaskTxnCommitAttachment + streamingTaskTxnCommitAttachment) { + if (LOG.isDebugEnabled()) { + LOG.debug("streamingTaskTxnCommitAttachment:{}", streamingTaskTxnCommitAttachment); + } + TxnCommitAttachmentPB.Builder attachementBuilder = TxnCommitAttachmentPB.newBuilder(); + attachementBuilder.setType(TxnCommitAttachmentPB.Type.STREAMING_TASK_TXN_COMMIT_ATTACHMENT); + + StreamingTaskCommitAttachmentPB.Builder builder = + StreamingTaskCommitAttachmentPB.newBuilder(); + + builder.setJobId(streamingTaskTxnCommitAttachment.getJobId()) + .setTaskId(streamingTaskTxnCommitAttachment.getTaskId()) + .setScannedRows(streamingTaskTxnCommitAttachment.getScannedRows()) + .setLoadBytes(streamingTaskTxnCommitAttachment.getLoadBytes()) + .setFileNumber(streamingTaskTxnCommitAttachment.getFileNumber()) + .setFileSize(streamingTaskTxnCommitAttachment.getFileSize()); + + if (streamingTaskTxnCommitAttachment.getOffset() != null) { + builder.setOffset(streamingTaskTxnCommitAttachment.getOffset().endOffset()); + } + + attachementBuilder.setStreamingTaskTxnCommitAttachment(builder.build()); + return attachementBuilder.build(); + } + + public static StreamingTaskTxnCommitAttachment streamingTaskTxnCommitAttachmentFromPb( + TxnCommitAttachmentPB txnCommitAttachmentPB) { + StreamingTaskCommitAttachmentPB streamingTaskCommitAttachmentPB = + txnCommitAttachmentPB.getStreamingTaskTxnCommitAttachment(); + if (LOG.isDebugEnabled()) { + LOG.debug("StreamingTaskCommitAttachmentPB={}", streamingTaskCommitAttachmentPB); + } + return new StreamingTaskTxnCommitAttachment(streamingTaskCommitAttachmentPB); + } + public static LoadJobFinalOperation loadJobFinalOperationFromPb(TxnCommitAttachmentPB txnCommitAttachmentPB) { LoadJobFinalOperationPB loadJobFinalOperationPB = txnCommitAttachmentPB.getLoadJobFinalOperation(); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index f2ce1ad5e02..b872040c4c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -19,7 +19,10 @@ package org.apache.doris.job.extensions.insert.streaming; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.MetaServiceProxy; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.UserException; @@ -49,6 +52,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.rpc.RpcException; import org.apache.doris.thrift.TCell; import org.apache.doris.thrift.TRow; import org.apache.doris.transaction.TransactionException; @@ -265,6 +269,17 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M offsetProvider.updateOffset(attachment.getOffset()); } + @Override + public void onRegister() throws JobException { + Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(this); + } + + @Override + public void onReplayCreate() throws JobException { + onRegister(); + super.onReplayCreate(); + } + @Override public ShowResultSetMetaData getTaskMetaData() { return InsertJob.TASK_META_DATA; @@ -390,6 +405,35 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M updateJobStatisticAndOffset(attachment); } + public void replayOnCloudMode() throws UserException { + Cloud.GetStreamingTaskCommitAttachRequest.Builder builder = + Cloud.GetStreamingTaskCommitAttachRequest.newBuilder(); + builder.setCloudUniqueId(Config.cloud_unique_id); + builder.setDbId(dbId); + builder.setJobId(getJobId()); + + Cloud.GetStreamingTaskCommitAttachResponse response; + try { + response = MetaServiceProxy.getInstance().getStreamingTaskCommitAttach(builder.build()); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + log.warn("failed to get streaming task commit attach, response: {}", response); + if (response.getStatus().getCode() == Cloud.MetaServiceCode.STREAMING_JOB_PROGRESS_NOT_FOUND) { + log.warn("not found streaming job progress, response: {}", response); + return; + } else { + throw new UserException(response.getStatus().getMsg()); + } + } + } catch (RpcException e) { + log.info("failed to get streaming task commit attach {}", e); + throw new UserException(e.getMessage()); + } + + StreamingTaskTxnCommitAttachment commitAttach = + new StreamingTaskTxnCommitAttachment(response.getCommitAttach()); + updateJobStatisticAndOffset(commitAttach); + } + @Override public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason) throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java index 744f83080aa..8660ed94739 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingTaskTxnCommitAttachment.java @@ -17,6 +17,7 @@ package org.apache.doris.job.extensions.insert.streaming; +import org.apache.doris.cloud.proto.Cloud.StreamingTaskCommitAttachmentPB; import org.apache.doris.job.offset.Offset; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TxnCommitAttachment; @@ -38,6 +39,15 @@ public class StreamingTaskTxnCommitAttachment extends TxnCommitAttachment { this.offset = offset; } + public StreamingTaskTxnCommitAttachment(StreamingTaskCommitAttachmentPB pb) { + super(TransactionState.LoadJobSourceType.STREAMING_JOB); + this.scannedRows = pb.getScannedRows(); + this.loadBytes = pb.getLoadBytes(); + this.fileNumber = pb.getFileNumber(); + this.fileSize = pb.getFileSize(); + this.offset.setEndOffset(pb.getOffset()); + } + @Getter private long jobId; @Getter diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java index 095f0a5e6bf..a3b0689bfc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/Offset.java @@ -19,4 +19,8 @@ package org.apache.doris.job.offset; public interface Offset { String toJson(); + + void setEndOffset(String endOffset); + + String endOffset(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java index f76707b2453..2ab2030fbbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java @@ -30,6 +30,16 @@ public class S3Offset implements Offset { String endFile; String fileLists; + @Override + public void setEndOffset(String endOffset) { + this.endFile = endOffset; + } + + @Override + public String endOffset() { + return endFile; + } + @Override public String toJson() { return GsonUtils.GSON.toJson(this); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index e277c24f8d7..aa6bcd28359 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -322,6 +322,7 @@ enum LoadJobSourceTypePB { LOAD_JOB_SRC_TYPE_INSERT_STREAMING = 3; // insert stmt (streaming type), update stmt use this type LOAD_JOB_SRC_TYPE_ROUTINE_LOAD_TASK = 4; // routine load task use this type LOAD_JOB_SRC_TYPE_BATCH_LOAD_JOB = 5; // load job v2 for broker load + LOAD_JOB_SRC_TYPE_STREAMING_JOB = 6; // streaming job use this type } enum TxnStatusPB { @@ -365,10 +366,21 @@ message RoutineLoadJobStatisticPB { optional int64 task_execution_time_ms = 5; } +message StreamingTaskCommitAttachmentPB { + optional int64 job_id = 1; + optional int64 task_id = 2; + optional string offset = 3; + optional int64 scanned_rows = 4; + optional int64 load_bytes = 5; + optional int64 file_number = 6; + optional int64 file_size = 7; +} + message TxnCommitAttachmentPB { enum Type { LODD_JOB_FINAL_OPERATION = 0; RT_TASK_TXN_COMMIT_ATTACHMENT = 1; + STREAMING_TASK_TXN_COMMIT_ATTACHMENT = 2; } message LoadJobFinalOperationPB { message EtlStatusPB { @@ -425,6 +437,7 @@ message TxnCommitAttachmentPB { optional Type type = 1; optional LoadJobFinalOperationPB load_job_final_operation = 2; optional RLTaskTxnCommitAttachmentPB rl_task_txn_commit_attachment = 3; + optional StreamingTaskCommitAttachmentPB streaming_task_txn_commit_attachment = 4; } // For storing label -> txn_ids @@ -1698,7 +1711,8 @@ enum MetaServiceCode { JOB_ALREADY_SUCCESS = 5002; ROUTINE_LOAD_DATA_INCONSISTENT = 5003; ROUTINE_LOAD_PROGRESS_NOT_FOUND = 5004; - JOB_CHECK_ALTER_VERSION = 5005; + STREAMING_JOB_PROGRESS_NOT_FOUND = 5005; + JOB_CHECK_ALTER_VERSION = 5006; // Rate limit MAX_QPS_LIMIT = 6001; @@ -1876,6 +1890,18 @@ message ResetRLProgressResponse { optional MetaServiceResponseStatus status = 1; } +message GetStreamingTaskCommitAttachRequest { + optional string cloud_unique_id = 1; // For auth + optional int64 db_id = 2; + optional int64 job_id = 3; + optional string request_ip = 4; +} + +message GetStreamingTaskCommitAttachResponse { + optional MetaServiceResponseStatus status = 1; + optional StreamingTaskCommitAttachmentPB commit_attach = 2; +} + message CheckKeyInfos { repeated int64 db_ids = 1; repeated int64 table_ids = 2; @@ -2069,6 +2095,9 @@ service MetaService { rpc get_rl_task_commit_attach(GetRLTaskCommitAttachRequest) returns (GetRLTaskCommitAttachResponse); rpc reset_rl_progress(ResetRLProgressRequest) returns (ResetRLProgressResponse); + // streaming job meta + rpc get_streaming_task_commit_attach(GetStreamingTaskCommitAttachRequest) returns (GetStreamingTaskCommitAttachResponse); + // check KV rpc check_kv(CheckKVRequest) returns (CheckKVResponse); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org