Hello,

I've been experiencing a constant and continuous memory leak in my grpc 
async-server implementation. I've moved my code to allocate all of my 
messages using an Arena that is using a static buffer for allocation, and 
yet I see the leak, so it seems to me that the source of the leak is the 
internal implementation of the grpc server classes/messages.

I assume that I'm missing something simple (do I need to call some 
cleanup callback or something of that sort?), but so far I couldn't figure 
out what it is (I have looked at the documentation, the grpc code, and the 
valgrind output).

I'm attaching a simplified version of my code. I'd appreciate it if anyone 
could have a look and maybe spot my mistake.

I'm basically running the server and then running a loop which calls the 
client every x seconds, and I can see that the server's RAM usage keeps 
growing.

The full simplified example can be seen 
in https://github.com/cohdan/grpc_leaky_example, some logs (including 
valgrind output) can be found there as well.

Thanks.

#include <iostream>
#include <thread>
#include <csignal>


syntax = "proto3";

package example.v1;

// Payload of a ping sent to the server.
message ServerPing {
// Caller-chosen generation value; the server copies it back in its pong.
uint64 ping_generation = 1;
}

// Server answer to a ServerPing.
message ServerPong {
// Copy of the ping that is being answered.
ServerPing ping = 1;
// Running count of pings answered by the server so far.
uint64 pings_so_far = 2;
}

// Request message of the ExampleService.ServerPing rpc.
message ServerPingRequest {
ServerPing ping = 1;
}

// Response message of the ExampleService.ServerPing rpc.
message ServerPingResponse {
// NOTE(review): field number 2 is used although no field 1 exists in this
// message; it is part of the wire format, so do not renumber it.
ServerPong pong = 2;
}

// Request message of the ExampleService.VersionGet rpc; it carries no fields.
message VersionGetRequest {

}

// Response message of the ExampleService.VersionGet rpc.
message VersionGetResponse {
// Version string reported by the server.
string version = 1;
// Commit hash reported by the server.
string commit_hash = 2;
}


// Service exposing two unary rpcs: a version query and a ping.
service ExampleService {
// Returns the server version and commit hash.
rpc VersionGet (VersionGetRequest) returns (VersionGetResponse) {}
// Answers a ping with a pong that carries the received ping.
rpc ServerPing (ServerPingRequest) returns (ServerPingResponse) {}
}
#include "server.h"

using std::cout;
using std::endl;

// SIGTERM hook: logs a line and asks the server returned by
// server::get_server() to close. `signum` is not used.
// NOTE(review): this is called from signal_handler(), i.e. in signal-handler
// context. Stream output and close_server() (which joins a thread) are not on
// the POSIX async-signal-safe list - confirm this is acceptable.
void term_signal_handler(int signum) {
cout << "Starting sigterm handler" << endl;
server::get_server().close_server();
}

// Common handler installed by register_signals().
// Logs the signal number, runs the SIGTERM hook when applicable, then restores
// the default disposition for the signal and raises it again so that the
// default action takes place. `siginfo` and `context` are not used.
void signal_handler(int signum, siginfo_t *siginfo, void *context) {
    cout << "Interrupt signal " << signum << " received" << endl;

    const bool is_termination_request = (signum == SIGTERM);
    if (is_termination_request) {
        term_signal_handler(signum);
    }

    // Put the default behaviour back and deliver the signal once more.
    struct sigaction default_action;
    sigemptyset(&default_action.sa_mask);
    default_action.sa_flags = 0;
    default_action.sa_handler = SIG_DFL;
    sigaction(signum, &default_action, nullptr);
    raise(signum);
}

// Installs the process-wide signal dispositions:
//   * SIGPIPE is ignored;
//   * every signal in `signals_to_handle` is routed to signal_handler().
// SIGSTOP is intentionally absent from the list: like SIGKILL it cannot be
// caught, so sigaction() always rejects it with EINVAL.
void register_signals() {
    // Value-initialise the structs so that no field is left indeterminate.
    struct sigaction ignore_sa{};
    ignore_sa.sa_handler = SIG_IGN;
    sigemptyset(&ignore_sa.sa_mask);
    ignore_sa.sa_flags = 0;
    sigaction(SIGPIPE, &ignore_sa, nullptr);

    struct sigaction sa{};
    sa.sa_sigaction = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_SIGINFO;

    const int signals_to_handle[] = { SIGINT, SIGTERM, SIGQUIT, SIGABRT,
                                      SIGSEGV, SIGFPE, SIGILL, SIGBUS,
                                      SIGSYS, SIGUSR1 };
    for (const int signum : signals_to_handle) {
        if (sigaction(signum, &sa, nullptr) != 0) {
            // Registration failures used to be dropped silently.
            cout << "Failed to register a handler for signal " << signum << endl;
        }
    }
}

