This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 32b36d3c9c7 [refactor](move-memtable) rename proto OpenStreamSink to OpenLoadStream (#26527) 32b36d3c9c7 is described below commit 32b36d3c9c73a34fb2308b879fdb62e9f054cad6 Author: Kaijie Chen <c...@apache.org> AuthorDate: Tue Nov 7 22:41:20 2023 +0800 [refactor](move-memtable) rename proto OpenStreamSink to OpenLoadStream (#26527) --- be/src/runtime/load_stream.cpp | 2 +- be/src/runtime/load_stream.h | 2 +- be/src/runtime/load_stream_mgr.cpp | 2 +- be/src/runtime/load_stream_mgr.h | 2 +- be/src/service/internal_service.cpp | 4 ++-- be/src/service/internal_service.h | 2 +- be/src/vec/sink/load_stream_stub.cpp | 4 ++-- be/test/runtime/load_stream_test.cpp | 8 ++++---- gensrc/proto/internal_service.proto | 6 +++--- 9 files changed, 16 insertions(+), 16 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index daca91e978c..9d1722c1726 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -242,7 +242,7 @@ LoadStream::~LoadStream() { LOG(INFO) << "load stream is deconstructed " << *this; } -Status LoadStream::init(const POpenStreamSinkRequest* request) { +Status LoadStream::init(const POpenLoadStreamRequest* request) { _txn_id = request->txn_id(); _schema = std::make_shared<OlapTableSchemaParam>(); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 317dc510861..fe7d90d502e 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -106,7 +106,7 @@ public: LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); ~LoadStream(); - Status init(const POpenStreamSinkRequest* request); + Status init(const POpenLoadStreamRequest* request); void add_source(int64_t src_id) { std::lock_guard lock_guard(_lock); diff --git a/be/src/runtime/load_stream_mgr.cpp b/be/src/runtime/load_stream_mgr.cpp index d1af7b60d3f..b3553046aec 100644 --- 
a/be/src/runtime/load_stream_mgr.cpp +++ b/be/src/runtime/load_stream_mgr.cpp @@ -45,7 +45,7 @@ LoadStreamMgr::~LoadStreamMgr() { _file_writer_thread_pool->shutdown(); } -Status LoadStreamMgr::open_load_stream(const POpenStreamSinkRequest* request, +Status LoadStreamMgr::open_load_stream(const POpenLoadStreamRequest* request, LoadStreamSharedPtr& load_stream) { UniqueId load_id(request->load_id()); diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h index da3ae98a42e..30d4ed069c4 100644 --- a/be/src/runtime/load_stream_mgr.h +++ b/be/src/runtime/load_stream_mgr.h @@ -39,7 +39,7 @@ public: FifoThreadPool* light_work_pool); ~LoadStreamMgr(); - Status open_load_stream(const POpenStreamSinkRequest* request, + Status open_load_stream(const POpenLoadStreamRequest* request, LoadStreamSharedPtr& load_stream); void clear_load(UniqueId loadid); std::unique_ptr<ThreadPoolToken> new_token() { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index cfaef4895b3..dba01f1b668 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -355,8 +355,8 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl } void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* controller, - const POpenStreamSinkRequest* request, - POpenStreamSinkResponse* response, + const POpenLoadStreamRequest* request, + POpenLoadStreamResponse* response, google::protobuf::Closure* done) { bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { signal::set_signal_task_id(request->load_id()); diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 3ef14b6c265..ca28c8b8b06 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -94,7 +94,7 @@ public: google::protobuf::Closure* done) override; void open_load_stream(google::protobuf::RpcController* controller, - const 
POpenStreamSinkRequest* request, POpenStreamSinkResponse* response, + const POpenLoadStreamRequest* request, POpenLoadStreamResponse* response, google::protobuf::Closure* done) override; void tablet_writer_add_block(google::protobuf::RpcController* controller, diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 60ef352ddbb..a953747f27b 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -125,7 +125,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, return Status::Error<true>(ret, "Failed to create stream"); } cntl.set_timeout_ms(config::open_load_stream_timeout_ms); - POpenStreamSinkRequest request; + POpenLoadStreamRequest request; *request.mutable_load_id() = _load_id; request.set_src_id(_src_id); request.set_txn_id(txn_id); @@ -134,7 +134,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, for (auto& tablet : tablets_for_schema) { *request.add_tablets() = tablet; } - POpenStreamSinkResponse response; + POpenLoadStreamResponse response; // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, // see: https://github.com/apache/brpc/issues/392 const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled"); diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index 17c8b3707ae..bdd0ace9a8b 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -348,8 +348,8 @@ public: : _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {} virtual ~StreamService() { brpc::StreamClose(_sd); }; virtual void open_load_stream(google::protobuf::RpcController* controller, - const POpenStreamSinkRequest* request, - POpenStreamSinkResponse* response, + const POpenLoadStreamRequest* request, + POpenLoadStreamResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard 
done_guard(done); std::unique_ptr<PStatus> status = std::make_unique<PStatus>(); @@ -438,8 +438,8 @@ public: return Status::InternalError("Fail to create stream"); } - POpenStreamSinkRequest request; - POpenStreamSinkResponse response; + POpenLoadStreamRequest request; + POpenLoadStreamResponse response; PUniqueId id; id.set_hi(1); id.set_lo(1); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 99f4464fd59..9a36916317f 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -742,7 +742,7 @@ message PGroupCommitInsertResponse { optional int64 filtered_rows = 5; } -message POpenStreamSinkRequest { +message POpenLoadStreamRequest { optional PUniqueId load_id = 1; optional int64 txn_id = 2; optional int64 src_id = 3; @@ -757,7 +757,7 @@ message PTabletSchemaWithIndex { optional bool enable_unique_key_merge_on_write = 3; } -message POpenStreamSinkResponse { +message POpenLoadStreamResponse { optional PStatus status = 1; repeated PTabletSchemaWithIndex tablet_schemas = 2; } @@ -798,7 +798,7 @@ service PBackendService { rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns (PCancelPlanFragmentResult); rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult); rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult); - rpc open_load_stream(POpenStreamSinkRequest) returns (POpenStreamSinkResponse); + rpc open_load_stream(POpenLoadStreamRequest) returns (POpenLoadStreamResponse); rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult); rpc tablet_writer_add_block_by_http(PEmptyRequest) returns (PTabletWriterAddBlockResult); rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org