This is an automated email from the ASF dual-hosted git repository. doleyzi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new fb395fe0e0 [INLONG-10821][SDK] Optimize the ability to receive data for DataProxy C++ SDK (#10835) fb395fe0e0 is described below commit fb395fe0e0a44c1ade30f084c138aeaf7232e2bf Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Wed Aug 21 14:01:34 2024 +0800 [INLONG-10821][SDK] Optimize the ability to receive data for DataProxy C++ SDK (#10835) --- .../dataproxy-sdk-cpp/src/client/tcp_client.cc | 246 +++++++------- .../dataproxy-sdk-cpp/src/client/tcp_client.h | 71 ++-- .../dataproxy-sdk-cpp/src/config/sdk_conf.cc | 7 + .../dataproxy-sdk-cpp/src/config/sdk_conf.h | 1 + .../dataproxy-sdk-cpp/src/core/api_code.h | 3 +- .../dataproxy-sdk-cpp/src/core/api_imp.cc | 104 +++--- .../dataproxy-sdk-cpp/src/core/api_imp.h | 38 +-- .../dataproxy-sdk-cpp/src/core/sdk_msg.h | 11 +- .../dataproxy-sdk-cpp/src/group/recv_group.cc | 365 +++++++++++---------- .../dataproxy-sdk-cpp/src/group/recv_group.h | 77 +++-- .../dataproxy-sdk-cpp/src/manager/proxy_manager.cc | 6 +- .../dataproxy-sdk-cpp/src/manager/proxy_manager.h | 2 +- .../dataproxy-sdk-cpp/src/utils/capi_constant.h | 5 + .../dataproxy-sdk-cpp/src/utils/send_buffer.h | 4 +- 14 files changed, 511 insertions(+), 429 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc index e27910c4e9..eb3264ba6c 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.cc @@ -1,47 +1,55 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include "tcp_client.h" -#include "../utils/utils.h" -#include "api_code.h" + #include <utility> +#include "../manager/buffer_manager.h" +#include "../manager/metric_manager.h" +#include "../utils/utils.h" + namespace inlong { #define CLIENT_INFO client_info_ << "[" << status_ << "]" TcpClient::TcpClient(IOContext &io_context, std::string ip, uint32_t port) : socket_(std::make_shared<asio::ip::tcp::socket>(io_context)), wait_timer_(std::make_shared<asio::steady_timer>(io_context)), keep_alive_timer_(std::make_shared<asio::steady_timer>(io_context)), - ip_(ip), port_(port), endpoint_(asio::ip::address::from_string(ip), port), - status_(kUndefined), recv_buf_(new BlockMemory()), exit_(false), - proxy_loads_(30), wait_heart_beat_(false), reset_client_(false), - heart_beat_index_(0), only_heart_heat_(false) { + ip_(ip), + port_(port), + endpoint_(asio::ip::address::from_string(ip), port), + status_(kUndefined), + recv_buf_(new BlockMemory()), + exit_(false), + proxy_loads_(30), + wait_heart_beat_(false), + reset_client_(false), + heart_beat_index_(0), + only_heart_heat_(false), + need_retry_(false), + retry_times_(0) { client_info_ = " [" + ip_ + ":" + std::to_string(port_) + "]"; tcp_detection_interval_ = SdkConfig::getInstance()->tcp_detection_interval_; tcp_idle_time_ = SdkConfig::getInstance()->tcp_idle_time_; last_update_time_ = Utils::getCurrentMsTime(); - keep_alive_timer_->expires_after( - std::chrono::milliseconds(tcp_detection_interval_)); - keep_alive_timer_->async_wait( - std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); + keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_)); + keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); LOG_INFO("TcpClient At remote info .status:" << status_ << client_info_); AsyncConnect(); @@ -70,7 +78,7 @@ TcpClient::~TcpClient() { void TcpClient::DoClose() { status_ = kStopped; exit_ = true; - LOG_INFO("closed client." << CLIENT_INFO); + LOG_INFO("Closed client." << CLIENT_INFO); } void TcpClient::AsyncConnect() { @@ -87,13 +95,12 @@ void TcpClient::AsyncConnect() { } } status_ = kConnecting; - LOG_INFO("began to connect." << CLIENT_INFO); + LOG_INFO("Began to connect." << CLIENT_INFO); } catch (std::exception &e) { LOG_ERROR("AsyncConnect exception." << e.what() << CLIENT_INFO); } - socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this, - std::placeholders::_1)); + socket_->async_connect(endpoint_, std::bind(&TcpClient::OnConnected, this, std::placeholders::_1)); } void TcpClient::DoAsyncConnect(asio::error_code error) { @@ -116,7 +123,12 @@ void TcpClient::OnConnected(asio::error_code error) { socket_->set_option(asio::ip::tcp::no_delay(true)); asio::socket_base::keep_alive option(true); socket_->set_option(option); - LOG_INFO("client has connected." << CLIENT_INFO); + LOG_INFO("Client has connected." << CLIENT_INFO); + if (need_retry_) { + LOG_WARN("Client has connected retry! times:" << retry_times_ << CLIENT_INFO); + write(sendBuffer_, true); + return; + } status_ = kFree; return; } @@ -124,22 +136,24 @@ void TcpClient::OnConnected(asio::error_code error) { return; } status_ = kConnectFailed; - LOG_ERROR("connect has error:" << error.message() << CLIENT_INFO); + LOG_ERROR("Connect has error:" << error.message() << CLIENT_INFO); + if (need_retry_) { + ResetSendBuffer(); + } wait_timer_->expires_after(std::chrono::milliseconds(kConnectTimeout)); - wait_timer_->async_wait( - std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1)); + wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1)); } -void TcpClient::write(SendBufferPtrT sendBuffer) { +void TcpClient::write(SendBufferPtrT sendBuffer, bool retry) { if (kStopped == status_ || exit_) { LOG_ERROR("Stop.At." << CLIENT_INFO); return; } - if (status_ != kFree) { + if (status_ != kFree && !retry) { LOG_WARN("Not free ." << CLIENT_INFO); return; } - sendBuffer_ = sendBuffer; + sendBuffer_ = std::move(sendBuffer); BeginWrite(); } @@ -150,13 +164,10 @@ void TcpClient::BeginWrite() { } last_update_time_ = Utils::getCurrentMsTime(); status_ = kWriting; - asio::async_write(*socket_, - asio::buffer(sendBuffer_->content(), sendBuffer_->len()), - std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, - std::placeholders::_2)); + asio::async_write(*socket_, asio::buffer(sendBuffer_->content(), sendBuffer_->len()), + std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, std::placeholders::_2)); } -void TcpClient::OnWroten(const asio::error_code error, - std::size_t bytes_transferred) { +void TcpClient::OnWroten(const asio::error_code error, std::size_t bytes_transferred) { if (kStopped == status_ || exit_) { return; } @@ -164,14 +175,14 @@ void TcpClient::OnWroten(const asio::error_code error, if (asio::error::operation_aborted == error) { return; } - LOG_ERROR("write error:" << error.message() << CLIENT_INFO); + LOG_ERROR("Write error:" << error.message() << CLIENT_INFO); status_ = kWriting; HandleFail(); return; } if (0 == bytes_transferred) { - LOG_ERROR("transferred 0 bytes." << CLIENT_INFO); + LOG_ERROR("Transferred 0 bytes." << CLIENT_INFO); status_ = kWaiting; HandleFail(); return; @@ -179,8 +190,7 @@ void TcpClient::OnWroten(const asio::error_code error, status_ = kClientResponse; asio::async_read(*socket_, asio::buffer(recv_buf_->m_data, sizeof(uint32_t)), - std::bind(&TcpClient::OnReturn, this, std::placeholders::_1, - std::placeholders::_2)); + std::bind(&TcpClient::OnReturn, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::OnReturn(asio::error_code error, std::size_t len) { if (kStopped == status_ || exit_) { @@ -201,18 +211,15 @@ void TcpClient::OnReturn(asio::error_code error, std::size_t len) { HandleFail(); return; } - size_t resp_len = - ntohl(*reinterpret_cast<const uint32_t *>(recv_buf_->m_data)); + size_t resp_len = ntohl(*reinterpret_cast<const uint32_t *>(recv_buf_->m_data)); if (resp_len > recv_buf_->m_max_size) { status_ = kWaiting; HandleFail(); return; } - asio::async_read(*socket_, - asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len), - std::bind(&TcpClient::OnBody, this, std::placeholders::_1, - std::placeholders::_2)); + asio::async_read(*socket_, asio::buffer(recv_buf_->m_data + sizeof(uint32_t), resp_len), + std::bind(&TcpClient::OnBody, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) { @@ -230,17 +237,19 @@ void TcpClient::OnBody(asio::error_code error, size_t bytesTransferred) { return; } uint32_t parse_index = sizeof(uint32_t); - uint8_t msg_type = - *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index); + uint8_t msg_type = *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index); switch (msg_type) { - case 8: - ParseHeartBeat(bytesTransferred); - break; - default: - ParseGenericResponse(); - break; + case 8: + ParseHeartBeat(bytesTransferred); + break; + default: + ParseGenericResponse(); + break; } + + ResetRetry(); + if (wait_heart_beat_) { HeartBeat(); wait_heart_beat_ = false; @@ -261,17 +270,12 @@ void TcpClient::HandleFail() { } status_ = kConnecting; - if (sendBuffer_ != nullptr) { - stat_.AddSendFailMsgNum(sendBuffer_->msgCnt()); - stat_.AddSendFailPackNum(1); + ResetSendBuffer(); - stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + int retry_interval = std::min(retry_times_ * constants::kRetryIntervalMs, constants::kMaxRetryIntervalMs); - sendBuffer_->doUserCallBack(); - sendBuffer_->releaseBuf(); - } - - AsyncConnect(); + wait_timer_->expires_after(std::chrono::milliseconds(retry_interval)); + wait_timer_->async_wait(std::bind(&TcpClient::DoAsyncConnect, this, std::placeholders::_1)); } void TcpClient::DetectStatus(const asio::error_code error) { @@ -282,23 +286,28 @@ void TcpClient::DetectStatus(const asio::error_code error) { return; } if (!only_heart_heat_) { - LOG_INFO(stat_.ToString() << CLIENT_INFO); - stat_.ResetStat(); + UpdateMetric(); } - if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ && - status_ != kConnecting) { + if ((Utils::getCurrentMsTime() - last_update_time_) > tcp_idle_time_ && status_ != kConnecting) { std::fill(proxy_loads_.begin(), proxy_loads_.end(), 0); - LOG_INFO("reconnect because it has idle " - << tcp_idle_time_ << " ms." - << "last send time:" << last_update_time_ << CLIENT_INFO); + LOG_INFO("Reconnect because it has idle " << tcp_idle_time_ << " ms." + << "last send time:" << last_update_time_ << CLIENT_INFO); AsyncConnect(); } - keep_alive_timer_->expires_after( - std::chrono::milliseconds(tcp_detection_interval_)); - keep_alive_timer_->async_wait( - std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); + keep_alive_timer_->expires_after(std::chrono::milliseconds(tcp_detection_interval_)); + keep_alive_timer_->async_wait(std::bind(&TcpClient::DetectStatus, this, std::placeholders::_1)); +} + +void TcpClient::UpdateMetric() { + Metric stat; + for (auto &it : stat_map_) { + MetricManager::GetInstance()->UpdateMetric(it.first, it.second); + stat.Update(it.second); + it.second.ResetStat(); + } + LOG_INFO(stat.ToString() << CLIENT_INFO); } void TcpClient::HeartBeat(bool only_heart_heat) { @@ -308,12 +317,10 @@ void TcpClient::HeartBeat(bool only_heart_heat) { only_heart_heat_ = only_heart_heat; status_ = kHeartBeat; last_update_time_ = Utils::getCurrentMsTime(); - // status_ = kWriting; bin_hb_.total_len = htonl(sizeof(BinaryHB) - 4); bin_hb_.msg_type = 8; - bin_hb_.data_time = - htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000)); + bin_hb_.data_time = htonl(static_cast<uint32_t>(Utils::getCurrentMsTime() / 1000)); bin_hb_.body_ver = 1; bin_hb_.body_len = 0; bin_hb_.attr_len = 0; @@ -322,58 +329,52 @@ void TcpClient::HeartBeat(bool only_heart_heat) { uint32_t hb_len = sizeof(bin_hb_); asio::async_write(*socket_, asio::buffer(hb, hb_len), - std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, - std::placeholders::_2)); + std::bind(&TcpClient::OnWroten, this, std::placeholders::_1, std::placeholders::_2)); } void TcpClient::ParseHeartBeat(size_t total_length) { - // | total length(4) | msg type(1) | data time(4) | body version(1) | body - // length (4) | body | attr length(2) | attr | magic (2) | skip total length + // | total length(4) | msg type(1) | data time(4) | body version(1) | body length (4) | body | attr length(2) | attr + // | magic (2) | + // skip total length uint32_t parse_index = sizeof(uint32_t); // skip msg type parse_index += sizeof(uint8_t); // skip data time - // uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t - // *>(recv_buf_->m_data + parse_index)); + // uint32_t data_time = ntohl(*reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint32_t); // 3、parse body version - uint32_t body_version = - *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index); + uint32_t body_version = *reinterpret_cast<const uint8_t *>(recv_buf_->m_data + parse_index); parse_index += sizeof(uint8_t); // 4、parse body length - uint32_t body_length = ntohl( - *reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index)); + uint32_t body_length = ntohl(*reinterpret_cast<const uint32_t *>(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint32_t); // 5 parse load - int16_t load = ntohs( - *reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index)); + int16_t load = ntohs(*reinterpret_cast<const int16_t *>(recv_buf_->m_data + parse_index)); parse_index += sizeof(int16_t); // 7 parse attr length - uint16_t attr_length = ntohs( - *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index)); + uint16_t attr_length = ntohs(*reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint16_t); // 8 skip attr parse_index += attr_length; // 9 parse magic - uint16_t magic = ntohs( - *reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index)); + uint16_t magic = ntohs(*reinterpret_cast<const uint16_t *>(recv_buf_->m_data + parse_index)); parse_index += sizeof(uint16_t); if (magic != constants::kBinaryMagic) { - LOG_ERROR("failed to parse heartbeat ack! error magic " - << magic << " !=" << constants::kBinaryMagic << CLIENT_INFO); + LOG_ERROR("Failed to ParseMsg heartbeat ack! error magic " << magic << " !=" << constants::kBinaryMagic + << CLIENT_INFO); return; } if (total_length + 4 != parse_index) { - LOG_ERROR("failed to parse heartbeat ack! total_length " - << total_length << " +4 !=" << parse_index << CLIENT_INFO); + LOG_ERROR("Failed to ParseMsg heartbeat ack! total_length " << total_length << " +4 !=" << parse_index + << CLIENT_INFO); return; } if (heart_beat_index_ > constants::MAX_STAT) { @@ -389,12 +390,12 @@ void TcpClient::ParseHeartBeat(size_t total_length) { void TcpClient::ParseGenericResponse() { if (sendBuffer_ != nullptr) { - stat_.AddSendSuccessMsgNum(sendBuffer_->msgCnt()); - stat_.AddSendSuccessPackNum(1); - - stat_.AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + sendBuffer_->getStreamId(); + stat_map_[stat_key].AddSendSuccessMsgNum(sendBuffer_->msgCnt()); + stat_map_[stat_key].AddSendSuccessPackNum(1); + stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); - sendBuffer_->releaseBuf(); + BufferManager::GetInstance()->AddSendBuffer(sendBuffer_); } } @@ -419,8 +420,7 @@ int32_t TcpClient::GetAvgLoad() { void TcpClient::SetHeartBeatStatus() { wait_heart_beat_ = true; } void TcpClient::UpdateClient(const std::string ip, const uint32_t port) { - LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port - << "] replace" << CLIENT_INFO); + LOG_INFO("UpdateClient[" << only_heart_heat_ << "][" << ip << ":" << port << "] replace" << CLIENT_INFO); ip_ = ip; port_ = port; reset_client_ = true; @@ -438,5 +438,29 @@ void TcpClient::RestClient() { const std::string &TcpClient::getIp() const { return ip_; } const std::string &TcpClient::getClientInfo() const { return client_info_; } uint32_t TcpClient::getPort() const { return port_; } +void TcpClient::ResetRetry() { + need_retry_ = false; + retry_times_ = 0; +} +void TcpClient::ResetSendBuffer() { + if (sendBuffer_ == nullptr) { + return; + } + retry_times_++; + if (retry_times_ > SdkConfig::getInstance()->retry_times_) { + std::string stat_key = sendBuffer_->getGroupId() + kStatJoiner + sendBuffer_->getStreamId(); + stat_map_[stat_key].AddSendFailMsgNum(sendBuffer_->msgCnt()); + stat_map_[stat_key].AddSendFailPackNum(1); + stat_map_[stat_key].AddTimeCost(Utils::getCurrentMsTime() - last_update_time_); + + sendBuffer_->doUserCallBack(); + + BufferManager::GetInstance()->AddSendBuffer(sendBuffer_); -} // namespace inlong + LOG_INFO("resend to proxy fail! retry times:" << retry_times_ << CLIENT_INFO); + ResetRetry(); + } else { + need_retry_ = true; + } +} +} // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h index 6835a7fd26..f2a92fc459 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/client/tcp_client.h @@ -1,32 +1,32 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -#ifndef INLONG_SDK_TCP_CLIENT_H -#define INLONG_SDK_TCP_CLIENT_H +#ifndef INLONG_TCP_CLIENT_H +#define INLONG_TCP_CLIENT_H +#include <queue> +#include <unordered_map> + +#include "../metric/metric.h" +#include "../protocol/msg_protocol.h" #include "../utils/block_memory.h" #include "../utils/capi_constant.h" #include "../utils/read_write_mutex.h" #include "../utils/send_buffer.h" -#include "msg_protocol.h" -#include "stat.h" -#include <queue> namespace inlong { enum ClientStatus { @@ -46,37 +46,38 @@ enum { }; using IOContext = asio::io_context; using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>; +using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>; class TcpClient { -private: + private: TcpSocketPtr socket_; SteadyTimerPtr wait_timer_; SteadyTimerPtr keep_alive_timer_; ClientStatus status_; std::string ip_; -public: + public: const std::string &getIp() const; -private: + private: uint32_t port_; -public: + public: uint32_t getPort() const; -private: + private: std::string client_info_; -public: + public: const std::string &getClientInfo() const; -private: + private: std::shared_ptr<SendBuffer> sendBuffer_; asio::ip::tcp::endpoint endpoint_; BlockMemoryPtrT recv_buf_; uint64_t tcp_idle_time_; uint32_t tcp_detection_interval_; uint64_t last_update_time_; - Stat stat_; + Metric stat_; bool exit_; BinaryHB bin_hb_ = {0}; std::vector<int32_t> proxy_loads_; @@ -84,8 +85,11 @@ private: bool wait_heart_beat_; bool reset_client_; volatile bool only_heart_heat_; + bool need_retry_; + uint32_t retry_times_; + std::unordered_map <std::string ,Metric> stat_map_; -public: + public: TcpClient(IOContext &io_context, std::string ip, uint32_t port); ~TcpClient(); void AsyncConnect(); @@ -98,7 +102,7 @@ public: void DoClose(); void HandleFail(); bool isFree() { return (status_ == kFree); } - void write(SendBufferPtrT sendBuffer); + void write(SendBufferPtrT sendBuffer, bool retry = false); void DetectStatus(const asio::error_code error); void HeartBeat(bool only_heart_heat = false); void SetHeartBeatStatus(); @@ -107,10 +111,13 @@ public: void UpdateClient(const std::string ip, const uint32_t port); void RestClient(); int32_t GetAvgLoad(); + void ResetRetry(); + void ResetSendBuffer(); + void UpdateMetric(); }; typedef std::shared_ptr<TcpClient> TcpClientTPtrT; typedef std::vector<TcpClientTPtrT> TcpClientTPtrVecT; typedef TcpClientTPtrVecT::iterator TcpClientTPtrVecItT; -} // namespace inlong +} // namespace inlong -#endif // INLONG_SDK_TCP_CLIENT_H +#endif // INLONG_TCP_CLIENT_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc index 4240a836dc..12a82b07e7 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.cc @@ -436,6 +436,13 @@ void SdkConfig::InitTcpParam(const rapidjson::Value &doc) { } else { enable_balance_ = constants::kEnableBalance; } + + if (doc.HasMember("retry_times") && doc["retry_times"].IsInt() && doc["retry_times"].GetInt() > 0) { + const rapidjson::Value &obj = doc["retry_times"]; + retry_times_ = obj.GetInt(); + } else { + retry_times_ = constants::kRetryTimes; + } } void SdkConfig::InitAuthParm(const rapidjson::Value &doc) { // auth settings diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h index 3a649cfb82..ae6cfde58b 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/config/sdk_conf.h @@ -95,6 +95,7 @@ private: uint32_t tcp_detection_interval_; // tcp-client detection interval bool enable_balance_; bool enable_local_cache_; + uint32_t retry_times_; // auth settings diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h index 0228f9c96d..f6413717dd 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_code.h @@ -41,7 +41,8 @@ enum SdkCode { kSendBeforeInit = 22, kFailMallocBuf = 23, kMsgSizeLargerThanPackSize = 24, - kSendBufferFull = 25 + kSendBufferFull = 25, + kBufferManagerFull = 26 }; } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index 13ce7223b5..7ae2c79ea2 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -16,19 +16,20 @@ */ #include "api_imp.h" +#include <signal.h> +#include <iostream> #include "../manager/proxy_manager.h" +#include "../manager/metric_manager.h" #include "../utils/capi_constant.h" #include "../utils/logger.h" #include "../utils/utils.h" - -#include "api_code.h" -#include <iostream> -#include <signal.h> - -#include "metric_manager.h" +#include "../core/api_code.h" namespace inlong { int32_t ApiImp::InitApi(const char *config_file_path) { + if (config_file_path == nullptr) { + return SdkCode::kErrorInit; + } if (!__sync_bool_compare_and_swap(&inited_, false, true)) { return SdkCode::kMultiInit; } @@ -36,56 +37,65 @@ int32_t ApiImp::InitApi(const char *config_file_path) { user_exit_flag_.getAndSet(0); if (!SdkConfig::getInstance()->ParseConfig(config_file_path)) { - LOG_ERROR("ParseConfig error "); return SdkCode::kErrorInit; } - max_msg_length_ = std::min(SdkConfig::getInstance()->max_msg_size_, - SdkConfig::getInstance()->pack_size_) - constants::ATTR_LENGTH; + + max_msg_length_ = SdkConfig::getInstance()->max_msg_size_ - constants::ATTR_LENGTH; local_ip_ = SdkConfig::getInstance()->local_ip_; return DoInit(); } -int32_t ApiImp::Send(const char *inlong_group_id, const char *inlong_stream_id, - const char *msg, int32_t msg_len, UserCallBack call_back) { +int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, + UserCallBack call_back) { + int32_t code=ValidateParams(group_id, stream_id, msg, msg_len); + if(code !=SdkCode::kSuccess){ + return code; + } + + return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back); +} +int32_t ApiImp::Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, + int64_t data_time, UserCallBack call_back) { + int32_t code=ValidateParams(group_id, stream_id, msg, msg_len); + if(code !=SdkCode::kSuccess){ + return code; + } + + return this->SendBase(group_id, stream_id, {msg, msg_len}, call_back, data_time); +} + +int32_t ApiImp::ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len){ if (msg_len > max_msg_length_) { + MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1); return SdkCode::kMsgTooLong; } - if (inlong_group_id == nullptr || inlong_stream_id == nullptr || - msg == nullptr || msg_len <= 0) { + if (group_id == nullptr || stream_id == nullptr || msg == nullptr || msg_len <= 0) { return SdkCode::kInvalidInput; } if (inited_ == false) { return SdkCode::kSendBeforeInit; } - - int64_t msg_time = Utils::getCurrentMsTime(); - return this->SendBase(inlong_group_id, inlong_stream_id, local_ip_, msg_time, - {msg, msg_len}, call_back); + return SdkCode::kSuccess; } -int32_t ApiImp::SendBase(const std::string inlong_group_id, - const std::string inlong_stream_id, - const std::string client_ip, int64_t report_time, - const std::string msg, UserCallBack call_back) { - int32_t check_ret = CheckData(inlong_group_id, inlong_stream_id, msg); +int32_t ApiImp::SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg, + UserCallBack call_back, int64_t report_time) { + int32_t check_ret = CheckData(inlong_group_id, stream_id, msg); if (check_ret != SdkCode::kSuccess) { return check_ret; } - ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true); + ProxyManager::GetInstance()->CheckGroupIdConf(inlong_group_id, true); - auto recv_group = - recv_manager_->GetRecvGroup(inlong_group_id); + auto recv_group = recv_manager_->GetRecvGroup(inlong_group_id); if (recv_group == nullptr) { - LOG_ERROR("fail to get recv group, inlong_group_id:" - << inlong_group_id << " inlong_stream_id:" << inlong_stream_id); + LOG_ERROR("fail to get pack queue, group id:" << inlong_group_id << ",getStreamId:" << stream_id); return SdkCode::kFailGetRevGroup; } - return recv_group->SendData(msg, inlong_group_id, inlong_stream_id, client_ip, - report_time, call_back); + return recv_group->SendData(msg, inlong_group_id, stream_id, report_time, call_back); } int32_t ApiImp::CloseApi(int32_t max_waitms) { @@ -98,44 +108,38 @@ int32_t ApiImp::CloseApi(int32_t max_waitms) { } int32_t ApiImp::DoInit() { - LOG_INFO( - "inlong dataproxy sdk cpp start Init, version:" << constants::kVersion); + LOG_INFO("tdbus sdk cpp start Init, version:" << constants::kVersion); signal(SIGPIPE, SIG_IGN); - LOG_INFO("inlong dataproxy cpp sdk Init complete!"); + LOG_INFO("tdbus_sdk_cpp Init complete!"); ProxyManager::GetInstance()->Init(); MetricManager::GetInstance()->Init(); for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) { - LOG_INFO("DoInit CheckConf inlong_group_id:" - << SdkConfig::getInstance()->inlong_group_ids_[i]); - ProxyManager::GetInstance()->CheckBidConf( - SdkConfig::getInstance()->inlong_group_ids_[i], false); + LOG_INFO("Do init:" << SdkConfig::getInstance()->inlong_group_ids_[i]); + ProxyManager::GetInstance()->CheckGroupIdConf(SdkConfig::getInstance()->inlong_group_ids_[i], false); } return InitManager(); } -int32_t ApiImp::CheckData(const std::string inlong_group_id, - const std::string inlong_stream_id, - const std::string msg) { +int32_t ApiImp::CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg) { if (init_succeed_ == 0 || user_exit_flag_.get() == 1) { LOG_ERROR("capi has been closed, Init first and then send"); return SdkCode::kSendAfterClose; } - if (msg.empty() || inlong_group_id.empty() || inlong_stream_id.empty()) { - LOG_ERROR("invalid input, inlong_group_id" - << inlong_group_id << " inlong_stream_id" << inlong_stream_id); + if (msg.empty() || group_id.empty() || stream_id.empty()) { + LOG_ERROR("invalid input, group id:" << group_id << " stream id:" << stream_id << "msg" << msg); return SdkCode::kInvalidInput; } if (msg.size() > SdkConfig::getInstance()->max_msg_size_) { - LOG_ERROR("msg len is too long, cur msg_len" - << msg.size() << " ext_pack_size" - << SdkConfig::getInstance()->max_msg_size_); + MetricManager::GetInstance()->AddTooLongMsgCount(group_id,stream_id,1); + LOG_ERROR("msg DataLen is too long, cur msg_len" << msg.size() << " ext_pack_size" + << SdkConfig::getInstance()->max_msg_size_); return SdkCode::kMsgTooLong; } @@ -156,15 +160,15 @@ int32_t ApiImp::InitManager() { init_succeed_ = true; return SdkCode::kSuccess; } -int32_t -ApiImp::AddInLongGroupId(const std::vector<std::string> &inlong_group_ids) { +int32_t ApiImp::AddInLongGroupId(const std::vector<std::string> &group_ids) { if (inited_ == false) { return SdkCode::kSendBeforeInit; } - for (auto inlong_group_id : inlong_group_ids) { - ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, false); + for (auto group_id : group_ids) { + ProxyManager::GetInstance()->CheckGroupIdConf(group_id, false); } + return SdkCode::kSuccess; } ApiImp::ApiImp() = default; ApiImp::~ApiImp() = default; -} // namespace inlong \ No newline at end of file +} // namespace inlong \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h index c6e6ecd0e4..32ca093416 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.h @@ -15,45 +15,43 @@ * limitations under the License. */ -#ifndef INLONG_SDK_API_IMP_H -#define INLONG_SDK_API_IMP_H +#ifndef INLONG_API_IMP_H +#define INLONG_API_IMP_H +#include <cstdint> +#include <functional> #include "../config/sdk_conf.h" #include "../manager/recv_manager.h" #include "../manager/send_manager.h" #include "../utils/atomic.h" -#include <cstdint> -#include <functional> namespace inlong { - -typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, - const int64_t, const char *); +typedef int (*UserCallBack)(const char *, const char *, const char *, int32_t, const int64_t, const char *); class ApiImp { -public: + public: ApiImp(); ~ApiImp(); int32_t InitApi(const char *config_file_path); - int32_t Send(const char *inlong_group_id, const char *inlong_stream_id, - const char *msg, int32_t msg_len, + int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, UserCallBack call_back = nullptr); + int32_t Send(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len, + int64_t report_time, UserCallBack call_back = nullptr); int32_t CloseApi(int32_t max_waitms); - int32_t AddInLongGroupId(const std::vector<std::string> &inlong_group_ids); + int32_t AddInLongGroupId(const std::vector<std::string> &group_ids); -private: + private: int32_t DoInit(); int32_t InitManager(); - int32_t SendBase(const std::string inlong_group_id, - const std::string inlong_stream_id, - const std::string client_ip, int64_t report_time, - const std::string msg, UserCallBack call_back); + int32_t SendBase(const std::string& inlong_group_id, const std::string& stream_id, const std::string& msg, UserCallBack call_back, + int64_t report_time = 0); + + int32_t CheckData(const std::string& group_id, const std::string& stream_id, const std::string& msg); - int32_t CheckData(const std::string inlong_group_id, - const std::string inlong_stream_id, const std::string msg); + int32_t ValidateParams(const char *group_id, const char *stream_id, const char *msg, int32_t msg_len); AtomicInt user_exit_flag_{0}; volatile bool init_flag_ = false; @@ -67,5 +65,5 @@ private: std::shared_ptr<SendManager> send_manager_; }; -} // namespace inlong -#endif // INLONG_SDK_API_IMP_H +} // namespace inlong +#endif // INLONG_API_IMP_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h index 8ae749a352..d9e8366486 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/sdk_msg.h @@ -48,7 +48,16 @@ struct SdkMsg { data_pack_format_attr_(attr), inlong_group_id_(inlong_group_id), inlong_stream_id_(inlong_stream_id){} - SdkMsg() {}; + SdkMsg(){}; + void setMsg(const std::string &msg) { msg_ = msg; } + void setClientIp(const std::string &clientIp) { client_ip_ = clientIp; } + void setReportTime(uint64_t reportTime) { report_time_ = reportTime; } + void setCb(UserCallBack cb) { cb_ = cb;} + void setUserReportTime(uint64_t userReportTime) { user_report_time_ = userReportTime; } + void setUserClientIp(const std::string &userClientIp) { user_client_ip_ = userClientIp; } + void setDataPackFormatAttr(const std::string &dataPackFormatAttr) { data_pack_format_attr_ = dataPackFormatAttr; } + void setGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; } + void setStreamId(const std::string &inlong_stream_id) { inlong_stream_id_ = inlong_stream_id; } }; using SdkMsgPtr = std::shared_ptr<SdkMsg>; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc index b852095f68..377432a2ef 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.cc @@ -1,49 +1,56 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ #include "recv_group.h" -#include "../protocol/msg_protocol.h" -#include "../utils/capi_constant.h" -#include "../utils/utils.h" -#include "api_code.h" -#include <cstdlib> #include <functional> +#include "../core/api_code.h" +#include "../manager/buffer_manager.h" +#include "../utils/utils.h" +#include "../manager/msg_manager.h" +#include "../manager/metric_manager.h" namespace inlong { const uint32_t DEFAULT_PACK_ATTR = 400; -const uint64_t LOG_SAMPLE=100; -RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager) - : cur_len_(0), groupId_num_(0), streamId_num_(0), +const uint64_t LOG_SAMPLE = 100; +RecvGroup::RecvGroup(const std::string &group_key, std::shared_ptr<SendManager> send_manager) + : cur_len_(0), + group_key_(group_key), + groupId_num_(0), + streamId_num_(0), msg_type_(SdkConfig::getInstance()->msg_type_), - data_capacity_(SdkConfig::getInstance()->buf_size_), - send_manager_(send_manager),group_key_(group_key), - log_stat_(0){ - data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, - SdkConfig::getInstance()->pack_size_); + data_capacity_(SdkConfig::getInstance()->recv_buf_size_), + send_manager_(send_manager), + log_stat_(0), + send_group_(nullptr), + max_msg_size_(0), + uniq_id_(0) { + data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_, SdkConfig::getInstance()->pack_size_); data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR; pack_buf_ = new char[data_capacity_]; memset(pack_buf_, 0x0, data_capacity_); - data_time_ = 0; + data_time_ = Utils::getCurrentMsTime(); last_pack_time_ = Utils::getCurrentMsTime(); max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_; - - LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_); + local_ip_ = SdkConfig::getInstance()->local_ip_; + LOG_INFO("RecvGroup:" << group_key_ << ",data_capacity:" << data_capacity_ << ",max_recv_size:" << max_recv_size_); } RecvGroup::~RecvGroup() { @@ -53,136 +60,108 @@ RecvGroup::~RecvGroup() { } } -int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId, - const std::string &streamId, - const std::string &client_ip, uint64_t report_time, - UserCallBack call_back) { +int32_t RecvGroup::SendData(const std::string &msg, const std::string &inlong_group_id_, const std::string &inlong_stream_id_, + uint64_t report_time, UserCallBack call_back) { std::lock_guard<std::mutex> lck(mutex_); - if (msg.size() + cur_len_ > max_recv_size_) { + MetricManager::GetInstance()->AddReceiveBufferFullCount(inlong_group_id_,inlong_stream_id_,1); return SdkCode::kRecvBufferFull; } - AddMsg(msg, client_ip, report_time, call_back,groupId,streamId); - - return SdkCode::kSuccess; -} + uint64_t data_time = (report_time == 0) ? data_time_ : report_time; -int32_t RecvGroup::DoDispatchMsg() { - last_pack_time_ = Utils::getCurrentMsTime(); - std::lock_guard<std::mutex> lck(mutex_); - if (group_key_.empty()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("groupId is empty, check!!"); - log_stat_ = 0; - } - return SdkCode::kInvalidInput; - } - if (msgs_.empty()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("no msg in msg_set, check!"); - log_stat_ = 0; - } - return SdkCode::kMsgEmpty; - } - auto send_group = send_manager_->GetSendGroup(group_key_); - if (send_group == nullptr) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("failed to get send_buf, something gets wrong, checkout!"); - log_stat_ = 0; - } - return SdkCode::kFailGetSendBuf; - } - if (!send_group->IsAvailable()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("failed to get send group! group_key:" - << group_key_ << " send group is not available!"); - log_stat_ = 0; - } - return SdkCode::kFailGetConn; + std::string data_pack_format_attr = + "__addcol1__reptime=" + Utils::getFormatTime(data_time) + "&__addcol2__ip=" + local_ip_; + max_msg_size_ = std::max(max_msg_size_, msg.size()); + auto it = recv_queue_.find(inlong_group_id_ + inlong_stream_id_); + if (it == recv_queue_.end()) { + std::queue<SdkMsgPtr> tmp; + it = recv_queue_.insert(recv_queue_.begin(), std::make_pair(inlong_group_id_ + inlong_stream_id_, tmp)); } - if (send_group->IsFull()) { - if (log_stat_++ > LOG_SAMPLE) { - LOG_ERROR("failed to get send group! group_key:" - << group_key_ << " send group is full!"); - log_stat_ = 0; - } - return SdkCode::kSendBufferFull; + SdkMsgPtr msg_ptr = MsgManager::GetInstance()->GetMsg(); + if(nullptr == msg_ptr){ + it->second.emplace(std::make_shared<SdkMsg>(msg, local_ip_, data_time, call_back, data_pack_format_attr, local_ip_, data_time, inlong_group_id_, inlong_stream_id_)); + }else{ + msg_ptr->setMsg(msg); + msg_ptr->setClientIp(local_ip_); + msg_ptr->setReportTime(data_time); + msg_ptr->setCb(call_back); + msg_ptr->setDataPackFormatAttr(data_pack_format_attr); + msg_ptr->setUserClientIp(local_ip_); + msg_ptr->setUserReportTime(data_time); + msg_ptr->setGroupId(inlong_group_id_); + msg_ptr->setStreamId(inlong_stream_id_); + it->second.emplace(msg_ptr); } - uint32_t total_length = 0; - uint64_t max_tid_size = 0; - std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch; - std::unordered_map<std::string, uint64_t> tid_stat; - while (!msgs_.empty()) { - SdkMsgPtr msg = msgs_.front(); - if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH > SdkConfig::getInstance()->pack_size_) { - if (!msgs_to_dispatch.empty()) { - break; - } - } - std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_; - msgs_to_dispatch[msg_key].push_back(msg); - msgs_.pop(); - - total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH; + cur_len_ = cur_len_ + msg.size() + constants::ATTR_LENGTH; - if (tid_stat.find(msg_key) == tid_stat.end()) { - tid_stat[msg_key] = 0; - } - tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() + constants::ATTR_LENGTH; + return SdkCode::kSuccess; +} - max_tid_size = std::max(tid_stat[msg_key], max_tid_size); +void RecvGroup::DoDispatchMsg() { + if (!CanDispatch()) { + return; } - - cur_len_ = cur_len_ - total_length; - - for (auto it : msgs_to_dispatch) { - std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second); - - ResetPackBuf(); - - if (send_buffer == nullptr) { - CallbalkToUsr(it.second); - continue; - } - - int ret = send_group->PushData(send_buffer); - if (ret != SdkCode::kSuccess) { - CallbalkToUsr(it.second); + last_pack_time_ = Utils::getCurrentMsTime(); + data_time_ = last_pack_time_; + while (!fail_queue_.empty()) { + SendBufferPtrT tmp_ptr = fail_queue_.front(); + if (SdkCode::kSuccess != send_group_->PushData(tmp_ptr)) { + return; } + fail_queue_.pop(); } - return SdkCode::kSuccess; -} - -void RecvGroup::AddMsg(const std::string &msg, std::string client_ip, - int64_t report_time, UserCallBack call_back,const std::string &groupId, - const std::string &streamId) { - if (Utils::isLegalTime(report_time)) - data_time_ = report_time; - else { - data_time_ = Utils::getCurrentMsTime(); + { + std::lock_guard<std::mutex> lck(mutex_); + recv_queue_.swap(dispatch_queue_); } - std::string user_client_ip = client_ip; - int64_t user_report_time = report_time; + for (auto &it : dispatch_queue_) { + std::vector<SdkMsgPtr> msg_vec; + uint64_t msg_size = 0; + while (!it.second.empty()) { + SdkMsgPtr msg = it.second.front(); + msg_vec.push_back(msg); + msg_size = msg_size + msg->msg_.size() + constants::ATTR_LENGTH; + it.second.pop(); + + if ((msg_size + max_msg_size_) >= SdkConfig::getInstance()->pack_size_) { + uint32_t ret = ParseMsg(msg_vec); + if (SdkCode::kBufferManagerFull == ret) { + for (const auto &it_msg : msg_vec) { + it.second.emplace(it_msg); + } + return; + } + UpdateCurrentMsgLen(msg_size); + msg_size = 0; + + if (SdkCode::kSuccess != ret) { + return; + } + std::vector<SdkMsgPtr>().swap(msg_vec); + } + } + if (!msg_vec.empty()) { + uint32_t ret = ParseMsg(msg_vec); + if (SdkCode::kBufferManagerFull == ret) { + for (const auto &it_msg : msg_vec) { + it.second.emplace(it_msg); + } + return; + } + UpdateCurrentMsgLen(msg_size); - if (client_ip.empty()) { - client_ip = "127.0.0.1"; + if (SdkCode::kSuccess != ret) { + return; + } + } } - std::string data_pack_format_attr = - "__addcol1__reptime=" + Utils::getFormatTime(data_time_) + - "&__addcol2__ip=" + client_ip; - msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back, - data_pack_format_attr, user_client_ip, - user_report_time,groupId,streamId)); - - cur_len_ += msg.size() + constants::ATTR_LENGTH; } -bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, - uint32_t &out_len, uint32_t uniq_id) { +bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,uint32_t &out_len) { if (pack_data == nullptr) { LOG_ERROR("nullptr, failed to allocate memory for buf"); return false; @@ -196,7 +175,6 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, memcpy(&pack_buf_[idx], it->msg_.data(), it->msg_.size()); idx += static_cast<uint32_t>(it->msg_.size()); - // add attrlen|attr if (SdkConfig::getInstance()->isAttrDataPackFormat()) { *(uint32_t *)(&pack_buf_[idx]) = htonl(it->data_pack_format_attr_.size()); idx += sizeof(uint32_t); @@ -247,7 +225,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, groupId_num = 0; streamId_num = 0; groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + "&streamId=" + msgs[0]->inlong_stream_id_; char_groupId_flag = 0x4; } else { groupId_num = groupId_num_; @@ -261,14 +239,14 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, if (SdkConfig::getInstance()->enableTraceIP()) { if (groupId_streamId_char.empty()) attr = "node1ip=" + SdkConfig::getInstance()->local_ip_ + - "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); + "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); else attr = groupId_streamId_char + - "&node1ip=" + SdkConfig::getInstance()->local_ip_ + - "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); + "&node1ip=" + SdkConfig::getInstance()->local_ip_ + + "&rtime1=" + std::to_string(Utils::getCurrentMsTime()); } else { attr = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + "&streamId=" + msgs[0]->inlong_stream_id_; } *(uint16_t *)bodyBegin = htons(attr.size()); bodyBegin += sizeof(uint16_t); @@ -294,7 +272,7 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, p += 4; *(uint16_t *)p = htons(cnt); p += 2; - *(uint32_t *)p = htonl(uniq_id); + *(uint32_t *)p = htonl(uniq_id_); out_len = total_len + 4; } else { @@ -319,10 +297,10 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, // attr std::string attr; attr = "groupId=" + msgs[0]->inlong_group_id_ + - "&streamId=" + msgs[0]->inlong_stream_id_; + "&streamId=" + msgs[0]->inlong_stream_id_; attr += "&dt=" + std::to_string(data_time_); - attr += "&mid=" + std::to_string(uniq_id); + attr += "&mid=" + std::to_string(uniq_id_); if (isSnappy) attr += "&cp=snappy"; attr += "&cnt=" + std::to_string(cnt); @@ -352,56 +330,97 @@ bool RecvGroup::IsZipAndOperate(std::string &res, uint32_t real_cur_len) { } void RecvGroup::DispatchMsg(bool exit) { - if (cur_len_ <= constants::ATTR_LENGTH || msgs_.empty()) - return; + if (cur_len_ <= constants::ATTR_LENGTH) return; bool len_enough = cur_len_ > SdkConfig::getInstance()->pack_size_; - bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) > - SdkConfig::getInstance()->pack_timeout_; + bool time_enough = (Utils::getCurrentMsTime() - last_pack_time_) > SdkConfig::getInstance()->pack_timeout_; if (len_enough || time_enough) { DoDispatchMsg(); } } -std::shared_ptr<SendBuffer> -RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) { +std::shared_ptr<SendBuffer> RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) { if (msgs.empty()) { - LOG_ERROR("pack msgs is empty."); return nullptr; } - std::shared_ptr<SendBuffer> send_buffer = - std::make_shared<SendBuffer>(data_capacity_); + std::shared_ptr<SendBuffer> send_buffer = BufferManager::GetInstance()->GetSendBuffer(); if (send_buffer == nullptr) { - LOG_ERROR("make send buffer failed."); return nullptr; } + uint32_t len = 0; - int32_t msg_cnt = msgs.size(); - uint32_t uniq_id = g_send_msgid.incrementAndGet(); + uint32_t msg_cnt = msgs.size(); + if (++uniq_id_ >= constants::kMaxSnowFlake) { + uniq_id_ = 0; + } - if (!PackMsg(msgs, send_buffer->content(), len, uniq_id) || len == 0) { - LOG_ERROR("failed to write data to send buf from pack queue, sendQueue " - "id:%d, buf id:%d"); + if (!PackMsg(msgs, send_buffer->content(), len) || len == 0) { + LOG_ERROR("failed to write data to send buf from pack queue, sendQueue"); return nullptr; } + send_buffer->setLen(len); send_buffer->setMsgCnt(msg_cnt); send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_); send_buffer->setStreamId(msgs[0]->inlong_stream_id_); - send_buffer->setUniqId(uniq_id); - send_buffer->setIsPacked(true); - for (auto it : msgs) { - send_buffer->addUserMsg(it); + for (const auto &it : msgs) { + if(it->cb_){ + send_buffer->addUserMsg(it); + } } return send_buffer; } -void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) { - for (auto &it : msgs) { - if (it->cb_) { - it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(), - it->msg_.data(), it->msg_.size(), it->user_report_time_, - it->user_client_ip_.data()); +uint32_t RecvGroup::ParseMsg(std::vector<SdkMsgPtr> &msg_vec) { + if (msg_vec.empty()) { + return SdkCode::kSuccess; + } + + std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msg_vec); + + if (send_buffer == nullptr) { + return SdkCode::kBufferManagerFull; + } + + uint32_t ret = send_group_->PushData(send_buffer); + if (ret != SdkCode::kSuccess) { + fail_queue_.push(send_buffer); + } + return ret; +} +bool RecvGroup::CanDispatch() { + if (group_key_.empty()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("Group key is empty!"); + log_stat_ = 0; + } + return false; + } + if (nullptr == send_group_) { + send_group_ = send_manager_->GetSendGroup(group_key_); + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" << group_key_); + log_stat_ = 0; + } + return false; + } + if (!send_group_->IsAvailable()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is not available!"); + log_stat_ = 0; } + return false; } + if (send_group_->IsFull()) { + if (log_stat_++ > LOG_SAMPLE) { + LOG_ERROR("failed to get send group! group_key:" << group_key_ << " is full!"); + log_stat_ = 0; + } + return false; + } + return true; +} +void RecvGroup::UpdateCurrentMsgLen(uint64_t msg_size) { + std::lock_guard<std::mutex> lck(mutex_); + cur_len_ = cur_len_ - msg_size; } -} // namespace inlong \ No newline at end of file +} // namespace inlong \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h index adac9a7f10..839c49e14d 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/group/recv_group.h @@ -1,18 +1,20 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ #ifndef INLONG_SDK_RECV_GROUP_H @@ -25,17 +27,19 @@ #include <unordered_map> #include "../config/sdk_conf.h" +#include "../core/sdk_msg.h" #include "../manager/send_manager.h" #include "../utils/atomic.h" #include "../utils/noncopyable.h" namespace inlong { class RecvGroup { -private: - char *pack_buf_; + private: + char* pack_buf_; std::queue<SdkMsgPtr> msgs_; + std::queue<SdkMsgPtr> fail_msgs_; uint32_t data_capacity_; - uint32_t cur_len_; + uint64_t cur_len_; AtomicInt pack_err_; uint64_t data_time_; uint16_t groupId_num_; @@ -47,33 +51,36 @@ private: uint64_t last_pack_time_; uint64_t max_recv_size_; + std::string group_key_; uint64_t log_stat_; + SendGroupPtr send_group_; + std::string local_ip_; + uint64_t max_msg_size_; + uint64_t uniq_id_; + + std::unordered_map<std::string, std::queue<SdkMsgPtr>> recv_queue_; + std::unordered_map<std::string, std::queue<SdkMsgPtr>> dispatch_queue_; + std::queue<SendBufferPtrT> fail_queue_; - int32_t DoDispatchMsg(); - void AddMsg(const std::string &msg, std::string client_ip, - int64_t report_time, UserCallBack call_back,const std::string &groupId, - const std::string &streamId); - bool IsZipAndOperate(std::string &res, uint32_t real_cur_len); + void DoDispatchMsg(); + bool IsZipAndOperate(std::string& res, uint32_t real_cur_len); inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); } + uint32_t ParseMsg(std::vector<SdkMsgPtr>& msg_vec); -public: - RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager); + public: + RecvGroup(const std::string& group_key, std::shared_ptr<SendManager> send_manager); ~RecvGroup(); - int32_t SendData(const std::string &msg, const std::string &groupId, - const std::string &streamId, const std::string &client_ip, - uint64_t report_time, UserCallBack call_back); - bool PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data, uint32_t &out_len, - uint32_t uniq_id); + int32_t SendData(const std::string& msg, const std::string& inlong_group_id_, const std::string& inlong_stream_id_, uint64_t report_time, + UserCallBack call_back); + bool PackMsg(std::vector<SdkMsgPtr>& msgs, char* pack_data, uint32_t& out_len); void DispatchMsg(bool exit); - - char *data() const { return pack_buf_; } - - std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs); - void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs); + std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr>& msgs); + bool CanDispatch(); + void UpdateCurrentMsgLen(uint64_t msg_size); }; using RecvGroupPtr = std::shared_ptr<RecvGroup>; -} // namespace inlong +} // namespace inlong -#endif // INLONG_SDK_RECV_GROUP_H \ No newline at end of file +#endif // INLONG_SDK_RECV_GROUP_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 09014d728d..1653d68de0 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -217,8 +217,8 @@ int32_t ProxyManager::GetProxy(const std::string &key, return GetProxyByClusterId(key, proxy_info_vec); } -int32_t ProxyManager::CheckBidConf(const std::string &inlong_group_id, - bool is_inited) { +int32_t ProxyManager::CheckGroupIdConf(const std::string &inlong_group_id, + bool is_inited) { { unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_); auto it = groupid_2_cluster_id_map_.find(inlong_group_id); @@ -379,7 +379,7 @@ void ProxyManager::WriteLocalCache() { } catch (...) { LOG_ERROR("WriteLocalCache error!"); } - LOG_INFO("WriteLocalCache bid number:" << groupid_count); + LOG_INFO("WriteLocalCache getGroupId number:" << groupid_count); } std::string ProxyManager::RecoverFromLocalCache(const std::string &groupid) { diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 3cd3e5491f..419bc60798 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -62,7 +62,7 @@ public: static ProxyManager instance; return &instance; } - int32_t CheckBidConf(const std::string &inlong_group_id, bool is_inited); + int32_t CheckGroupIdConf(const std::string &inlong_group_id, bool is_inited); void Update(); void DoUpdate(); void Init(); diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index 616f018b90..47282e0e32 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -22,6 +22,7 @@ #include "string.h" #include <stdint.h> +#include <limits> namespace inlong { namespace constants { @@ -79,6 +80,9 @@ static const uint32_t kReserveProxyNum = 2; static const bool kEnableTCPNagle = true; static const uint32_t kTcpIdleTime = 600000; static const uint32_t kTcpDetectionInterval = 60000; +static const uint32_t kMaxRetryIntervalMs= 3000; +static const uint32_t kRetryIntervalMs= 200; +static const int32_t kRetryTimes = 1; static const char kSerIP[] = "127.0.0.1"; static const uint32_t kSerPort = 46801; @@ -87,6 +91,7 @@ static const uint32_t kMsgType = 7; static const bool kEnableSetAffinity = false; static const uint32_t kMaskCPUAffinity = 0xff; static const uint16_t kExtendField = 0; +static const uint64_t kMaxSnowFlake = std::numeric_limits<uint64_t>::max(); // http basic auth static const char kBasicAuthHeader[] = "Authorization:"; diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h index 1f2970b17d..5b7e5a596a 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/send_buffer.h @@ -72,8 +72,8 @@ public: void setMsgCnt(const int32_t &msg_cnt) { msg_cnt_ = msg_cnt; } uint32_t len() { return len_; } void setLen(const uint32_t len) { len_ = len; } - std::string bid() { return inlong_group_id_; } - std::string tid() { return inlong_stream_id_; } + std::string getGroupId() { return inlong_group_id_; } + std::string getStreamId() { return inlong_stream_id_; } void setInlongGroupId(const std::string &inlong_group_id) { inlong_group_id_ = inlong_group_id; }