Hello, I've been experiencing a constant, continuous memory leak in my gRPC async-server implementation. I've changed my code to allocate all of my messages from a protobuf Arena that is backed by a static buffer, and yet I still see the leak, so it seems to me that the source of the leak is the internal implementation of the gRPC server classes/messages.
I assume that I'm missing something simple (do I need to call to some cleanup callback or something of that sort) - but I couldn't figure out what I'm missing so far (looking in documentation, grpc code, and valgrind output...). I'm attaching a simplified version of my code. I'd appreciate if anyone can have a look and maybe spot my mistake. I'm basically running the server, and then running a loop which calls the client every x seconds and I can see that the server's RAM usage is growing up constantly. The full simplified example can be seen in https://github.com/cohdan/grpc_leaky_example, some logs (including valgrind output) can be found there as well. Thanks. #include <iostream> #include <thread> #include <csignal> syntax = "proto3"; package example.v1; message ServerPing { uint64 ping_generation = 1; } message ServerPong { ServerPing ping = 1; uint64 pings_so_far = 2; } message ServerPingRequest { ServerPing ping = 1; } message ServerPingResponse { ServerPong pong = 2; } message VersionGetRequest { } message VersionGetResponse { string version = 1; string commit_hash = 2; } service ExampleService { rpc VersionGet (VersionGetRequest) returns (VersionGetResponse) {} rpc ServerPing (ServerPingRequest) returns (ServerPingResponse) {} } #include "server.h" using std::cout; using std::endl; void term_signal_handler(int signum) { cout << "Starting sigterm handler" << endl; server::get_server().close_server(); } void signal_handler(int signum, siginfo_t *siginfo, void *context) { cout << "Interrupt signal " << signum << " received" << endl; if (signum == SIGTERM) { term_signal_handler(signum); } // Reset to default behavior and re-raise signal struct sigaction sa; sa.sa_handler = SIG_DFL; sigemptyset(&sa.sa_mask); sa.sa_flags = 0; sigaction(signum, &sa, nullptr); raise(signum); } void register_signals() { struct sigaction sa; sa.sa_sigaction = signal_handler; sigemptyset(&sa.sa_mask); sa.sa_flags = SA_SIGINFO; // Ignore SIGPIPE struct sigaction ignore_sa; 
ignore_sa.sa_handler = SIG_IGN; sigemptyset(&ignore_sa.sa_mask); ignore_sa.sa_flags = 0; sigaction(SIGPIPE, &ignore_sa, nullptr); std::vector<int> signals_to_handle = { SIGINT, SIGTERM, SIGQUIT, SIGABRT, SIGSEGV, SIGFPE, SIGILL, SIGBUS, SIGSYS, SIGSTOP, SIGUSR1 }; for (int signum : signals_to_handle) { sigaction(signum, &sa, nullptr); } } int main() { register_signals(); server::get_server().init_server(); while (true) { std::this_thread::yield(); } server::get_server().close_server(); return 0; } // // Created by Dan Cohen on 11/11/2024. // #include "server.h" #include "handlers.h" #include <chrono> #include <sstream> #include <memory> #include <cstdlib> #include <iostream> constexpr int MAX_RETRY_COUNT = 10; using namespace grpc; using namespace std::chrono_literals; using namespace google::protobuf; using std::endl; using std::cout; void server::init_rpc_handlers() { // Create the command handlers auto _cmd_ping = (handler_base *)handlers_pool ::get_pool().allocator().allocate_node(); _cmd_ping = new (_cmd_ping) handler_server_ping(_completion_queue.get(), _service); auto _cmd_get_version = (handler_base *)handlers_pool ::get_pool().allocator().allocate_node(); _cmd_get_version = new (_cmd_get_version) handler_version_get(_completion_queue.get(), _service); _cmd_ping->init_rpc_handler(); _cmd_get_version->init_rpc_handler(); } void server::handle_requests_queue() { void *rpc_tag = nullptr; // uniquely identifies a request. 
auto rpc_status = false; uint16_t retry_count = 0; while (true) { auto ret = _completion_queue->Next(&rpc_tag, &rpc_status); if (!ret) { cout << "completion_queue next method indicates that the gRPC server is shutting down, ret=" << ret << ", rpc_status=" << rpc_status << ", did_we_initiate=" << is_server_shutting_down() << endl; break; } if (!rpc_status) { cout << "completion_queue next method indicates that an RPC request failed, moving to next request, retry_count=" << retry_count << endl; if (rpc_tag) { auto rpc_handler = static_cast<handler_base *>(rpc_tag); cout << "Failed rpc request details-> " << rpc_handler->get_request_debug_message() << endl; rpc_handler->complete_request(); } if (is_server_shutting_down()) { cout << "completion_queue next method indicates that the gRPC server is shutting down, ret=" << ret << ", rpc_status=" << rpc_status << endl; break; } if (retry_count < MAX_RETRY_COUNT) { ++retry_count; std::this_thread::sleep_for(5ms); } else { cout << "Retry count exceeded the configured max, can't recover - killing the agent" << endl; ::abort(); } continue; } retry_count = 0; if (!rpc_tag) { cout << "invalid RPC request moving to next request" << endl; continue; } static_cast<handler_base *>(rpc_tag)->handle_rpc_request(); } } bool server::init_server() { if (_did_init) { return true; } _shutting_down.store(false); _service = std::make_shared<ExampleService::AsyncService>(); auto server_address = "0.0.0.0"; auto server_port = 6212; std::stringstream stream; stream << server_address << ":" << server_port; std::string server_address_str(stream.str()); auto max_message_size = 10 * 1024 * 1024; ServerBuilder builder; // Set max message size. builder.SetMaxMessageSize(max_message_size); builder.SetMaxSendMessageSize(max_message_size); builder.SetMaxReceiveMessageSize(max_message_size); // Listen on the given address without any authentication mechanism. 
builder.AddListeningPort(server_address_str, grpc::InsecureServerCredentials()); builder.RegisterService(_service.get()); // Get hold of the completion queue used for the asynchronous communication // with the gRPC runtime. _completion_queue = builder.AddCompletionQueue(); // Finally assemble the server. _server = builder.BuildAndStart(); _server_thread = std::make_unique<thread>(std::function<void(server *)>(& server::handle_requests_queue), this); init_rpc_handlers(); _did_init = true; return true; } void server::close_server() { if (!_did_init) { return; } cout << "Closing the grpc server..." << endl; _shutting_down.store(true); _server->Shutdown(); _completion_queue->Shutdown(); if (_server_thread && _server_thread->joinable()) { _server_thread->join(); } // Make sure the queue is empty before closing it. void* ignored_tag; bool ignored_ok; while (_completion_queue->Next(&ignored_tag, &ignored_ok)) { } _server_thread.reset(); _server_thread = nullptr; _completion_queue.reset(); _completion_queue = nullptr; _service.reset(); _service = nullptr; _did_init = false; } server::~server() { close_server(); } // // Created by Dan Cohen on 11/11/2024. 
// #ifndef GRPC_EXAMPLE_HANDLERS_H #define GRPC_EXAMPLE_HANDLERS_H #include <iostream> #include <grpcpp/grpcpp.h> #include <google/protobuf/arena.h> #include "memory_pool.h" using namespace google::protobuf; using namespace example::v1; using completion_queue_ptr = grpc::ServerCompletionQueue*; using completion_queue_sptr = std::unique_ptr<grpc::ServerCompletionQueue>; using service_ptr = std::shared_ptr<ExampleService::AsyncService>; using std::endl; using std::cout; enum request_state_e { REQUEST_STATE_INVALID = -1, REQUEST_STATE_CREATE = 0, REQUEST_STATE_PROCESS = 1, REQUEST_STATE_COMPLETE = 2, REQUEST_STATE_LAST = 3 }; class handler_base { public: handler_base(completion_queue_ptr completion_queue, service_ptr service) : _completion_queue(completion_queue), _service(service), _arena(nullptr) { _state = REQUEST_STATE_CREATE; } virtual void init_rpc_handler() = 0; virtual const std::string get_request_debug_message() = 0; virtual void handle_rpc_request() { if (_state == REQUEST_STATE_PROCESS) { cout << "Recieved rpc: "<< get_request_debug_message() << std::endl; if (process_request()) { reset_and_prepare_handler_for_next_request(); _state = REQUEST_STATE_COMPLETE; } } else if (_state == REQUEST_STATE_COMPLETE) { complete_request(); } } virtual ~handler_base() {} virtual void complete_request() { handlers_pool::get_pool().allocator().deallocate_node(reinterpret_cast<char *>(this)); } protected: virtual void reset_and_prepare_handler_for_next_request() { handler_base *next_handler = new_rpc_handler(); next_handler->init_rpc_handler(); } virtual bool process_request() = 0; virtual handler_base *new_rpc_handler() = 0; void setup_arena_options(google::protobuf::ArenaOptions &arena_options, void *(*alloc_func)(size_t), void (*dealloc_func)(void *,size_t), size_t size) { arena_options.block_alloc = alloc_func; arena_options.block_dealloc = dealloc_func; arena_options.start_block_size = 0; arena_options.max_block_size = size; arena_options.initial_block = nullptr; 
arena_options.initial_block_size = 0; } completion_queue_ptr _completion_queue; service_ptr _service; grpc::ServerContext _ctx; request_state_e _state; std::unique_ptr<Arena> _arena; private: handler_base(const handler_base &other) = delete; }; class handler_server_ping : public handler_base { public: handler_server_ping(completion_queue_ptr completion_queue, service_ptr service) : handler_base(completion_queue, service), _responder(&_ctx) { memory_allocator::reset(_offset, _buffer, 4096); auto alloc = [](size_t size){ return memory_allocator::allocate(_buffer, _offset, size, 4096);}; auto dealloc = [](void *ptr, size_t size){ cout << "Freeing " << size << " bytes" << endl; memory_allocator::dealloc(_buffer, _offset, size);}; ArenaOptions options; setup_arena_options(options, alloc, dealloc, 4096); _arena = std::make_unique<Arena>(options); } void init_rpc_handler() override { _server_ping_request = Arena::Create<ServerPingRequest>(_arena.get()); _service->RequestServerPing(&_ctx, _server_ping_request, &_responder, _completion_queue, _completion_queue, this); _state = REQUEST_STATE_PROCESS; } const std::string get_request_debug_message() override { return "[" + ServerPingRequest::descriptor()->name() + "] " + _server_ping_request->ShortDebugString(); } protected: bool process_request() override { if (_state == REQUEST_STATE_PROCESS) { auto *response = Arena::Create<ServerPingResponse>(_arena.get()); *response->mutable_pong()->mutable_ping() = _server_ping_request->ping(); _ping_counter++; response->mutable_pong()->set_pings_so_far(_ping_counter); _responder.Finish(*response, grpc::Status::OK, this); } return true; } handler_base *new_rpc_handler() override { auto new_cmd = (handler_base *)handlers_pool ::get_pool().allocator().allocate_node(); return new (new_cmd) handler_server_ping(_completion_queue, _service); } private: static inline char _buffer[4096] = {}; static inline size_t _offset = 0; ServerPingRequest *_server_ping_request; 
grpc::ServerAsyncResponseWriter<ServerPingResponse> _responder; static inline uint64_t _ping_counter{0}; }; class handler_version_get : public handler_base { public: handler_version_get(completion_queue_ptr completion_queue, service_ptr service) : handler_base(completion_queue, service), _responder(&_ctx) { memory_allocator::reset(_offset, _buffer, 4096); auto alloc = [](size_t size){ return memory_allocator::allocate(_buffer, _offset, size, 4096);}; auto dealloc = [](void *ptr, size_t size){ memory_allocator::dealloc(_buffer, _offset, size);}; ArenaOptions options; setup_arena_options(options, alloc, dealloc, 4096); _arena = std::make_unique<Arena>(options); } void init_rpc_handler() override { _version_get_request = Arena::Create<VersionGetRequest>(_arena.get()); _service->RequestVersionGet(&_ctx, _version_get_request, &_responder, _completion_queue, _completion_queue, this); _state = REQUEST_STATE_PROCESS; } const std::string get_request_debug_message() override { return "[" + VersionGetRequest::descriptor()->name() + "] " + _version_get_request->ShortDebugString(); } protected: bool process_request() override { if (_state == REQUEST_STATE_PROCESS) { auto *response = Arena::Create<VersionGetResponse>(_arena.get()); response->set_version("v1.1"); response->set_commit_hash("abc123"); _responder.Finish(*response, grpc::Status::OK, this); } return true; } handler_base *new_rpc_handler() override { auto new_cmd = (handler_base *)handlers_pool ::get_pool().allocator().allocate_node();; return new (new_cmd) handler_version_get(_completion_queue, _service); } private: static inline char _buffer[4096] = {}; static inline size_t _offset = 0; VersionGetRequest *_version_get_request; grpc::ServerAsyncResponseWriter<VersionGetResponse> _responder; }; #endif //GRPC_EXAMPLE_HANDLERS_H #ifndef GRPC_EXAMPLE_MEMORY_POOL_H #define GRPC_EXAMPLE_MEMORY_POOL_H #include <cstdlib> #include <cstdio> #include <string> #include <mutex> #include <iostream> using std::cout; using std::endl; 
class memory_allocator { public: static void* allocate(char *buffer, size_t &offset, size_t alloc_size, size_t max_size) { if (offset + alloc_size > max_size) { return nullptr; // Not enough space } void *ptr = buffer + offset; offset += alloc_size; return ptr; } static void dealloc(void *ptr, size_t &offset, size_t dealloc_size) { // Do nothing } static void reset(size_t &offset, char buffer[], size_t size) { offset = 0; ::memset(buffer, 0x20, size); } }; class mem_pool{ public: template <class N> struct basic_list{ basic_list() : head(nullptr), tail(nullptr), size(0), _m() {} N* head; N* tail; uint32_t size; std::mutex _m; basic_list(basic_list&& o) : _m(){ o.head = head; o.tail = tail; o.size = size; } struct itererator{ N* head; N* tail; basic_list* lst; }; N* erase(N* node){ std::lock_guard lk(_m); if(node->next){ node->next->prev = node->prev; } if(node->prev){ node->prev->next = node->next; } if (head == node){ head = node->next; } if (tail == node){ tail = node->prev; } size--; node->prev = node->next = nullptr; node->lst = nullptr; return node; } void push_back(N* node){ std::lock_guard lk(_m); node->prev = node->next = nullptr; size++; if (tail == nullptr){ tail = node; head = node; node->lst = this; return; } tail->next=node; node->prev = tail; tail = node; node->lst = this; } void clear(){ while(head){ erase(head); } } }; struct mem_node{ typedef basic_list<mem_node> list; mem_node *next, *prev; list* lst; uint64_t index; }; mem_pool() : _mutex(), _count(0), _size(0), _allocated(nullptr), _free_list(), _allocated_list(), _did_init(false), _name("unnamed_pool") {} void init(uint64_t size, uint64_t count, const std::string& name){ std::unique_lock<std::mutex> g(_mutex); if(_did_init) { return; } _size = size; _count = count; _allocated = (char*)malloc(count * size); _nodes = new mem_node[count]; for (int i = 0 ; i < count ; i ++){ _nodes[i].index = i; _nodes[i].next = nullptr; _nodes[i].prev = nullptr; _free_list.push_back(_nodes+i); } if (!name.empty()) 
{ _name = name; } _did_init = true; } char* allocate_node(){ std::unique_lock<std::mutex> g(_mutex); if (_free_list.head == nullptr){ return nullptr; } mem_node* n = _free_list.erase(_free_list.head); _allocated_list.push_back(n); return _allocated + (_size * n->index); } bool deallocate_node(char* ptr){ std::unique_lock<std::mutex> g(_mutex); uint64_t index = (ptr - _allocated) / _size; if (_nodes[index].lst == &_allocated_list){ _allocated_list.erase(_nodes + index); _free_list.push_back(_nodes + index); return true; } else { // not allocated return false; } } void close(){ std::unique_lock<std::mutex> g(_mutex); if (!_did_init){ return; } _free_list.clear(); _allocated_list.clear(); free(_allocated); delete[] _nodes; _allocated = nullptr; _nodes = nullptr; _did_init = false; } protected: bool _did_init; std::string _name; char* _allocated; mem_node* _nodes; mem_node::list _free_list; mem_node::list _allocated_list; uint64_t _size; uint64_t _count; std::mutex _mutex; }; class handlers_pool { public: static handlers_pool &get_pool() { static handlers_pool the_pool; return the_pool; } mem_pool &allocator() { return *_pool; } private: handlers_pool() { _pool = new mem_pool(); _pool->init(2048, 100, "handlers"); } ~handlers_pool() { _pool->close(); delete _pool; _pool = nullptr; } mem_pool *_pool; }; #endif // GRPC_EXAMPLE_MEMORY_POOL_H // // Created by Dan Cohen on 11/11/2024. // #include <string> #include <iostream> #include <grpcpp/grpcpp.h> #include "protos/example/v1/example.grpc.pb.h" using std::cout; using std::endl; using namespace example::v1; void usage() { cout << "Usage: grpc_client [p|v]" << endl; cout << "p - send a ping request to the server" << endl; cout << "v - ask the server version" << endl; } void version() { cout << "Asking for version..." << endl; auto address = std::string("127.0.0.1"); auto port = std::string("6212"); // Starting a grpc client to run the version command. 
std::unique_ptr<ExampleService::Stub> stub; std::shared_ptr<grpc::Channel> channel; std::string server_address = address + ":" + port; channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); stub = ExampleService::NewStub(channel); // Sending the ping request. VersionGetRequest request; VersionGetResponse response; grpc::ClientContext context; auto ret = stub->VersionGet(&context, request, &response); if (ret.ok()) { cout << response.DebugString() << endl; } else { std::cout << "VersionGet failed with the following error: error_code=" << ret.error_code() << std::endl; std::cout << "Error message: '" << ret.error_message() << "'" << std::endl; std::cout << "Error details: '" << ret.error_details() << "'" << std::endl; } } void ping() { cout << "Sending ping..." << endl; auto address = std::string("127.0.0.1"); auto port = std::string("6212"); // Starting a grpc client to run the ping command. std::unique_ptr<ExampleService::Stub> stub; std::shared_ptr<grpc::Channel> channel; std::string server_address = address + ":" + port; channel = grpc::CreateChannel(server_address, grpc::InsecureChannelCredentials()); stub = ExampleService::NewStub(channel); // Sending the ping request. 
ServerPingRequest request; request.mutable_ping()->set_ping_generation(time(nullptr)); cout << "Sending ping request with generation=" << request.ping().ping_generation() << endl; ServerPingResponse response; grpc::ClientContext context; auto ret = stub->ServerPing(&context, request, &response); if (ret.ok()) { cout << "Received ping response with generation=" << response.pong().ping().ping_generation() << endl; cout << "Server pong:" << response.DebugString() << endl; } else { cout << "Ping failed with the following error: error_code=" << ret.error_code() << endl; cout << "Error message: '" << ret.error_message() << "'" << endl; cout << "Error details: '" << ret.error_details() << "'" << endl; } } int main(int argc, char **argv) { if (argc != 2) { usage(); return 1; } std::string command(argv[1]); if (command == "p") { ping(); } else if (command == "v") { version(); } else { usage(); return 1; } return 0; } -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion visit https://groups.google.com/d/msgid/grpc-io/5652a7ff-dac1-4210-9291-24776a6868b1n%40googlegroups.com.