// Server entry point: installs the signal handlers, starts the gRPC server
// and then parks the main thread.
// The process ends from inside signal_handler(), which re-raises the received
// signal with its default disposition, so the loop below never exits normally.
int main() {
    register_signals();
    if (!server::get_server().init_server()) {
        cout << "Failed to initialize the grpc server" << endl;
        return 1;
    }

    // Sleep until a signal is delivered instead of spinning on yield(), which
    // kept one core fully busy for the whole lifetime of the process.
    sigset_t no_signals_blocked;
    sigemptyset(&no_signals_blocked);
    while (true) {
        sigsuspend(&no_signals_blocked);
    }
}

//
// Created by Dan Cohen on 11/11/2024.
//

#include "server.h"
#include "handlers.h"

#include <chrono>
#include <sstream>
#include <memory>
#include <cstdlib>
#include <iostream>

constexpr int MAX_RETRY_COUNT = 10;

using namespace grpc;
using namespace std::chrono_literals;
using namespace google::protobuf;

using std::endl;
using std::cout;

void server::init_rpc_handlers() {
// Create the command handlers
auto _cmd_ping = (handler_base *)handlers_pool
::get_pool().allocator().allocate_node();
_cmd_ping = new (_cmd_ping) handler_server_ping(_completion_queue.get(), 
_service);

auto _cmd_get_version = (handler_base *)handlers_pool
::get_pool().allocator().allocate_node();
_cmd_get_version = new (_cmd_get_version) 
handler_version_get(_completion_queue.get(), _service);

_cmd_ping->init_rpc_handler();
_cmd_get_version->init_rpc_handler();
}

void server::handle_requests_queue() {
void *rpc_tag = nullptr; // uniquely identifies a request.
auto rpc_status = false;
uint16_t retry_count = 0;

while (true) {
auto ret = _completion_queue->Next(&rpc_tag, &rpc_status);
if (!ret) {
cout << "completion_queue next method indicates that the gRPC server is 
shutting down, ret=" << ret << ", rpc_status=" << rpc_status << ", 
did_we_initiate=" << is_server_shutting_down() << endl;
break;
}
if (!rpc_status) {
cout << "completion_queue next method indicates that an RPC request failed, 
moving to next request, retry_count=" << retry_count << endl;

if (rpc_tag) {
auto rpc_handler = static_cast<handler_base *>(rpc_tag);
cout << "Failed rpc request details-> " << 
rpc_handler->get_request_debug_message() 
<< endl;
rpc_handler->complete_request();
}

if (is_server_shutting_down()) {
cout << "completion_queue next method indicates that the gRPC server is 
shutting down, ret=" << ret << ", rpc_status=" << rpc_status << endl;
break;
}

if (retry_count < MAX_RETRY_COUNT) {
++retry_count;
std::this_thread::sleep_for(5ms);
}
else {
cout << "Retry count exceeded the configured max, can't recover - killing 
the agent" << endl;
::abort();
}
continue;
}
retry_count = 0;
if (!rpc_tag) {
cout << "invalid RPC request moving to next request" << endl;
continue;
}

static_cast<handler_base *>(rpc_tag)->handle_rpc_request();
}
}

