This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b5d92d1b77 [INLONG-9228][SDK] CPP SDK supports dynamic load balancing (#9235) b5d92d1b77 is described below commit b5d92d1b77c25dd11a0f2f257289ea584cc3bb4f Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Nov 8 11:28:59 2023 +0800 [INLONG-9228][SDK] CPP SDK supports dynamic load balancing (#9235) --- .../dataproxy-sdk-cpp/release/inc/sdk_conf.h | 6 +- .../dataproxy-sdk-cpp/src/client/stat.h | 11 +- .../dataproxy-sdk-cpp/src/client/tcp_client.cc | 194 ++++++++++-- .../dataproxy-sdk-cpp/src/client/tcp_client.h | 34 +- .../dataproxy-sdk-cpp/src/config/proxy_info.h | 8 +- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 30 ++ .../dataproxy-sdk-cpp/src/group/send_group.cc | 348 +++++++++++++++++---- .../dataproxy-sdk-cpp/src/group/send_group.h | 42 ++- .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 13 +- .../dataproxy-sdk-cpp/src/utils/capi_constant.h | 10 + 10 files changed, 591 insertions(+), 105 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h index 4407869ba7..609d6c35b8 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/release/inc/sdk_conf.h @@ -54,6 +54,8 @@ public: uint32_t per_groupid_thread_nums_; // Sending thread per groupid uint32_t dispatch_interval_zip_; // Compression thread distribution interval uint32_t dispatch_interval_send_; // sending thread sending interval + uint32_t load_balance_interval_; + uint32_t heart_beat_interval_; // Packaging parameters bool enable_pack_; @@ -77,7 +79,8 @@ public: std::string manager_cluster_url_; uint32_t manager_update_interval_; // Automatic update interval, minutes uint32_t manager_url_timeout_; // URL parsing timeout, seconds - uint32_t max_proxy_num_; + uint64_t max_proxy_num_; + uint64_t 
reserve_proxy_num_; uint32_t msg_type_; bool enable_isolation_; @@ -85,6 +88,7 @@ public: bool enable_tcp_nagle_; uint64_t tcp_idle_time_; // The time when tcpclient did not send data uint32_t tcp_detection_interval_; // tcp-client detection interval + bool enable_balance_; // auth settings bool need_auth_; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h index 695a66e31f..f57a26565f 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/stat.h @@ -28,29 +28,34 @@ private: uint64_t send_failed_pack_num_; uint64_t send_failed_msg_num_; + uint64_t time_cost_; public: Stat() : send_success_pack_num_(0), send_success_msg_num_(0), - send_failed_pack_num_(0), send_failed_msg_num_(0) {} + send_failed_pack_num_(0), send_failed_msg_num_(0) ,time_cost_(0) {} void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; } void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; } void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; } void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; } + void AddTimeCost(uint64_t time_cost) { time_cost_ += time_cost; } void ResetStat() { send_success_pack_num_ = 0; send_success_msg_num_ = 0; send_failed_pack_num_ = 0; send_failed_msg_num_ = 0; + time_cost_ = 0; } std::string ToString() { std::stringstream stat; stat << "success-pack[" << send_success_pack_num_ << "]"; - stat << " success-msg[" << send_success_msg_num_ << "]"; + stat << "msg[" << send_success_msg_num_ << "]"; stat << " failed-pack[" << send_failed_pack_num_ << "]"; - stat << " failed-msg[" << send_failed_msg_num_ << "]"; + stat << "msg[" << send_failed_msg_num_ << "]"; + uint64_t pack_num = send_success_pack_num_ + send_failed_msg_num_ + 1; + stat << " trans[" << time_cost_ / pack_num << "]"; return stat.str(); } }; diff --git 
a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc index 8a2a583b3b..e27910c4e9 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc @@ -29,8 +29,9 @@ TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port) wait_timer_(std::make_shared<asio::steady_timer>(io_context)), keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)), ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip), port), - status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false) { - ; + status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false), + proxy_loads_(30), wait_heart_beat_(false), reset_client_(false), + heart_beat_index_(0), only_heart_heat_(false) { client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]"; tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_; @@ -82,7 +83,6 @@ void TcpClient::AsyncConnect() { asio::error_code error; socket_->close(error); if (asio::error::operation_aborted == error) { - // operation aborted return; } } @@ -102,7 +102,6 @@ void TcpClient::DoAsyncConnect(asio::error_code error) { } if (error) { if (asio::error::operation_aborted == error) { - // operation aborted必 return; } } @@ -122,7 +121,6 @@ void TcpClient::OnConnected(asio::error_code error) { return; } if (asio::error::operation_aborted == error) { - // operation aborted return; } status_ = kConnectFailed; @@ -164,23 +162,22 @@ void TcpClient::OnWroten(const asio::error_code error, } if (error) { if (asio::error::operation_aborted == error) { - // operation aborted return; } - status_ = kWriting; LOG_ERROR("write error:" << error.message() << CLIENT_INFO); + status_ = kWriting; HandleFail(); return; } if (0 == bytes_transferred) { - status_ = kWaiting; LOG_ERROR("transferred 0 bytes." 
<< CLIENT_INFO); + status_ = kWaiting; HandleFail(); return; } - status_ = CLIENT_RESPONSE; + status_ = kClientResponse; asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)), std::bind(&TcpClient::OnReturn, this, std::placeholders::_1, std::placeholders::_2)); @@ -191,17 +188,16 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) { } if (error) { if (asio::error::operation_aborted == error) { - // operation aborted return; } LOG_ERROR("OnReturn error:" << error.message() << CLIENT_INFO); status_ = kWaiting; + std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0); HandleFail(); return; } if (len != sizeof(uint32_t)) { status_ = kWaiting; - LOG_ERROR("len :" << len <<" != sizeof(uint32_t):" <<sizeof(uint32_t)<< CLIENT_INFO); HandleFail(); return; } @@ -210,11 +206,11 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) { if (resp_len > recv_buf_->m_max_size) { status_ = kWaiting; - LOG_ERROR("invalid resp_len :" << resp_len << CLIENT_INFO); HandleFail(); return; } - asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, resp_len), + asio::async_read(*socket_, + asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len), std::bind(&TcpClient::OnBody, this, std::placeholders::_1, std::placeholders::_2)); } @@ -226,7 +222,6 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) { if (error) { if (asio::error::operation_aborted == error) { - // operation aborted return; } LOG_ERROR("OnBody error:" << error.message() << CLIENT_INFO); @@ -234,12 +229,27 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) { HandleFail(); return; } + uint32_t parse_index = sizeof(uint32_t); + uint8_t msg_type = + *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index); - if (sendBuffer_ != nullptr) { - stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt()); - stat_.AddSendSuccessPackNum(1); + switch (msg_type) { + case 8: + ParseHeartBeat(bytesTransferred); + break; + default: + 
ParseGenericResponse(); + break; + } + if (wait_heart_beat_) { + HeartBeat(); + wait_heart_beat_ = false; + return; + } - sendBuffer_->releaseBuf(); + if (reset_client_) { + RestClient(); + reset_client_ = false; } status_ = kFree; @@ -255,6 +265,8 @@ void TcpClient::HandleFail() { stat_.AddSendFailMsgNum(sendBuffer_->msgCnt()); stat_.AddSendFailPackNum(1); + stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + sendBuffer_->doUserCallBack(); sendBuffer_->releaseBuf(); } @@ -269,12 +281,14 @@ void TcpClient::DetectStatus(const asio::error_code error) { if (error) { return; } - - LOG_INFO(stat_.ToString() << CLIENT_INFO); - stat_.ResetStat(); + if (!only_heart_heat_) { + LOG_INFO(stat_.ToString() << CLIENT_INFO); + stat_.ResetStat(); + } if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ && status_ != kConnecting) { + std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0); LOG_INFO("reconnect because it has idle " << tcp_idle_time_ << " ms." << "last send time:" << last_update_time_ << CLIENT_INFO); @@ -287,4 +301,142 @@ void TcpClient::DetectStatus(const asio::error_code error) { std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); } +void TcpClient::HeartBeat(bool only_heart_heat) { + if (kStopped == status_ || exit_) { + return; + } + only_heart_heat_ = only_heart_heat; + status_ = kHeartBeat; + last_update_time_ = Utils::getCurrentMsTime(); + // status_ = kWriting; + + bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4); + bin_hb_.msg_type = 8; + bin_hb_.data_time = + htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000)); + bin_hb_.body_ver = 1; + bin_hb_.body_len = 0; + bin_hb_.attr_len = 0; + bin_hb_.magic = htons(constants::kBinaryMagic); + char *hb = (char *)&bin_hb_; + uint32_t hb_len = sizeof(bin_hb_); + + asio::async_write(*socket_, asio::buffer(hb, hb_len), + std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, + std::placeholders::_2)); +} + +void TcpClient::ParseHeartBeat(size_t total_length) { 
+ // | total length(4) | msg type(1) | data time(4) | body version(1) | body + // length (4) | body | attr length(2) | attr | magic (2) | skip total length + uint32_t parse_index = sizeof(uint32_t); + // skip msg type + parse_index += sizeof(uint8_t); + // skip data time + // uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t + // *>(recv_buf_->m_data + parse_index)); + parse_index += sizeof(uint32_t); + + // 3、parse body version + uint32_t body_version = + *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index); + parse_index += sizeof(uint8_t); + + // 4、parse body length + uint32_t body_length = ntohl( + *reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index)); + parse_index += sizeof(uint32_t); + + // 5 parse load + int16_t load = ntohs( + *reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index)); + parse_index += sizeof(int16_t); + + // 7 parse attr length + uint16_t attr_length = ntohs( + *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index)); + parse_index += sizeof(uint16_t); + + // 8 skip attr + parse_index += attr_length; + + // 9 parse magic + uint16_t magic = ntohs( + *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index)); + parse_index += sizeof(uint16_t); + + if (magic != constants::kBinaryMagic) { + LOG_ERROR("failed to parse heartbeat ack! error magic " + << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO); + return; + } + + if (total_length + 4 != parse_index) { + LOG_ERROR("failed to parse heartbeat ack! 
total_length " + << total_length << " +4 !=" << parse_index << CLIENT_INFO); + return; + } + if (heart_beat_index_ > constants::MAX_STAT) { + heart_beat_index_ = 0; + } + if (body_version == 1 && body_length == 2) { + proxy_loads_[heart_beat_index_++ % 30] = load; + } else { + proxy_loads_[heart_beat_index_++ % 30] = 0; + } + LOG_INFO("current load is " << load << CLIENT_INFO); +} + +void TcpClient::ParseGenericResponse() { + if (sendBuffer_ != nullptr) { + stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt()); + stat_.AddSendSuccessPackNum(1); + + stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + + sendBuffer_->releaseBuf(); + } +} + +int32_t TcpClient::GetAvgLoad() { + int32_t numerator = 0; + int32_t denominator = 0; + for (int i = 0; i < proxy_loads_.size(); i++) { + if (proxy_loads_[i] > 0) { + numerator += proxy_loads_[i] * constants::kWeight[i]; + denominator += constants::kWeight[i]; + } + } + int32_t avg_load = 0; + if (0 == denominator) { + return avg_load; + } + avg_load = numerator / denominator; + LOG_INFO("average load is " << avg_load << CLIENT_INFO); + return avg_load; +} + +void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; } + +void TcpClient::UpdateClient(const std::string ip, const uint32_t port) { + LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port + << "] replace" << CLIENT_INFO); + ip_ = ip; + port_ = port; + reset_client_ = true; +} +void TcpClient::RestClient() { + std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0); + asio::ip::tcp::endpoint endpoint(asio::ip::address::from_string(ip_), port_); + endpoint_ = endpoint; + client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]"; + + LOG_INFO("RestClient[" << only_heart_heat_ << "]" << CLIENT_INFO); + + AsyncConnect(); +} +const std::string &TcpClient::getIp() const { return ip_; } +const std::string &TcpClient::getClientInfo() const { return client_info_; } +uint32_t TcpClient::getPort() const { return port_; } + } // namespace 
inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h index be0b6d915a..6835a7fd26 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h @@ -24,6 +24,7 @@ #include "../utils/capi_constant.h" #include "../utils/read_write_mutex.h" #include "../utils/send_buffer.h" +#include "msg_protocol.h" #include "stat.h" #include <queue> @@ -36,8 +37,10 @@ enum ClientStatus { kConnectFailed = 4, kWaiting = 5, kStopped = 6, - CLIENT_RESPONSE = 7 + kClientResponse = 7, + kHeartBeat = 8 }; + enum { kConnectTimeout = 1000 * 20, }; @@ -50,9 +53,23 @@ private: SteadyTimerPtr keep_alive_timer_; ClientStatus status_; std::string ip_; + +public: + const std::string &getIp() const; + +private: uint32_t port_; + +public: + uint32_t getPort() const; + +private: std::string client_info_; +public: + const std::string &getClientInfo() const; + +private: std::shared_ptr<SendBuffer> sendBuffer_; asio::ip::tcp::endpoint endpoint_; BlockMemoryPtrT recv_buf_; @@ -61,6 +78,12 @@ private: uint64_t last_update_time_; Stat stat_; bool exit_; + BinaryHB bin_hb_ = {0}; + std::vector<int32_t> proxy_loads_; + uint32_t heart_beat_index_; + bool wait_heart_beat_; + bool reset_client_; + volatile bool only_heart_heat_; public: TcpClient(IOContext &io_context, std::string ip, uint32_t port); @@ -74,9 +97,16 @@ public: void OnBody(asio::error_code error, size_t bytesTransferred); void DoClose(); void HandleFail(); - bool isFree() { return (status_ == kFree); }; + bool isFree() { return (status_ == kFree); } void write(SendBufferPtrT sendBuffer); void DetectStatus(const asio::error_code error); + void HeartBeat(bool only_heart_heat = false); + void SetHeartBeatStatus(); + void ParseHeartBeat(size_t total_length); + void ParseGenericResponse(); + void UpdateClient(const std::string ip, const 
uint32_t port); + void RestClient(); + int32_t GetAvgLoad(); }; typedef std::shared_ptr<TcpClient> TcpClientTPtrT; typedef std::vector<TcpClientTPtrT> TcpClientTPtrVecT; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h index d433e3a295..59700a473f 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/proxy_info.h @@ -29,13 +29,17 @@ private: std::string proxy_str_id_; std::string ip_; int32_t port_; + int32_t load_; public: - ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port) - : proxy_str_id_(proxy_str_id), ip_(ip), port_(port) {} + ProxyInfo(std::string proxy_str_id, std::string ip, int32_t port,int32_t load) + : proxy_str_id_(proxy_str_id), ip_(ip), port_(port), load_(load) {} ProxyInfo(){}; + void setIp(const std::string& ip) { ip_ = ip; } + void setPort(int32_t port) { port_ = port; } std::string ip() const { return ip_; } int32_t port() const { return port_; } + int32_t GetLoad() const { return load_; } }; using ProxyInfoVec = std::vector<ProxyInfo>; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index 3620d8985e..91e2fa8ad4 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -78,6 +78,9 @@ void SdkConfig::defaultInit() { dispatch_interval_zip_ = constants::kDispatchIntervalZip; tcp_detection_interval_ = constants::kTcpDetectionInterval; tcp_idle_time_ = constants::kTcpIdleTime; + load_balance_interval_ = constants::kLoadBalanceInterval; + heart_beat_interval_ = constants::kHeartBeatInterval; + enable_balance_ = constants::kEnableBalance; // cache parameter send_buf_size_ = 
constants::kSendBufSize; @@ -107,6 +110,7 @@ void SdkConfig::defaultInit() { manager_url_timeout_ = constants::kManagerTimeout; max_proxy_num_ = constants::kMaxProxyNum; enable_isolation_ = constants::kEnableIsolation; + reserve_proxy_num_ = constants::kReserveProxyNum; local_ip_ = constants::kSerIP; local_port_ = constants::kSerPort; @@ -144,6 +148,24 @@ void SdkConfig::InitThreadParam(const rapidjson::Value &doc) { } else { dispatch_interval_send_ = constants::kDispatchIntervalSend; } + + if (doc.HasMember("load_balance_interval") && + doc["load_balance_interval"].IsInt() && + doc["load_balance_interval"].GetInt() > 0) { + const rapidjson::Value &obj = doc["load_balance_interval"]; + load_balance_interval_ = obj.GetInt(); + } else { + load_balance_interval_ = constants::kLoadBalanceInterval; + } + + if (doc.HasMember("heart_beat_interval") && + doc["heart_beat_interval"].IsInt() && + doc["heart_beat_interval"].GetInt() > 0) { + const rapidjson::Value &obj = doc["heart_beat_interval"]; + heart_beat_interval_ = obj.GetInt(); + } else { + heart_beat_interval_ = constants::kHeartBeatInterval; + } } void SdkConfig::InitCacheParam(const rapidjson::Value &doc) { @@ -360,6 +382,14 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { } else { tcp_idle_time_ = constants::kTcpIdleTime; } + + // enable balance + if (doc.HasMember("enable_balance") && doc["enable_balance"].IsBool()) { + const rapidjson::Value &obj = doc["enable_balance"]; + enable_balance_ = obj.GetBool(); + } else { + enable_balance_ = constants::kEnableBalance; + } } void SdkConfig::InitAuthParm(const rapidjson::Value &doc) { // auth settings diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc index fb91972318..4591953fa0 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.cc @@ 
-25,19 +25,26 @@ namespace inlong { const int kDefaultQueueSize = 20; -SendGroup::SendGroup(std::string group_id) - : work_(asio::make_work_guard(io_context_)), group_id_(group_id), - send_idx_(0) { +SendGroup::SendGroup(std::string send_group_key) + : work_(asio::make_work_guard(io_context_)), + send_group_key_(send_group_key), send_idx_(0), dispatch_stat_(0), + load_threshold_(0), max_proxy_num_(0) { max_send_queue_num_ = SdkConfig::getInstance()->send_buf_size_ / SdkConfig::getInstance()->pack_size_; if (max_send_queue_num_ <= 0) { max_send_queue_num_ = kDefaultQueueSize; } - LOG_INFO("SendGroup max_send_queue_num " << max_send_queue_num_); + LOG_INFO("SendGroup: " << send_group_key_ + << ", max send queue num: " << max_send_queue_num_); dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_; - tcp_clients_old_ = nullptr; - tcp_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>(); - tcp_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_); + load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_; + heart_beat_interval_ = + SdkConfig::getInstance()->heart_beat_interval_ / dispatch_interval_; + need_balance_ = SdkConfig::getInstance()->enable_balance_; + + work_clients_old_ = nullptr; + work_clients_ = std::make_shared<std::vector<TcpClientTPtrT>>(); + work_clients_->reserve(SdkConfig::getInstance()->max_proxy_num_); send_timer_ = std::make_shared<asio::steady_timer>(io_context_); send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); @@ -49,13 +56,29 @@ SendGroup::SendGroup(std::string group_id) update_conf_timer_->async_wait( std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); - current_proxy_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_); + if (SdkConfig::getInstance()->enable_balance_) { + load_balance_timer_ = std::make_shared<asio::steady_timer>(io_context_); + load_balance_timer_->expires_after( + std::chrono::milliseconds(load_balance_interval_)); + 
load_balance_timer_->async_wait( + std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); + } + + current_bus_vec_.reserve(SdkConfig::getInstance()->max_proxy_num_); thread_ = std::thread(&SendGroup::Run, this); } SendGroup::~SendGroup() { LOG_INFO("~SendGroup "); - send_timer_->cancel(); - update_conf_timer_->cancel(); + if (send_timer_) { + send_timer_->cancel(); + } + if (update_conf_timer_) { + update_conf_timer_->cancel(); + } + if (load_balance_timer_) { + update_conf_timer_->cancel(); + } + io_context_.stop(); if (thread_.joinable()) { thread_.join(); @@ -76,19 +99,18 @@ void SendGroup::DispatchData(std::error_code error) { return; } try { - unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_); - if (tcp_clients_ != nullptr) { - if (send_idx_ >= tcp_clients_->size()) { + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + if (work_clients_ != nullptr) { + if (send_idx_ >= work_clients_->size()) { send_idx_ = 0; } - - while (send_idx_ < tcp_clients_->size()) { - if ((*tcp_clients_)[send_idx_]->isFree()) { + while (send_idx_ < work_clients_->size()) { + if ((*work_clients_)[send_idx_]->isFree()) { SendBufferPtrT send_buf = PopData(); if (send_buf == nullptr) { break; } - (*tcp_clients_)[send_idx_]->write(send_buf); + (*work_clients_)[send_idx_]->write(send_buf); } send_idx_++; } @@ -96,10 +118,33 @@ void SendGroup::DispatchData(std::error_code error) { } catch (std::exception &e) { LOG_ERROR("Exception " << e.what()); } + + if (need_balance_ && dispatch_stat_++ > heart_beat_interval_) { + HeartBeat(); + dispatch_stat_ = 0; + } + send_timer_->expires_after(std::chrono::milliseconds(dispatch_interval_)); send_timer_->async_wait( std::bind(&SendGroup::DispatchData, this, std::placeholders::_1)); } +void SendGroup::HeartBeat() { + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + if (work_clients_ == nullptr) { + return; + } + for (int idx = 0; idx < work_clients_->size(); idx++) { + if 
((*work_clients_)[idx]->isFree()) { + (*work_clients_)[idx]->HeartBeat(); + } else { + (*work_clients_)[idx]->SetHeartBeatStatus(); + } + } + + for (int idx = 0; idx < reserve_clients_.size(); idx++) { + reserve_clients_[idx]->HeartBeat(true); + } +} bool SendGroup::IsFull() { return GetQueueSize() > max_send_queue_num_; } @@ -120,30 +165,36 @@ void SendGroup::UpdateConf(std::error_code error) { ClearOldTcpClients(); - ProxyInfoVec new_proxy_info; - if (ProxyManager::GetInstance()->GetProxy(group_id_, new_proxy_info) != + ProxyInfoVec new_bus_info; + if (ProxyManager::GetInstance()->GetProxy(send_group_key_, new_bus_info) != kSuccess || - new_proxy_info.empty()) { + new_bus_info.empty()) { update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); update_conf_timer_->async_wait( std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } + if (new_bus_info.empty()) { + LOG_INFO("UpdateConf new_bus_info is empty!"); + return; + } + + load_threshold_ = new_bus_info[0].GetLoad() > constants::kDefaultLoadThreshold + ? constants::kDefaultLoadThreshold + : std::max((new_bus_info[0].GetLoad()), 0); - if (!IsConfChanged(current_proxy_vec_, new_proxy_info)) { - LOG_INFO("Don`t need UpdateConf. current proxy size(" - << current_proxy_vec_.size() << ")=proxy size(" - << new_proxy_info.size() << ")"); + if (!IsConfChanged(current_bus_vec_, new_bus_info)) { + LOG_INFO("Don`t need UpdateConf. 
current bus size(" + << current_bus_vec_.size() << ")=bus size(" << new_bus_info.size() + << ")"); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); update_conf_timer_->async_wait( std::bind(&SendGroup::UpdateConf, this, std::placeholders::_1)); return; } - uint32_t proxy_num = SdkConfig::getInstance()->max_proxy_num_; - if (proxy_num > new_proxy_info.size()) { - proxy_num = new_proxy_info.size(); - } + max_proxy_num_ = + std::min(SdkConfig::getInstance()->max_proxy_num_, new_bus_info.size()); std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_tmp = std::make_shared<std::vector<TcpClientTPtrT>>(); @@ -155,32 +206,33 @@ void SendGroup::UpdateConf(std::error_code error) { return; } - std::random_shuffle(new_proxy_info.begin(), new_proxy_info.end()); + std::random_shuffle(new_bus_info.begin(), new_bus_info.end()); - tcp_clients_tmp->reserve(proxy_num); - for (int i = 0; i < proxy_num; i++) { - ProxyInfo proxy_tmp = new_proxy_info[i]; - TcpClientTPtrT tcpClientTPtrT = std::make_shared<TcpClient>( - io_context_, proxy_tmp.ip(), proxy_tmp.port()); + tcp_clients_tmp->reserve(max_proxy_num_); + for (int i = 0; i < max_proxy_num_; i++) { + ProxyInfo bus_tmp = new_bus_info[i]; + TcpClientTPtrT tcpClientTPtrT = + std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port()); tcp_clients_tmp->push_back(tcpClientTPtrT); - LOG_INFO("new proxy info.[" << proxy_tmp.ip() << ":" << proxy_tmp.port() - << "]"); + LOG_INFO("new bus info.[" << bus_tmp.ip() << ":" << bus_tmp.port() << "]"); } { LOG_INFO("do change tcp clients."); - unique_write_lock<read_write_mutex> wtlck(remote_proxy_list_mutex_); - tcp_clients_old_ = tcp_clients_; - tcp_clients_ = tcp_clients_tmp; + unique_write_lock<read_write_mutex> wtlck(work_clients_mutex_); + work_clients_old_ = work_clients_; + work_clients_ = tcp_clients_tmp; } - if (tcp_clients_old_ != nullptr) { - for (int j = 0; j < tcp_clients_old_->size(); j++) { - (*tcp_clients_old_)[j]->DoClose(); + if 
(work_clients_old_ != nullptr) { + for (int j = 0; j < work_clients_old_->size(); j++) { + (*work_clients_old_)[j]->DoClose(); } } - current_proxy_vec_ = new_proxy_info; + current_bus_vec_ = new_bus_info; + + InitReserveClient(); update_conf_timer_->expires_after(std::chrono::milliseconds(kTimerMinute)); update_conf_timer_->async_wait( @@ -204,25 +256,25 @@ uint32_t SendGroup::GetQueueSize() { return send_buf_list_.size(); } -bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec, - ProxyInfoVec &new_proxy_vec) { - if (new_proxy_vec.empty()) +bool SendGroup::IsConfChanged(ProxyInfoVec &current_bus_vec, + ProxyInfoVec &new_bus_vec) { + if (new_bus_vec.empty()) return false; - if (current_proxy_vec.size() != new_proxy_vec.size()) { + if (current_bus_vec.size() != new_bus_vec.size()) { return true; } - for (auto &current_bu : current_proxy_vec) { - for (int i = 0; i < new_proxy_vec.size(); i++) { - if ((current_bu.ip() == new_proxy_vec[i].ip()) && - (current_bu.port() == new_proxy_vec[i].port())) + for (auto &current_bu : current_bus_vec) { + for (int i = 0; i < new_bus_vec.size(); i++) { + if ((current_bu.ip() == new_bus_vec[i].ip()) && + (current_bu.port() == new_bus_vec[i].port())) break; - if (i == (new_proxy_vec.size() - 1)) { - if ((current_bu.ip() != new_proxy_vec[i].ip() || - current_bu.port() == new_proxy_vec[i].port())) { - LOG_INFO("current proxy ip." << current_bu.ip() << ":" - << current_bu.port() - << " can`t find in proxy."); + if (i == (new_bus_vec.size() - 1)) { + if ((current_bu.ip() != new_bus_vec[i].ip() || + current_bu.port() == new_bus_vec[i].port())) { + LOG_INFO("current bus ip." 
<< current_bu.ip() << ":" + << current_bu.port() + << " can`t find in bus."); return true; } } @@ -232,20 +284,188 @@ bool SendGroup::IsConfChanged(ProxyInfoVec &current_proxy_vec, } bool SendGroup::IsAvailable() { - unique_read_lock<read_write_mutex> rdlck(remote_proxy_list_mutex_); - if (tcp_clients_ == nullptr) { + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + if (work_clients_ == nullptr) { return false; } - if (tcp_clients_->empty()) { + if (work_clients_->empty()) { return false; } return true; } void SendGroup::ClearOldTcpClients() { - if (tcp_clients_old_ != nullptr) { - LOG_INFO("ClearOldTcpClients." << tcp_clients_old_->size()); - tcp_clients_old_->clear(); - tcp_clients_old_.reset(); + if (work_clients_old_ != nullptr) { + LOG_INFO("ClearOldTcpClients." << work_clients_old_->size()); + work_clients_old_->clear(); + work_clients_old_.reset(); } } + +void SendGroup::LoadBalance(std::error_code error) { + if (error) { + return; + } + + if (NeedDoLoadBalance()) { + DoLoadBalance(); + } + uint64_t interval = load_balance_interval_ + rand() % 120 * 1000; + LOG_INFO("LoadBalance interval:" << interval); + load_balance_timer_->expires_after(std::chrono::milliseconds(interval)); + load_balance_timer_->async_wait( + std::bind(&SendGroup::LoadBalance, this, std::placeholders::_1)); +} + +void SendGroup::DoLoadBalance() { + if (reserve_clients_.empty()) { + return; + } + + TcpClientTPtrT work_client = GetMaxLoadClient(); + TcpClientTPtrT reserve_client = GetReserveClient(); + if (reserve_client == nullptr || work_client == nullptr) { + LOG_ERROR("client nullptr"); + return; + } + + if ((work_client->GetAvgLoad() - reserve_client->GetAvgLoad()) > + load_threshold_) { + LOG_INFO("DoLoadBalance " << reserve_client->getClientInfo() << "replace" + << work_client->getClientInfo() << ",load[work " + << work_client->GetAvgLoad() << "][reserve " + << reserve_client->GetAvgLoad() << "][threshold " + << load_threshold_ << "]"); + std::string ip = 
work_client->getIp(); + uint32_t port = work_client->getPort(); + work_client->UpdateClient(reserve_client->getIp(), + reserve_client->getPort()); + + ProxyInfo proxy = GetRandomProxy(ip, port); + if (!proxy.ip().empty()) { + reserve_client->UpdateClient(proxy.ip(), proxy.port()); + } + } + + need_balance_ = true; +} +bool SendGroup::NeedDoLoadBalance() { + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + if (load_threshold_ <= 0 || + work_clients_->size() == current_bus_vec_.size()) { + LOG_INFO("Don`t need DoLoadBalance [load_threshold]:" + << load_threshold_ + << ",[tcp_client size]:" << work_clients_->size() + << ",[current_bus_vec size]:" << current_bus_vec_.size()); + need_balance_ = false; + return false; + } + need_balance_ = true; + return true; +} +void SendGroup::InitReserveClient() { + if (max_proxy_num_ >= current_bus_vec_.size()) { + return; + } + uint64_t max_reserve_num = current_bus_vec_.size() - max_proxy_num_; + uint64_t reserve_num = + std::min(SdkConfig::getInstance()->reserve_proxy_num_, max_reserve_num); + if (reserve_num <= 0) { + return; + } + + unique_write_lock<read_write_mutex> wtlck(reserve_clients_mutex_); + reserve_clients_.clear(); + + for (uint64_t i = current_bus_vec_.size() - reserve_num; + i < current_bus_vec_.size(); i++) { + ProxyInfo bus_tmp = current_bus_vec_[i]; + TcpClientTPtrT tcpClientTPtrT = + std::make_shared<TcpClient>(io_context_, bus_tmp.ip(), bus_tmp.port()); + reserve_clients_.push_back(tcpClientTPtrT); + } + LOG_INFO( + "InitReserveClient reserve_clients size:" << reserve_clients_.size()); +} +bool SendGroup::UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end) { + if (begin && end) { + return begin->GetAvgLoad() < end->GetAvgLoad(); + } + return false; +} + +TcpClientTPtrT SendGroup::GetMaxLoadClient() { + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + uint32_t client_index = 0; + int32_t max_load = (*work_clients_)[0]->GetAvgLoad(); + for (int index = 1; index < 
work_clients_->size(); index++) { + int32_t proxy_load = (*work_clients_)[index]->GetAvgLoad(); + if (proxy_load > max_load) { + max_load = proxy_load; + client_index = index; + } + } + return (*work_clients_)[client_index]; +} + +ProxyInfo SendGroup::GetRandomProxy(const std::string &ip, uint32_t port) { + ProxyInfo proxy_info; + for (auto &it : current_bus_vec_) { + if (it.ip() == ip && it.port() == port) { + continue; + } + bool exist = false; + for (int index = 0; index < reserve_clients_.size(); index++) { + if (it.ip() == reserve_clients_[index]->getIp() && + it.port() == reserve_clients_[index]->getPort()) { + exist = true; + break; + } + } + if (exist) { + continue; + } + if (ExistInWorkClient(it.ip(), it.port())) { + continue; + } + proxy_info.setIp(it.ip()); + proxy_info.setPort(it.port()); + return proxy_info; + } + return proxy_info; +} + +TcpClientTPtrT SendGroup::GetReserveClient() { + std::sort(reserve_clients_.begin(), reserve_clients_.end(), UpSort); + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + ProxyInfo proxy_info; + for (auto &it : reserve_clients_) { + if (it->GetAvgLoad() <= 0) { + continue; + } + bool exist = false; + for (int index = 0; index < work_clients_->size(); index++) { + if (it->getIp() == (*work_clients_)[index]->getIp() && + it->getPort() == (*work_clients_)[index]->getPort()) { + exist = true; + break; + } + } + if (exist) { + continue; + } + return it; + } + return nullptr; +} + +bool SendGroup::ExistInWorkClient(const std::string &ip, uint32_t port) { + unique_read_lock<read_write_mutex> rdlck(work_clients_mutex_); + for (int index = 0; index < work_clients_->size(); index++) { + if (ip == (*work_clients_)[index]->getIp() && + port == (*work_clients_)[index]->getPort()) { + return true; + } + } + return false; +} } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h index 
88dc90771c..a52d63dcd3 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/send_group.h @@ -39,27 +39,37 @@ private: io_context_work work_; std::thread thread_; void Run(); + uint64_t max_proxy_num_; public: - std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_; - std::shared_ptr<std::vector<TcpClientTPtrT>> tcp_clients_old_; + std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_; + std::shared_ptr<std::vector<TcpClientTPtrT>> work_clients_old_; + std::vector<TcpClientTPtrT> reserve_clients_; - ProxyInfoVec current_proxy_vec_; - TcpClientTPtrVecItT current_client_; + ProxyInfoVec current_bus_vec_; std::queue<SendBufferPtrT> send_buf_list_; SteadyTimerPtr send_timer_; SteadyTimerPtr update_conf_timer_; + SteadyTimerPtr load_balance_timer_; read_write_mutex send_group_mutex_; - read_write_mutex remote_proxy_list_mutex_; + read_write_mutex work_clients_mutex_; + read_write_mutex reserve_clients_mutex_; std::mutex mutex_; - std::string group_id_; + std::string send_group_key_; std::uint32_t send_idx_; uint32_t max_send_queue_num_; - SendGroup(std::string group_id); + uint32_t dispatch_interval_; + uint32_t heart_beat_interval_; + uint32_t load_balance_interval_; + uint32_t dispatch_stat_; + volatile bool need_balance_; + volatile int32_t load_threshold_; + + SendGroup(std::string send_group_key); ~SendGroup(); void PreDispatchData(std::error_code error); @@ -69,14 +79,24 @@ public: SendBufferPtrT PopData(); uint32_t GetQueueSize(); void UpdateConf(std::error_code error); - bool IsConfChanged(ProxyInfoVec &current_proxy_vec, - ProxyInfoVec &new_proxy_vec); + bool IsConfChanged(ProxyInfoVec &current_bus_vec, ProxyInfoVec &new_bus_vec); bool IsAvailable(); - uint32_t dispatch_interval_; - void ClearOldTcpClients(); + + // balance + bool NeedDoLoadBalance(); + void LoadBalance(std::error_code error); + void DoLoadBalance(); + void HeartBeat(); + ProxyInfo GetRandomProxy(const
std::string &ip = "", uint32_t port = 0); + TcpClientTPtrT GetReserveClient(); + TcpClientTPtrT GetMaxLoadClient(); + void InitReserveClient(); + static bool UpSort(const TcpClientTPtrT &begin, const TcpClientTPtrT &end); + bool ExistInWorkClient(const std::string &ip, uint32_t port); }; using SendGroupPtr = std::shared_ptr<SendGroup>; + } // namespace inlong #endif // INLONG_SDK_SEND_GROUP_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index abe33dde21..f2e887d29c 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -181,6 +181,17 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, } groupid_2_cluster_id_update_map_[inlong_group_id] = clusterInfo["clusterId"].GetInt(); + + // check load + int32_t load = 0; + if (clusterInfo.HasMember("load") && clusterInfo["load"].IsInt() && + !clusterInfo["load"].IsNull()) { + const rapidjson::Value &obj = clusterInfo["load"]; + load = obj.GetInt(); + } else { + load = 0; + } + // proxy list for (auto &proxy : nodeList.GetArray()) { std::string ip; @@ -212,7 +223,7 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, LOG_WARN("there is no id info of inlong_group_id"); continue; } - proxy_info_vec.emplace_back(id, ip, port); + proxy_info_vec.emplace_back(id, ip, port, load); } return SdkCode::kSuccess; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index b2af191112..eb17f56313 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -43,6 +43,9 @@ static const uint32_t kMaxStreamIdNum = 100; static const int32_t 
kDispatchIntervalZip = 8; static const int32_t kDispatchIntervalSend = 10; +static const int32_t kLoadBalanceInterval = 300000; +static const int32_t kHeartBeatInterval = 60000; +static const bool kEnableBalance = true; static const bool kEnablePack = true; static const uint32_t kPackSize = 409600; @@ -66,6 +69,7 @@ static const char kManagerClusterURL[] = static const uint32_t kManagerUpdateInterval = 2; static const uint32_t kManagerTimeout = 5; static const uint32_t kMaxProxyNum = 8; +static const uint32_t kReserveProxyNum = 2; static const bool kEnableTCPNagle = true; static const uint32_t kTcpIdleTime = 600000; @@ -91,6 +95,12 @@ static const uint32_t kMaxAttrLen = 2048; const uint32_t ATTR_LENGTH = 10; static const bool kEnableIsolation = false; +static const int32_t kDefaultLoadThreshold = 200; +const uint32_t MAX_STAT = 10000000; +static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 6, 6, 6, 6, 6, + 12, 12, 12, 12, 12, 48, 96, 192, 384, 1000}; + } // namespace constants } // namespace inlong #endif // INLONG_SDK_CONSTANT_H \ No newline at end of file