This is an automated email from the ASF dual-hosted git repository. doleyzi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9ac4e5af08 [INLONG-10838][SDK] Optimize the ability to send data for DataProxy C++ SDK (#10850) 9ac4e5af08 is described below commit 9ac4e5af08812b6b0364a5296f9afc63433cee82 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Aug 21 18:46:39 2024 +0800 [INLONG-10838][SDK] Optimize the ability to send data for DataProxy C++ SDK (#10850) --- .../dataproxy-sdk-cpp/include/inlong_api.h | 54 +++++ .../dataproxy-sdk-cpp/src/client/tcp_client.cc | 10 +- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 6 + .../dataproxy-sdk-cpp/src/config/sdk_conf.h | 2 +- .../dataproxy-sdk-cpp/src/core/api_imp.cc | 34 +-- .../dataproxy-sdk-cpp/src/core/api_imp.h | 10 +- .../dataproxy-sdk-cpp/src/core/inlong_api.cc | 29 ++- .../dataproxy-sdk-cpp/src/core/inlong_api.h | 44 ++-- .../dataproxy-sdk-cpp/src/core/sdk_msg.h | 37 +++- .../dataproxy-sdk-cpp/src/group/recv_group.cc | 10 +- .../dataproxy-sdk-cpp/src/group/send_group.cc | 236 +++++++++------------ .../dataproxy-sdk-cpp/src/group/send_group.h | 57 +++-- .../dataproxy-sdk-cpp/src/protocol/msg_protocol.cc | 4 +- .../dataproxy-sdk-cpp/src/utils/capi_constant.h | 1 + .../dataproxy-sdk-cpp/src/utils/send_buffer.h | 153 +++++++------ 15 files changed, 362 insertions(+), 325 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h new file mode 100644 index 0000000000..8f79c3b802 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/include/inlong_api.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef INLONG_SDK_API_H +#define INLONG_SDK_API_H + +#include <clocale> +#include <cstdint> +#include <functional> +#include <memory> +#include <vector> + +namespace inlong { + +typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, + const int64_t, const char *); + +class ApiImp; + +class InLongApi { + public: + InLongApi(); + ~InLongApi(); + int32_t InitApi(const char *config_path); + + int32_t AddInLongGroupId(const std::vector<std::string> &group_ids); + + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + UserCallBack call_back = nullptr); + + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back = nullptr); + + int32_t CloseApi(int32_t max_waitms); + + private: + std::shared_ptr<ApiImp> api_impl_; +}; +} // namespace inlong +#endif // INLONG_SDK_API_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc index eb3264ba6c..c3b7692809 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc @@ -164,7 +164,7 @@ void TcpClient::BeginWrite() { } last_update_time_ = Utils::getCurrentMsTime(); 
status_ = kWriting; - asio::async_write(*socket_, asio::buffer(sendBuffer_->content(), sendBuffer_->len()), + asio::async_write(*socket_, asio::buffer(sendBuffer_->GetData(), sendBuffer_->GetDataLen()), std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::OnWroten(const asio::error_code error, std::size_t bytes_transferred) { @@ -390,8 +390,8 @@ void TcpClient::ParseHeartBeat(size_t total_length) { void TcpClient::ParseGenericResponse() { if (sendBuffer_ != nullptr) { - std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + sendBuffer_->getStreamId(); - stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt()); + std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + sendBuffer_->GetInlongStreamId(); + stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->GetMsgCnt()); stat_map_[stat_key].AddSendSuccessPackNum(1); stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); @@ -448,8 +448,8 @@ void TcpClient::ResetSendBuffer() { } retry_times_++; if (retry_times_ > SdkConfig::getInstance()->retry_times_) { - std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + sendBuffer_->getStreamId(); - stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt()); + std::string stat_key = sendBuffer_->GetInlongGroupId() + kStatJoiner + sendBuffer_->GetInlongStreamId(); + stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->GetMsgCnt()); stat_map_[stat_key].AddSendFailPackNum(1); stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index 12a82b07e7..5f067528af 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -443,6 +443,12 @@ void SdkConfig::InitTcpParam(const 
rapidjson::Value &doc) { } else { retry_times_ = constants::kRetryTimes; } + if (doc.HasMember("proxy_repeat_times") && doc["proxy_repeat_times"].IsInt() && doc["proxy_repeat_times"].GetInt() >= 0) { + const rapidjson::Value &obj = doc["proxy_repeat_times"]; + proxy_repeat_times_ = obj.GetInt(); + } else { + proxy_repeat_times_ = constants::kProxyRepeatTimes; + } } void SdkConfig::InitAuthParm(const rapidjson::Value &doc) { // auth settings diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h index ae6cfde58b..f120343f07 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h @@ -96,7 +96,7 @@ private: bool enable_balance_; bool enable_local_cache_; uint32_t retry_times_; - + uint32_t proxy_repeat_times_; // auth settings bool need_auth_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index 7ae2c79ea2..2109af502e 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -46,31 +46,31 @@ int32_t ApiImp::InitApi(const char *config_file_path) { return DoInit(); } -int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, +int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back) { - int32_t code=ValidateParams(group_id, stream_id, msg, msg_len); + int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len); if(code !=SdkCode::kSuccess){ return code; } - return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back); + return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, call_back); } -int32_t 
ApiImp::Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, +int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, int64_t data_time, UserCallBack call_back) { - int32_t code=ValidateParams(group_id, stream_id, msg, msg_len); + int32_t code=ValidateParams(inlong_group_id, inlong_stream_id, msg, msg_len); if(code !=SdkCode::kSuccess){ return code; } - return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back, data_time); + return this->SendBase(inlong_group_id, inlong_stream_id, {msg, msg_len}, call_back, data_time); } -int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len){ +int32_t ApiImp::ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len){ if (msg_len > max_msg_length_) { - MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1); + MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, inlong_stream_id, 1); return SdkCode::kMsgTooLong; } - if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len <= 0) { + if (inlong_group_id == nullptr || inlong_stream_id == nullptr || msg == nullptr || msg_len <= 0) { return SdkCode::kInvalidInput; } @@ -80,9 +80,9 @@ int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, cons return SdkCode::kSuccess; } -int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg, +int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg, UserCallBack call_back, int64_t report_time) { - int32_t check_ret = CheckData(inlong_group_id, stream_id, msg); + int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg); if (check_ret != SdkCode::kSuccess) { return check_ret; } @@ -91,11 +91,11 @@ int32_t ApiImp::SendBase(const std::string& 
inlong_group_id, const std::string& auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id); if (recv_group == nullptr) { - LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << ",getStreamId:" << stream_id); + LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << ",getStreamId:" << inlong_stream_id); return SdkCode::kFailGetRevGroup; } - return recv_group->SendData(msg, inlong_group_id, stream_id, report_time, call_back); + return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, report_time, call_back); } int32_t ApiImp::CloseApi(int32_t max_waitms) { @@ -125,19 +125,19 @@ int32_t ApiImp::DoInit() { return InitManager(); } -int32_t ApiImp::CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg) { +int32_t ApiImp::CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg) { if (init_succeed_ == 0 || user_exit_flag_.get() == 1) { LOG_ERROR("capi has been closed, Init first and then send"); return SdkCode::kSendAfterClose; } - if (msg.empty() || group_id.empty() || stream_id.empty()) { - LOG_ERROR("invalid input, group id:" << group_id << " stream id:" << stream_id << "msg" << msg); + if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) { + LOG_ERROR("invalid input, group id:" << inlong_group_id << " stream id:" << inlong_stream_id << "msg" << msg); return SdkCode::kInvalidInput; } if (msg.size() > SdkConfig::getInstance()->max_msg_size_) { - MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1); + MetricManager::GetInstance()->AddTooLongMsgCount(inlong_group_id, inlong_stream_id, 1); LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << " ext_pack_size" << SdkConfig::getInstance()->max_msg_size_); return SdkCode::kMsgTooLong; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h index 
32ca093416..cd6a9757a4 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h @@ -34,10 +34,10 @@ class ApiImp { ~ApiImp(); int32_t InitApi(const char *config_file_path); - int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back = nullptr); - int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, int64_t report_time, UserCallBack call_back = nullptr); int32_t CloseApi(int32_t max_waitms); @@ -46,12 +46,12 @@ class ApiImp { private: int32_t DoInit(); int32_t InitManager(); - int32_t SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg, UserCallBack call_back, + int32_t SendBase(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg, UserCallBack call_back, int64_t report_time = 0); - int32_t CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg); + int32_t CheckData(const std::string& inlong_group_id, const std::string& inlong_stream_id, const std::string& msg); - int32_t ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len); + int32_t ValidateParams(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len); AtomicInt user_exit_flag_{0}; volatile bool init_flag_ = false; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc index 14a5f91207..aa0674414d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc +++ 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.cc @@ -15,28 +15,25 @@ * limitations under the License. */ -#include "inlong_api.h" +#include "../../include/inlong_api.h" #include "../core/api_imp.h" namespace inlong { -InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); }; +InLongApi::InLongApi() { api_impl_ = std::make_shared<ApiImp>(); } InLongApi::~InLongApi() { api_impl_->CloseApi(10); } -int32_t InLongApi::InitApi(const char *config_path) { - return api_impl_->InitApi(config_path); -} +int32_t InLongApi::InitApi(const char *config_path) { return api_impl_->InitApi(config_path); } -int32_t InLongApi::Send(const char *inlong_group_id, - const char *inlong_stream_id, const char *msg, - int32_t msg_len, UserCallBack call_back) { - return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, - call_back); +int32_t InLongApi::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + UserCallBack call_back) { + return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, call_back); } -int32_t InLongApi::CloseApi(int32_t max_waitms) { - return api_impl_->CloseApi(max_waitms); -} -int32_t InLongApi::AddBid(const std::vector<std::string> &groupids) { - return api_impl_->AddInLongGroupId(groupids); +int32_t InLongApi::Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back) { + return api_impl_->Send(inlong_group_id, inlong_stream_id, msg, msg_len, data_time, call_back); } -} // namespace inlong \ No newline at end of file + +int32_t InLongApi::CloseApi(int32_t max_waitms) { return api_impl_->CloseApi(max_waitms); } +int32_t InLongApi::AddInLongGroupId(const std::vector<std::string> &bids) { return api_impl_->AddInLongGroupId(bids); } +} // namespace inlong \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h 
b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h index 66a1a7ce39..88f0a528ff 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/inlong_api.h @@ -1,20 +1,18 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ #ifndef INLONG_SDK_API_H @@ -34,21 +32,23 @@ typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, class ApiImp; class InLongApi { -public: + public: InLongApi(); ~InLongApi(); int32_t InitApi(const char *config_path); - int32_t AddBid(const std::vector<std::string> &groupids); + int32_t AddGroupId(const std::vector<std::string> &group_ids); - int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, - const char *msg, int32_t msg_len, + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, UserCallBack call_back = nullptr); + int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back = nullptr); + int32_t CloseApi(int32_t max_waitms); -private: + private: std::shared_ptr<ApiImp> api_impl_; }; -} // namespace inlong -#endif // INLONG_SDK_API_H +} // namespace inlong +#endif // INLONG_SDK_API_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h index d9e8366486..69bcdddde8 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h @@ -15,8 +15,8 @@ * limitations under the License. 
*/ -#ifndef SDK_USER_MSG_H_ -#define SDK_USER_MSG_H_ +#ifndef SDK_USER_MSG_H +#define SDK_USER_MSG_H #include <functional> #include <memory> @@ -40,27 +40,44 @@ struct SdkMsg { std::string inlong_group_id_; std::string inlong_stream_id_; - SdkMsg(const std::string &mmsg, const std::string &mclient_ip, - int64_t mreport_time, UserCallBack mcb, const std::string &attr, - const std::string &u_ip, int64_t u_time,const std::string& inlong_group_id,const std::string& inlong_stream_id) + SdkMsg(const std::string &mmsg, + const std::string &mclient_ip, + int64_t mreport_time, + UserCallBack mcb, + const std::string &attr, + const std::string &u_ip, + int64_t u_time, + const std::string &inlong_group_id, + const std::string &inlong_stream_id) : msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time), cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip), data_pack_format_attr_(attr), inlong_group_id_(inlong_group_id), - inlong_stream_id_(inlong_stream_id){} - SdkMsg(){}; + inlong_stream_id_(inlong_stream_id) {} + SdkMsg() {}; void setMsg(const std::string &msg) { msg_ = msg; } void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; } void setReportTime(uint64_t reportTime) { report_time_ = reportTime; } - void setCb(UserCallBack cb) { cb_ = cb;} + void setCb(UserCallBack cb) { cb_ = cb; } void setUserReportTime(uint64_t userReportTime) { user_report_time_ = userReportTime; } void setUserClientIp(const std::string &userClientIp) { user_client_ip_ = userClientIp; } void setDataPackFormatAttr(const std::string &dataPackFormatAttr) { data_pack_format_attr_ = dataPackFormatAttr; } void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; } void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = inlong_stream_id; } + + void clear() { + msg_ = ""; + client_ip_ = ""; + report_time_ = 0; + cb_ = nullptr; + user_report_time_ = 0; + user_client_ip_ = ""; + data_pack_format_attr_ = ""; + inlong_group_id_ = 
""; + inlong_stream_id_ = ""; + } }; using SdkMsgPtr = std::shared_ptr<SdkMsg>; } // namespace inlong - -#endif // SDK_USER_MSG_H_ \ No newline at end of file +#endif // SDK_USER_MSG_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc index 377432a2ef..1ffeeef8dc 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc @@ -352,15 +352,15 @@ std::shared_ptr<SendBuffer> RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs uniq_id_ = 0; } - if (!PackMsg(msgs, send_buffer->content(), len) || len == 0) { + if (!PackMsg(msgs, send_buffer->GetData(), len) || len == 0) { LOG_ERROR("failed to write data to send buf from pack queue, sendQueue"); return nullptr; } - send_buffer->setLen(len); - send_buffer->setMsgCnt(msg_cnt); - send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_); - send_buffer->setStreamId(msgs[0]->inlong_stream_id_); + send_buffer->SetDataLen(len); + send_buffer->SetMsgCnt(msg_cnt); + send_buffer->SetInlongGroupId(msgs[0]->inlong_group_id_); + send_buffer->SetInlongStreamId(msgs[0]->inlong_stream_id_); for (const auto &it : msgs) { if(it->cb_){ send_buffer->addUserMsg(it); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc index f317163799..0bbedcd0fb 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc @@ -1,45 +1,45 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ #include "send_group.h" -#include "api_code.h" -#include "proxy_manager.h" +#include <sys/prctl.h> #include <algorithm> #include <random> +#include "../core/api_code.h" +#include "../manager/proxy_manager.h" namespace inlong { -const int kDefaultQueueSize = 20; +const uint32_t kDefaultQueueSize = 200; SendGroup::SendGroup(std::string send_group_key) : work_(asio::make_work_guard(io_context_)), - send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0), - load_threshold_(0), max_proxy_num_(0) { - max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / - SdkConfig::getInstance()->pack_size_; - if (max_send_queue_num_ <= 0) { + send_group_key_(send_group_key), + send_idx_(0), + dispatch_stat_(0), + load_threshold_(0), + max_proxy_num_(0) { + max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / SdkConfig::getInstance()->pack_size_; + if (max_send_queue_num_ <= kDefaultQueueSize) { max_send_queue_num_ = kDefaultQueueSize; } - LOG_INFO("SendGroup:" << send_group_key_ - << ",max send queue num:" << max_send_queue_num_); + max_send_queue_num_ = std::max(max_send_queue_num_, kDefaultQueueSize); + LOG_INFO("SendGroup:" << send_group_key_ << ",max send queue num:" << max_send_queue_num_); dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_; load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_; - heart_beat_interval_ = - SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_; + heart_beat_interval_ = SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_; need_balance_ = SdkConfig::getInstance()->enable_balance_; work_clients_old_ = nullptr; @@ -48,23 +48,19 @@ SendGroup::SendGroup(std::string send_group_key) send_timer_ = std::make_shared<asio::steady_timer>(io_context_); send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); - send_timer_->async_wait( - std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1)); + 
send_timer_->async_wait(std::bind(&SendGroup::PreDispatchData, this, std::placeholders::_1)); update_conf_timer_ = std::make_shared<asio::steady_timer>(io_context_); update_conf_timer_->expires_after(std::chrono::milliseconds(1)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); if (SdkConfig::getInstance()->enable_balance_) { load_balance_timer_ = std::make_shared<asio::steady_timer>(io_context_); - load_balance_timer_->expires_after( - std::chrono::milliseconds(load_balance_interval_)); - load_balance_timer_->async_wait( - std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); + load_balance_timer_->expires_after(std::chrono::milliseconds(load_balance_interval_)); + load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); } - current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_); + current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_); thread_ = std::thread(&SendGroup::Run, this); } SendGroup::~SendGroup() { @@ -84,14 +80,16 @@ SendGroup::~SendGroup() { thread_.join(); } } -void SendGroup::Run() { io_context_.run(); } +void SendGroup::Run() { + prctl(PR_SET_NAME, "send-group"); + io_context_.run(); +} void SendGroup::PreDispatchData(std::error_code error) { if (error) { return; } send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); - send_timer_->async_wait( - std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); + send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); } void SendGroup::DispatchData(std::error_code error) { @@ -125,8 +123,7 @@ void SendGroup::DispatchData(std::error_code error) { } send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); - send_timer_->async_wait( - std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); + 
send_timer_->async_wait(std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); } void SendGroup::HeartBeat() { unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); @@ -148,12 +145,12 @@ void SendGroup::HeartBeat() { bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; } -uint32_t SendGroup::PushData(SendBufferPtrT send_buffer_ptr) { +uint32_t SendGroup::PushData(const SendBufferPtrT &send_buffer_ptr) { if (IsFull()) { return SdkCode::kSendBufferFull; } std::lock_guard<std::mutex> lock(mutex_); - send_buf_list_.push(send_buffer_ptr); + send_proxy_list_.push(send_buffer_ptr); return SdkCode::kSuccess; } @@ -165,60 +162,53 @@ void SendGroup::UpdateConf(std::error_code error) { ClearOldTcpClients(); - ProxyInfoVec new_bus_info; - if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) != - kSuccess || - new_bus_info.empty()) { + ProxyInfoVec new_proxy_info; + if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_proxy_info) != kSuccess || new_proxy_info.empty()) { update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - if (new_bus_info.empty()) { - LOG_INFO("UpdateConf new_bus_info is empty!"); + if (new_proxy_info.empty()) { + LOG_INFO("New proxy is empty when update config!"); return; } - load_threshold_ = new_bus_info[0].GetLoad() > constants::kDefaultLoadThreshold - ? constants::kDefaultLoadThreshold - : std::max((new_bus_info[0].GetLoad()), 0); + load_threshold_ = new_proxy_info[0].GetLoad() > constants::kDefaultLoadThreshold + ? constants::kDefaultLoadThreshold + : std::max((new_proxy_info[0].GetLoad()), 0); - if (!IsConfChanged(current_bus_vec_, new_bus_info)) { - LOG_INFO("Don`t need UpdateConf. 
current bus size(" - << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size() - << ")"); + if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) { + LOG_INFO("Don`t need UpdateConf. current proxy size(" << current_proxy_vec_.size() << ")=proxy size(" + << new_proxy_info.size() << ")"); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - max_proxy_num_ = - std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size()); + max_proxy_num_ = std::min(SdkConfig::getInstance()->max_proxy_num_, new_proxy_info.size()); - std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp = - std::make_shared<std::vector<TcpClientTPtrT>>(); + std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp = std::make_shared<std::vector<TcpClientTPtrT>>(); if (tcp_clients_tmp == nullptr) { - LOG_INFO("tcp_clients_tmp is nullptr"); + LOG_INFO("Tcp clients tmp is nullptr"); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - std::random_shuffle(new_bus_info.begin(), new_bus_info.end()); + std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end()); tcp_clients_tmp->reserve(max_proxy_num_); for (int i = 0; i < max_proxy_num_; i++) { - ProxyInfo bus_tmp = new_bus_info[i]; - TcpClientTPtrT tcpClientTPtrT = - std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port()); - tcp_clients_tmp->push_back(tcpClientTPtrT); - LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]"); + ProxyInfo tmp_proxy = new_proxy_info[i]; + for (int repeat_time = 0; repeat_time < 
SdkConfig::getInstance()->proxy_repeat_times_; repeat_time++) { + TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(io_context_, tmp_proxy.ip(), tmp_proxy.port()); + tcp_clients_tmp->push_back(tcpClientTPtrT); + LOG_INFO("New proxy info.[" << tmp_proxy.ip() << ":" << tmp_proxy.port() << "]"); + } } { - LOG_INFO("do change tcp clients."); + LOG_INFO("Do change tcp clients."); unique_write_lock<read_write_mutex> wtlck(work_clients_mutex_); work_clients_old_ = work_clients_; work_clients_ = tcp_clients_tmp; @@ -230,51 +220,43 @@ void SendGroup::UpdateConf(std::error_code error) { } } - current_bus_vec_ = new_bus_info; + current_proxy_vec_ = new_proxy_info; InitReserveClient(); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); - update_conf_timer_->async_wait( - std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); + update_conf_timer_->async_wait(std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); - LOG_INFO("Finished UpdateConf."); + LOG_INFO("Finished update send group config."); } SendBufferPtrT SendGroup::PopData() { std::lock_guard<std::mutex> lock(mutex_); - if (send_buf_list_.empty()) { + if (send_proxy_list_.empty()) { return nullptr; } - SendBufferPtrT send_buf = send_buf_list_.front(); - send_buf_list_.pop(); + SendBufferPtrT send_buf = send_proxy_list_.front(); + send_proxy_list_.pop(); return send_buf; } uint32_t SendGroup::GetQueueSize() { std::lock_guard<std::mutex> lock(mutex_); - return send_buf_list_.size(); + return send_proxy_list_.size(); } -bool SendGroup::IsConfChanged(ProxyInfoVec &current_bus_vec, - ProxyInfoVec &new_bus_vec) { - if (new_bus_vec.empty()) - return false; - if (current_bus_vec.size() != new_bus_vec.size()) { +bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec, ProxyInfoVec &new_proxy_vec) { + if (new_proxy_vec.empty()) return false; + if (current_proxy_vec.size() != new_proxy_vec.size()) { return true; } - for (auto &current_bu : current_bus_vec) { - for (int i = 0; i < 
new_bus_vec.size(); i++) { - if ((current_bu.ip() == new_bus_vec[i].ip()) && - (current_bu.port() == new_bus_vec[i].port())) - break; - if (i == (new_bus_vec.size() - 1)) { - if ((current_bu.ip() != new_bus_vec[i].ip() || - current_bu.port() == new_bus_vec[i].port())) { - LOG_INFO("current bus ip." << current_bu.ip() << ":" - << current_bu.port() - << " can`t find in bus."); + for (auto &current_bu : current_proxy_vec) { + for (int i = 0; i < new_proxy_vec.size(); i++) { + if ((current_bu.ip() == new_proxy_vec[i].ip()) && (current_bu.port() == new_proxy_vec[i].port())) break; + if (i == (new_proxy_vec.size() - 1)) { + if ((current_bu.ip() != new_proxy_vec[i].ip() || current_bu.port() == new_proxy_vec[i].port())) { + LOG_INFO("current proxy ip." << current_bu.ip() << ":" << current_bu.port() << " can`t find in proxy."); return true; } } @@ -312,8 +294,7 @@ void SendGroup::LoadBalance(std::error_code error) { uint64_t interval = load_balance_interval_ + rand() % 120 * 1000; LOG_INFO("LoadBalance interval:" << interval); load_balance_timer_->expires_after(std::chrono::milliseconds(interval)); - load_balance_timer_->async_wait( - std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); + load_balance_timer_->async_wait(std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); } void SendGroup::DoLoadBalance() { @@ -328,17 +309,13 @@ void SendGroup::DoLoadBalance() { return; } - if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) > - load_threshold_) { - LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace" - << work_client->getClientInfo() << ",load[work " - << work_client->GetAvgLoad() << "][reserve " - << reserve_client->GetAvgLoad() << "][threshold " - << load_threshold_ << "]"); + if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) > load_threshold_) { + LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace" << work_client->getClientInfo() + << ",load[work " << work_client->GetAvgLoad() << 
"][reserve " + << reserve_client->GetAvgLoad() << "][threshold " << load_threshold_ << "]"); std::string ip = work_client->getIp(); uint32_t port = work_client->getPort(); - work_client->UpdateClient(reserve_client->getIp(), - reserve_client->getPort()); + work_client->UpdateClient(reserve_client->getIp(), reserve_client->getPort()); ProxyInfo proxy = GetRandomProxy(ip, port); if (!proxy.ip().empty()) { @@ -350,12 +327,10 @@ void SendGroup::DoLoadBalance() { } bool SendGroup::NeedDoLoadBalance() { unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); - if (load_threshold_ <= 0 || - work_clients_->size() == current_bus_vec_.size()) { - LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" - << load_threshold_ - << ",[tcp_client size]:" << work_clients_->size() - << ",[current_bus_vec size]:" << current_bus_vec_.size()); + if (load_threshold_ <= 0 || work_clients_->size() == current_proxy_vec_.size()) { + LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" << load_threshold_ + << ",[tcp_client size]:" << work_clients_->size() + << ",[current_proxy_vec size]:" << current_proxy_vec_.size()); need_balance_ = false; return false; } @@ -363,12 +338,11 @@ bool SendGroup::NeedDoLoadBalance() { return true; } void SendGroup::InitReserveClient() { - if (max_proxy_num_ >= current_bus_vec_.size()) { + if (max_proxy_num_ >= current_proxy_vec_.size()) { return; } - uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_; - uint64_t reserve_num = - std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num); + uint64_t max_reserve_num = current_proxy_vec_.size() - max_proxy_num_; + uint64_t reserve_num = std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num); if (reserve_num <= 0) { return; } @@ -376,15 +350,12 @@ void SendGroup::InitReserveClient() { unique_write_lock<read_write_mutex> wtlck(reserve_clients_mutex_); reserve_clients_.clear(); - for (uint64_t i = current_bus_vec_.size() - reserve_num; - i < 
current_bus_vec_.size(); i++) { - ProxyInfo bus_tmp = current_bus_vec_[i]; - TcpClientTPtrT tcpClientTPtrT = - std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port()); + for (uint64_t i = current_proxy_vec_.size() - reserve_num; i < current_proxy_vec_.size(); i++) { + ProxyInfo tmp_proxy = current_proxy_vec_[i]; + TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>(io_context_, tmp_proxy.ip(), tmp_proxy.port()); reserve_clients_.push_back(tcpClientTPtrT); } - LOG_INFO( - "InitReserveClient reserve_clients size:" << reserve_clients_.size()); + LOG_INFO("InitReserveClient reserve_clients size:" << reserve_clients_.size()); } bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end) { if (begin && end) { @@ -409,14 +380,13 @@ TcpClientTPtrT SendGroup::GetMaxLoadClient() { ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) { ProxyInfo proxy_info; - for (auto &it : current_bus_vec_) { + for (auto &it : current_proxy_vec_) { if (it.ip() == ip && it.port() == port) { continue; } bool exist = false; for (int index = 0; index < reserve_clients_.size(); index++) { - if (it.ip() == reserve_clients_[index]->getIp() && - it.port() == reserve_clients_[index]->getPort()) { + if (it.ip() == reserve_clients_[index]->getIp() && it.port() == reserve_clients_[index]->getPort()) { exist = true; break; } @@ -444,8 +414,7 @@ TcpClientTPtrT SendGroup::GetReserveClient() { } bool exist = false; for (int index = 0; index < work_clients_->size(); index++) { - if (it->getIp() == (*work_clients_)[index]->getIp() && - it->getPort() == (*work_clients_)[index]->getPort()) { + if (it->getIp() == (*work_clients_)[index]->getIp() && it->getPort() == (*work_clients_)[index]->getPort()) { exist = true; break; } @@ -461,11 +430,10 @@ TcpClientTPtrT SendGroup::GetReserveClient() { bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) { unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); for (int index = 0; 
index < work_clients_->size(); index++) { - if (ip == (*work_clients_)[index]->getIp() && - port == (*work_clients_)[index]->getPort()) { + if (ip == (*work_clients_)[index]->getIp() && port == (*work_clients_)[index]->getPort()) { return true; } } return false; } -} // namespace inlong +} // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h index a52d63dcd3..c54f858b90 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h @@ -1,53 +1,53 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #ifndef INLONG_SDK_SEND_GROUP_H #define INLONG_SDK_SEND_GROUP_H -#include "../client/tcp_client.h" +#include <queue> +#include <unordered_map> + #include "../config/proxy_info.h" #include "../utils/send_buffer.h" -#include <queue> +#include "../client/tcp_client.h" + namespace inlong { const int kTimerMiSeconds = 10; const int kTimerMinute = 60000; using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>; using IOContext = asio::io_context; -using io_context_work = - asio::executor_work_guard<asio::io_context::executor_type>; +using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>; class SendGroup : noncopyable { -private: + private: IOContext io_context_; - io_context_work work_; + io_context_work work_; // 保持io_context.run()在无任何任务时不退出 std::thread thread_; void Run(); uint64_t max_proxy_num_; -public: + public: std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_; std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_old_; std::vector<TcpClientTPtrT> reserve_clients_; - ProxyInfoVec current_bus_vec_; - std::queue<SendBufferPtrT> send_buf_list_; + ProxyInfoVec current_proxy_vec_; + std::queue<SendBufferPtrT> send_proxy_list_; SteadyTimerPtr send_timer_; SteadyTimerPtr update_conf_timer_; @@ -75,11 +75,11 @@ public: void PreDispatchData(std::error_code error); void DispatchData(std::error_code error); bool IsFull(); - uint32_t PushData(SendBufferPtrT send_buffer_ptr); + uint32_t PushData(const SendBufferPtrT &send_buffer_ptr); SendBufferPtrT PopData(); uint32_t GetQueueSize(); void UpdateConf(std::error_code error); - bool IsConfChanged(ProxyInfoVec &current_bus_vec, ProxyInfoVec &new_bus_vec); + bool 
IsConfChanged(ProxyInfoVec &current_proxy_vec, ProxyInfoVec &new_proxy_vec); bool IsAvailable(); void ClearOldTcpClients(); @@ -96,7 +96,6 @@ public: bool ExistInWorkClient(const std::string &ip, uint32_t port); }; using SendGroupPtr = std::shared_ptr<SendGroup>; +} // namespace inlong -} // namespace inlong - -#endif // INLONG_SDK_SEND_GROUP_H +#endif // INLONG_SDK_SEND_GROUP_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc index 28bfed473c..8754d60ab2 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/protocol/msg_protocol.cc @@ -29,9 +29,7 @@ char APIEncode::recvBuf[APIEncode::kRecvLen] = {0}; void APIEncode::decodeProtocoMsg(SendBuffer *buf) { memset(recvBuf, 0x0, kRecvLen); - // //LOG_DEBUG("print buf content, %s", buf->content()); - memcpy(recvBuf, buf->content(), buf->len()); - // memcpy(recvBuf, buf->content(), buf->len()); + memcpy(recvBuf, buf->GetData(), buf->GetDataLen()); char *p = recvBuf; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index 47282e0e32..a1753b8255 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -83,6 +83,7 @@ static const uint32_t kTcpDetectionInterval = 60000; static const uint32_t kMaxRetryIntervalMs= 3000; static const uint32_t kRetryIntervalMs= 200; static const int32_t kRetryTimes = 1; +static const uint32_t kProxyRepeatTimes = 1; static const char kSerIP[] = "127.0.0.1"; static const uint32_t kSerPort = 46801; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h index 
5b7e5a596a..4b2ca778ba 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h @@ -1,123 +1,120 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ -#ifndef INLONG_SDK_SEND_BUFFER_H -#define INLONG_SDK_SEND_BUFFER_H +#ifndef INLONG_SEND_BUFFER_H +#define INLONG_SEND_BUFFER_H #include <mutex> #include <string> +#include <deque> +#include <queue> +#include "asio.hpp" #include "atomic.h" #include "logger.h" #include "noncopyable.h" -#include "sdk_msg.h" -#include <asio.hpp> -#include <deque> -#include <queue> + +#include "../core/sdk_msg.h" +#include "../manager/msg_manager.h" namespace inlong { -class Connection; -using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>; -using ConnectionPtr = std::shared_ptr<Connection>; class SendBuffer : noncopyable { -private: - uint32_t uniq_id_; - std::atomic<bool> is_used_; - std::atomic<bool> is_packed_; - char *content_; - uint32_t size_; - int32_t msg_cnt_; - uint32_t len_; + private: + char *data_; + uint32_t data_len_; + uint32_t msg_cnt_; std::string inlong_group_id_; std::string inlong_stream_id_; - AtomicInt already_send_; - uint64_t first_send_time_; - uint64_t latest_send_time_; std::vector<SdkMsgPtr> user_msg_vector_; -public: - SendBuffer(uint32_t size) - : uniq_id_(0), is_used_(false), is_packed_(false), size_(size), - msg_cnt_(0), len_(0), inlong_group_id_(), inlong_stream_id_(), - first_send_time_(0), latest_send_time_(0) { - content_ = new char[size]; - if (content_) { - memset(content_, 0x0, size); + public: + SendBuffer(uint32_t size) : msg_cnt_(0), data_len_(0), inlong_group_id_(), inlong_stream_id_() { + data_ = new char[size]; + if (data_) { + memset(data_, 0x0, size); } } ~SendBuffer() { - if (content_) { - delete[] content_; + if (data_) { + delete[] data_; } } - - char *content() { return content_; } - int32_t msgCnt() const { return msg_cnt_; } - void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; } - uint32_t len() { return len_; } - void setLen(const uint32_t len) { len_ = len; } - std::string getGroupId() { return inlong_group_id_; } - std::string getStreamId() { return inlong_stream_id_; } - void setInlongGroupId(const 
std::string &inlong_group_id) { + char *GetData() const { + return data_; + } + void SetData(char *data) { + data_ = data; + } + uint32_t GetDataLen() const { + return data_len_; + } + void SetDataLen(uint32_t data_len) { + data_len_ = data_len; + } + uint32_t GetMsgCnt() const { + return msg_cnt_; + } + void SetMsgCnt(uint32_t msg_cnt) { + msg_cnt_ = msg_cnt; + } + const std::string &GetInlongGroupId() const { + return inlong_group_id_; + } + void SetInlongGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; } - void setStreamId(const std::string &inlong_stream_id) { + const std::string &GetInlongStreamId() const { + return inlong_stream_id_; + } + void SetInlongStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = inlong_stream_id; } - void setUniqId(const uint32_t &uniq_id) { uniq_id_ = uniq_id; } + void addUserMsg(const SdkMsgPtr &msg) { user_msg_vector_.push_back(msg); } - void addUserMsg(SdkMsgPtr msg) { user_msg_vector_.push_back(msg); } void doUserCallBack() { for (auto it : user_msg_vector_) { if (it->cb_) { - it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(), - it->msg_.data(), it->msg_.size(), it->user_report_time_, - it->user_client_ip_.data()); + it->cb_(it->inlong_group_id_.data(), + it->inlong_stream_id_.data(), + it->msg_.data(), + it->msg_.size(), + it->report_time_, + it->client_ip_.data()); } } } void releaseBuf() { - if (!is_used_) { - return; - } - uniq_id_ = 0; - is_used_ = false; - is_packed_ = false; - memset(content_, 0x0, size_); msg_cnt_ = 0; - len_ = 0; + data_len_ = 0; inlong_group_id_ = ""; inlong_stream_id_ = ""; - already_send_.getAndSet(0); - first_send_time_ = 0; - latest_send_time_ = 0; + for (const auto &it : user_msg_vector_) { + if (it->cb_) { + it->clear(); + } + } + MsgManager::GetInstance()->AddMsg(user_msg_vector_); user_msg_vector_.clear(); - AtomicInt fail_create_conn_; - fail_create_conn_.getAndSet(0); + user_msg_vector_.shrink_to_fit(); } - - void 
setIsPacked(bool is_packed) { is_packed_ = is_packed; } }; typedef std::shared_ptr<SendBuffer> SendBufferPtrT; -typedef std::queue<SendBufferPtrT> SendBufferPtrDeque; -} // namespace inlong +} // namespace inlong -#endif // INLONG_SDK_SEND_BUFFER_H \ No newline at end of file +#endif // INLONG_SEND_BUFFER_H \ No newline at end of file