// Builds and starts the gRPC server on 0.0.0.0:6212, starts the thread that
// serves the completion queue and registers the initial rpc handlers.
// Returns true when the server is running (also when it already was), and
// false when the server could not be built or started.
bool server::init_server() {
    if (_did_init) {
        return true;
    }

    _shutting_down.store(false);
    _service = std::make_shared<ExampleService::AsyncService>();

    auto server_address = "0.0.0.0";
    auto server_port = 6212;
    std::stringstream stream;
    stream << server_address << ":" << server_port;
    std::string server_address_str(stream.str());

    auto max_message_size = 10 * 1024 * 1024;
    ServerBuilder builder;

    // Set max message size.
    builder.SetMaxMessageSize(max_message_size);
    builder.SetMaxSendMessageSize(max_message_size);
    builder.SetMaxReceiveMessageSize(max_message_size);

    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address_str,
                             grpc::InsecureServerCredentials());
    builder.RegisterService(_service.get());

    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    _completion_queue = builder.AddCompletionQueue();

    // Finally assemble the server.
    _server = builder.BuildAndStart();
    if (!_server) {
        // BuildAndStart() yields a null server on failure (for example when
        // the port cannot be bound). Release what was created and report the
        // failure instead of continuing with a null server.
        cout << "Failed to build and start the grpc server on " << server_address_str << endl;
        _completion_queue->Shutdown();
        void *ignored_tag = nullptr;
        bool ignored_ok = false;
        while (_completion_queue->Next(&ignored_tag, &ignored_ok)) { }
        _completion_queue.reset();
        _service.reset();
        return false;
    }

    _server_thread = std::make_unique<thread>(
        std::function<void(server *)>(&server::handle_requests_queue), this);

    init_rpc_handlers();

    _did_init = true;
    return true;
}

void server::close_server() {
if (!_did_init) {
return;
}

cout << "Closing the grpc server..." << endl;

_shutting_down.store(true);
_server->Shutdown();
_completion_queue->Shutdown();

if (_server_thread && _server_thread->joinable()) {
_server_thread->join();
}

// Make sure the queue is empty before closing it.
void* ignored_tag;
bool ignored_ok;
while (_completion_queue->Next(&ignored_tag, &ignored_ok)) { }

_server_thread.reset();
_server_thread = nullptr;

_completion_queue.reset();
_completion_queue = nullptr;

_service.reset();
_service = nullptr;

_did_init = false;
}

// Destructor: delegates to close_server(), which returns immediately when the
// server was never initialised or has already been closed.
server::~server() {
close_server();
}


//
// Created by Dan Cohen on 11/11/2024.
//

#ifndef GRPC_EXAMPLE_HANDLERS_H
#define GRPC_EXAMPLE_HANDLERS_H

#include <iostream>
#include <grpcpp/grpcpp.h>
#include <google/protobuf/arena.h>

#include "memory_pool.h"

using namespace google::protobuf;
using namespace example::v1;

using completion_queue_ptr = grpc::ServerCompletionQueue*;
using completion_queue_sptr = std::unique_ptr<grpc::ServerCompletionQueue>;
using service_ptr = std::shared_ptr<ExampleService::AsyncService>;

using std::endl;
using std::cout;

// Life-cycle state of an rpc handler (see handler_base::handle_rpc_request).
enum request_state_e {
// Not assigned anywhere in this header.
REQUEST_STATE_INVALID = -1,
// Initial state set by the handler_base constructor.
REQUEST_STATE_CREATE = 0,
// Set by init_rpc_handler(): the handler is registered and waits for a request.
REQUEST_STATE_PROCESS = 1,
// Set after the request was processed: the next event releases the handler.
REQUEST_STATE_COMPLETE = 2,
// Not assigned anywhere in this header.
REQUEST_STATE_LAST = 3
};

class handler_base {
public:
handler_base(completion_queue_ptr completion_queue, service_ptr service) : 
_completion_queue(completion_queue), _service(service), _arena(nullptr) {
_state = REQUEST_STATE_CREATE;
}
virtual void init_rpc_handler() = 0;
virtual const std::string get_request_debug_message() = 0;

virtual void handle_rpc_request() {
if (_state == REQUEST_STATE_PROCESS) {
cout << "Recieved rpc: "<< get_request_debug_message() << std::endl;

if (process_request()) {
reset_and_prepare_handler_for_next_request();
_state = REQUEST_STATE_COMPLETE;
}
}
else if (_state == REQUEST_STATE_COMPLETE) {
complete_request();
}
}

virtual ~handler_base() {}
virtual void complete_request() {
handlers_pool::get_pool().allocator().deallocate_node(reinterpret_cast<char
*>(this));
}
protected:
virtual void reset_and_prepare_handler_for_next_request() {
handler_base *next_handler = new_rpc_handler();
next_handler->init_rpc_handler();
}
virtual bool process_request() = 0;
virtual handler_base *new_rpc_handler() = 0;

void setup_arena_options(google::protobuf::ArenaOptions &arena_options, void 
*(*alloc_func)(size_t), void (*dealloc_func)(void *,size_t), size_t size) {
arena_options.block_alloc = alloc_func;
arena_options.block_dealloc = dealloc_func;
arena_options.start_block_size = 0;
arena_options.max_block_size = size;
arena_options.initial_block = nullptr;
arena_options.initial_block_size = 0;
}

completion_queue_ptr _completion_queue;
service_ptr _service;
grpc::ServerContext _ctx;
request_state_e _state;
std::unique_ptr<Arena> _arena;

private:
handler_base(const handler_base &other) = delete;
};

class handler_server_ping : public handler_base {
public:
handler_server_ping(completion_queue_ptr completion_queue, service_ptr 
service) : handler_base(completion_queue, service), _responder(&_ctx) {
memory_allocator::reset(_offset, _buffer, 4096);
auto alloc = [](size_t size){ return memory_allocator::allocate(_buffer, 
_offset, size, 4096);};
auto dealloc = [](void *ptr, size_t size){ cout << "Freeing " << size << " 
bytes" << endl; memory_allocator::dealloc(_buffer, _offset, size);};
ArenaOptions options;
setup_arena_options(options, alloc, dealloc, 4096);
_arena = std::make_unique<Arena>(options);
}

void init_rpc_handler() override {
_server_ping_request = Arena::Create<ServerPingRequest>(_arena.get());
_service->RequestServerPing(&_ctx, _server_ping_request, &_responder, 
_completion_queue, _completion_queue, this);
_state = REQUEST_STATE_PROCESS;
}

const std::string get_request_debug_message() override {
return "[" + ServerPingRequest::descriptor()->name() + "] " + 
_server_ping_request->ShortDebugString();
}
protected:
bool process_request() override {
if (_state == REQUEST_STATE_PROCESS) {
auto *response = Arena::Create<ServerPingResponse>(_arena.get());
*response->mutable_pong()->mutable_ping() = _server_ping_request->ping();
_ping_counter++;
response->mutable_pong()->set_pings_so_far(_ping_counter);
_responder.Finish(*response, grpc::Status::OK, this);
}
return true;
}

handler_base *new_rpc_handler() override {
auto new_cmd = (handler_base *)handlers_pool
::get_pool().allocator().allocate_node();
return new (new_cmd) handler_server_ping(_completion_queue, _service);
}

private:
static inline char _buffer[4096] = {};
static inline size_t _offset = 0;

ServerPingRequest *_server_ping_request;
grpc::ServerAsyncResponseWriter<ServerPingResponse> _responder;
static inline uint64_t _ping_counter{0};
};

class handler_version_get : public handler_base {
public:
handler_version_get(completion_queue_ptr completion_queue, service_ptr 
service) : handler_base(completion_queue, service),
_responder(&_ctx) {
memory_allocator::reset(_offset, _buffer, 4096);
auto alloc = [](size_t size){ return memory_allocator::allocate(_buffer, 
_offset, size, 4096);};
auto dealloc = [](void *ptr, size_t size){ memory_allocator::dealloc(_buffer, 
_offset, size);};
ArenaOptions options;
setup_arena_options(options, alloc, dealloc, 4096);
_arena = std::make_unique<Arena>(options);
}

void init_rpc_handler() override {
_version_get_request = Arena::Create<VersionGetRequest>(_arena.get());
_service->RequestVersionGet(&_ctx, _version_get_request, &_responder, 
_completion_queue, _completion_queue, this);
_state = REQUEST_STATE_PROCESS;
}

const std::string get_request_debug_message() override {
return "[" + VersionGetRequest::descriptor()->name() + "] " + 
_version_get_request->ShortDebugString();
}
protected:
bool process_request() override {
if (_state == REQUEST_STATE_PROCESS) {
auto *response = Arena::Create<VersionGetResponse>(_arena.get());
response->set_version("v1.1");
response->set_commit_hash("abc123");
_responder.Finish(*response, grpc::Status::OK, this);
}
return true;
}

handler_base *new_rpc_handler() override {
auto new_cmd = (handler_base *)handlers_pool
::get_pool().allocator().allocate_node();;
return new (new_cmd) handler_version_get(_completion_queue, _service);
}
private:
static inline char _buffer[4096] = {};
static inline size_t _offset = 0;

VersionGetRequest *_version_get_request;
grpc::ServerAsyncResponseWriter<VersionGetResponse> _responder;
};

#endif //GRPC_EXAMPLE_HANDLERS_H


#ifndef GRPC_EXAMPLE_MEMORY_POOL_H
#define GRPC_EXAMPLE_MEMORY_POOL_H

#include <cstdlib>
#include <cstdio>
#include <string>
#include <mutex>
#include <iostream>

using std::cout;
using std::endl;

// Minimal bump allocator over a caller-supplied buffer.
// The caller owns both the buffer and the running `offset`; this class only
// provides the arithmetic. Memory is never handed back individually: call
// reset() to reuse the whole buffer.
// NOTE(review): returned addresses are not aligned in any way; callers that
// need aligned storage must take care of it themselves.
class memory_allocator {
public:
    // Reserves `alloc_size` bytes at `buffer + offset` and advances `offset`.
    // Returns nullptr, leaving `offset` untouched, when `buffer` is null or
    // the request does not fit into the `max_size` bytes of the buffer.
    static void* allocate(char *buffer, size_t &offset, size_t alloc_size, size_t max_size) {
        // Compare against the remaining space; `offset + alloc_size` could
        // wrap around for a huge request and pass a naive bound check.
        if (buffer == nullptr || offset > max_size || alloc_size > max_size - offset) {
            return nullptr; // Not enough space
        }
        void *ptr = buffer + offset;
        offset += alloc_size;
        return ptr;
    }

    // Intentionally a no-op: a bump allocator cannot free single blocks.
    static void dealloc(void *ptr, size_t &offset, size_t dealloc_size) {
        (void)ptr;
        (void)offset;
        (void)dealloc_size;
    }

    // Rewinds `offset` to zero and fills the first `size` bytes of `buffer`
    // with 0x20 (space characters).
    static void reset(size_t &offset, char buffer[], size_t size) {
        offset = 0;
        if (buffer != nullptr && size > 0) {
            ::memset(buffer, 0x20, size);
        }
    }
};

// Fixed-size node pool.
// init() reserves `count` nodes of `size` bytes in one contiguous allocation.
// allocate_node() hands out the address of a free node and deallocate_node()
// takes it back. Every node is tracked by a mem_node that sits either in the
// free list or in the allocated list. All public operations take the pool
// mutex.
class mem_pool{
public:
    // Intrusive doubly linked list; N must provide `next`, `prev` and `lst`.
    template <class N>
    struct basic_list{
        basic_list() : head(nullptr), tail(nullptr), size(0), _m() {}
        N* head;
        N* tail;
        uint32_t size;
        std::mutex _m;

        // Takes over the nodes of `o` and leaves `o` empty. The previous
        // version copied in the opposite direction, reading the uninitialised
        // members of the new list and overwriting the source.
        basic_list(basic_list&& o) : head(nullptr), tail(nullptr), size(0), _m() {
            std::lock_guard<std::mutex> lk(o._m);
            head = o.head;
            tail = o.tail;
            size = o.size;
            for (N* n = head; n != nullptr; n = n->next) {
                n->lst = this;  // nodes remember the list that owns them
            }
            o.head = nullptr;
            o.tail = nullptr;
            o.size = 0;
        }

        struct itererator{
            N* head;
            N* tail;
            basic_list* lst;
        };

        // Unlinks `node` from this list and returns it.
        N* erase(N* node){
            std::lock_guard<std::mutex> lk(_m);
            if (node->next){
                node->next->prev = node->prev;
            }
            if (node->prev){
                node->prev->next = node->next;
            }
            if (head == node){
                head = node->next;
            }
            if (tail == node){
                tail = node->prev;
            }
            size--;
            node->prev = node->next = nullptr;
            node->lst = nullptr;
            return node;
        }

        // Appends `node` at the tail and records this list as its owner.
        void push_back(N* node){
            std::lock_guard<std::mutex> lk(_m);
            node->prev = node->next = nullptr;
            node->lst = this;
            size++;
            if (tail == nullptr){
                tail = node;
                head = node;
                return;
            }
            tail->next = node;
            node->prev = tail;
            tail = node;
        }

        // Unlinks every node.
        void clear(){
            while (head){
                erase(head);
            }
        }
    };


    // Book-keeping record of one pool node.
    struct mem_node{
        typedef basic_list<mem_node> list;
        mem_node *next, *prev;
        list* lst;       // list that currently holds the node
        uint64_t index;  // position of the node inside the pool storage
    };

    // Members are initialised in declaration order.
    mem_pool() : _did_init(false), _name("unnamed_pool"), _allocated(nullptr),
                 _nodes(nullptr), _free_list(), _allocated_list(), _size(0),
                 _count(0), _mutex() {}

    // Releases the storage if close() was not called explicitly.
    ~mem_pool() { close(); }

    mem_pool(const mem_pool&) = delete;
    mem_pool& operator=(const mem_pool&) = delete;

    // Reserves `count` nodes of `size` bytes each. Calling it again on an
    // initialised pool is a no-op. The pool stays uninitialised (every
    // allocate_node() returns nullptr) when `size` or `count` is zero, when
    // their product overflows, or when the memory cannot be obtained.
    void init(uint64_t size, uint64_t count, const std::string& name){
        std::unique_lock<std::mutex> g(_mutex);
        if (_did_init) {
            return;
        }
        if (size == 0 || count == 0 || size > static_cast<uint64_t>(-1) / count) {
            return;
        }

        // Allocate the records first: if this throws nothing has leaked yet.
        mem_node* nodes = new mem_node[count];
        char* storage = static_cast<char*>(malloc(count * size));
        if (storage == nullptr) {
            delete[] nodes;
            return;
        }

        _size = size;
        _count = count;
        _allocated = storage;
        _nodes = nodes;

        for (uint64_t i = 0; i < count; ++i){
            _nodes[i].index = i;
            _nodes[i].next = nullptr;
            _nodes[i].prev = nullptr;
            _nodes[i].lst = nullptr;
            _free_list.push_back(_nodes + i);
        }

        if (!name.empty()) {
            _name = name;
        }
        _did_init = true;
    }

    // Returns the address of a free node, or nullptr when none is left (or
    // the pool is not initialised).
    char* allocate_node(){
        std::unique_lock<std::mutex> g(_mutex);
        if (_free_list.head == nullptr){
            return nullptr;
        }
        mem_node* n = _free_list.erase(_free_list.head);
        _allocated_list.push_back(n);

        return _allocated + (_size * n->index);
    }

    // Gives the node containing `ptr` back to the pool.
    // Returns false, changing nothing, when the pool is not initialised, when
    // `ptr` is null or outside the pool storage, or when the node is not
    // currently allocated.
    bool deallocate_node(char* ptr){
        std::unique_lock<std::mutex> g(_mutex);
        if (!_did_init || ptr == nullptr) {
            return false;
        }
        // Compare as integers: `ptr` may not point into the pool at all.
        const uintptr_t base = reinterpret_cast<uintptr_t>(_allocated);
        const uintptr_t addr = reinterpret_cast<uintptr_t>(ptr);
        if (addr < base) {
            return false;
        }
        const uint64_t index = (addr - base) / _size;
        if (index >= _count) {
            return false;
        }
        if (_nodes[index].lst != &_allocated_list){
            // not allocated
            return false;
        }
        _allocated_list.erase(_nodes + index);
        _free_list.push_back(_nodes + index);
        return true;
    }

    // Frees the storage; the pool can be initialised again afterwards.
    void close(){
        std::unique_lock<std::mutex> g(_mutex);
        if (!_did_init){
            return;
        }

        _free_list.clear();
        _allocated_list.clear();

        free(_allocated);
        delete[] _nodes;
        _allocated = nullptr;
        _nodes = nullptr;
        _did_init = false;
    }

protected:
    bool _did_init;
    std::string _name;
    char* _allocated;            // storage of all nodes, _size * _count bytes
    mem_node* _nodes;            // one record per node
    mem_node::list _free_list;
    mem_node::list _allocated_list;
    uint64_t _size;              // bytes per node
    uint64_t _count;             // number of nodes
    std::mutex _mutex;
};

// Process-wide pool that provides the storage of the rpc handlers:
// 100 nodes of 2048 bytes each, created on first use of get_pool().
class handlers_pool {
public:
    // Returns the single instance, constructing it on the first call.
    static handlers_pool &get_pool() {
        static handlers_pool the_pool;
        return the_pool;
    }

    // Gives access to the underlying node pool.
    mem_pool &allocator() {
        return _pool;
    }

    handlers_pool(const handlers_pool &) = delete;
    handlers_pool &operator=(const handlers_pool &) = delete;

private:
    handlers_pool() {
        _pool.init(2048, 100, "handlers");
    }
    ~handlers_pool() {
        _pool.close();
    }

    // Held by value: no manual new/delete, and nothing leaks if init() throws
    // while the singleton is being constructed.
    mem_pool _pool;
};

#endif // GRPC_EXAMPLE_MEMORY_POOL_H

//
// Created by Dan Cohen on 11/11/2024.
//
#include <string>
#include <iostream>
#include <grpcpp/grpcpp.h>
#include "protos/example/v1/example.grpc.pb.h"

using std::cout;
using std::endl;

using namespace example::v1;

// Prints the command-line help of the client to standard output.
void usage() {
    cout << "Usage: grpc_client [p|v]" << endl
         << "p - send a ping request to the server" << endl
         << "v - ask the server version" << endl;
}


void version() {
cout << "Asking for version..." << endl;
auto address = std::string("127.0.0.1");
auto port = std::string("6212");

// Starting a grpc client to run the version command.
std::unique_ptr<ExampleService::Stub> stub;
std::shared_ptr<grpc::Channel> channel;
std::string server_address = address + ":" + port;
channel = grpc::CreateChannel(server_address, 
grpc::InsecureChannelCredentials());
stub = ExampleService::NewStub(channel);

// Sending the ping request.
VersionGetRequest request;
VersionGetResponse response;
grpc::ClientContext context;
auto ret = stub->VersionGet(&context, request, &response);
if (ret.ok()) {
cout << response.DebugString() << endl;
}
else {
std::cout << "VersionGet failed with the following error: error_code=" << 
ret.error_code() << std::endl;
std::cout << "Error message: '" << ret.error_message() << "'" << std::endl;
std::cout << "Error details: '" << ret.error_details() << "'" << std::endl;
}
}

void ping() {
cout << "Sending ping..." << endl;
auto address = std::string("127.0.0.1");
auto port = std::string("6212");
// Starting a grpc client to run the ping command.
std::unique_ptr<ExampleService::Stub> stub;
std::shared_ptr<grpc::Channel> channel;
std::string server_address = address + ":" + port;
channel = grpc::CreateChannel(server_address, 
grpc::InsecureChannelCredentials());
stub = ExampleService::NewStub(channel);

// Sending the ping request.
ServerPingRequest request;
request.mutable_ping()->set_ping_generation(time(nullptr));
cout << "Sending ping request with generation=" << 
request.ping().ping_generation() << endl;
ServerPingResponse response;
grpc::ClientContext context;
auto ret = stub->ServerPing(&context, request, &response);
if (ret.ok()) {
cout << "Received ping response with generation=" << 
response.pong().ping().ping_generation() << endl;
cout << "Server pong:" << response.DebugString() << endl;
}
else {
cout << "Ping failed with the following error: error_code=" << 
ret.error_code() << endl;
cout << "Error message: '" << ret.error_message() << "'" << endl;
cout << "Error details: '" << ret.error_details() << "'" << endl;
}
}

// Client entry point. Expects exactly one argument: "p" sends a ping and "v"
// asks for the server version. Anything else prints the usage text.
// Returns 0 after running a command and 1 on a usage error.
int main(int argc, char **argv) {
    const bool has_single_argument = (argc == 2);
    if (!has_single_argument) {
        usage();
        return 1;
    }

    const std::string command(argv[1]);
    if (command == "p") {
        ping();
        return 0;
    }
    if (command == "v") {
        version();
        return 0;
    }

    usage();
    return 1;
}

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion visit 
https://groups.google.com/d/msgid/grpc-io/5652a7ff-dac1-4210-9291-24776a6868b1n%40googlegroups.com.

Reply via email to