Hello,
First let me explain what we have in our C++ gRPC Async server codebase:
- We have 2 unary-response RPCs.
- And we have 2 streaming-response RPCs, which will cover over 95% of the client's API consumption, so they are critical to our streaming-based implementation.
From the 2 streaming-response RPCs, the one below is the most critical to us:
// Inner class StreamAssetNodes
class StreamAssetNodes : public RequestBase {
public:
    StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner ), ownerClass( owner ) {
        owner_.grpc().service_.RequestStreamAssetNodes(
            &context_, &stream_, cq(), cq(),
            in_handle_.tag( Handle::Operation::CONNECT,
            [this, &owner]( bool ok, Handle::Operation /* op */ ) {
                LOG_DEBUG << "\n" + me( *this )
                          << "\n\n*****************************************************************\n"
                          << "- Processing a new connect from " << context_.peer()
                          << "\n\n*****************************************************************\n"
                          << endl;
                cout << "\n" + me( *this )
                     << "\n*****************************************************************\n"
                     << "- Processing a new connect from " << context_.peer()
                     << "\n*****************************************************************\n"
                     << endl;

                if ( !ok ) [[unlikely]] {
                    LOG_DEBUG << "The CONNECT-operation failed." << endl;
                    cout << "The CONNECT-operation failed." << endl;
                    return;
                }

                // Creates a new instance so the service can handle requests from a new client
                owner_.createNew<StreamAssetNodes>( owner );

                // Reads request's parameters
                readNodeIds();
            } ) );
    }

private:
    // Objects and variables
    AsyncAssetStreamerManager& ownerClass;

    ::Illuscio::AssetNodeIds request_;
    ::Illuscio::AssetNodeComponent reply_;

    ::grpc::ServerContext context_;
    ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> stream_ { &context_ };

    vector<string> nodeids_vector;

    // Contains mapping for all the nodes of a set of assets
    json assetsNodeMapping;
    // Contains mapping for all the nodes of a particular asset
    json assetNodeMapping;

    ifstream nodeFile;

    // Handle for messages coming in
    Handle in_handle_ { *this };
    // Handle for messages going out
    Handle out_handle_ { *this };

    int fileNumber = 0;
    const int chunk_size = 16 * 1024;
    char buffer[16 * 1024];

    // Methods
    void readNodeIds() {
        // Reads RPC request parameters
        stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ,
            [this]( bool ok, Handle::Operation op ) {
                if ( !ok ) [[unlikely]] { return; }

                // Assigns the request to the nodeids vector
                nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() );
                request_.clear_nodeids();

                if ( !nodeids_vector.empty() ) {
                    ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid()];

                    if ( ownerClass.assetNodeMapping.empty() ) {
                        stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID not found in server..." ),
                            in_handle_.tag( Handle::Operation::FINISH,
                            [this]( bool ok, Handle::Operation /* op */ ) {
                                if ( !ok ) [[unlikely]] {
                                    LOG_DEBUG << "The FINISH request-operation failed." << endl;
                                    cout << "The FINISH request-operation failed." << endl;
                                }
                                LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << endl;
                                cout << "Asset's UUID not found in server: " << request_.uuid() << endl;
                            } ) );
                        return;
                    }

                    writeNodeFile( nodeids_vector.front() );
                } else {
                    stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset's node ids empty. Without node ids node streaming can't start..." ),
                        in_handle_.tag( Handle::Operation::FINISH,
                        [this]( bool ok, Handle::Operation /* op */ ) {
                            if ( !ok ) [[unlikely]] {
                                LOG_DEBUG << "The FINISH request-operation failed.";
                                cout << "The FINISH request-operation failed.";
                            }
                            LOG_DEBUG << "Asset's node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
                            cout << "Asset's node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
                        } ) );
                }
            } ) );
    }

    void writeNodeFile( const string& nodeId ) {
        // Opens the file which contains the requested node
        nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary );
        if ( !nodeFile.is_open() ) {
            LOG_DEBUG << "Asset's node file open operation failed for node: " << nodeId << endl;
            cout << "Asset's node file open operation failed for node: " << nodeId << endl;
        }
        splitFileAndWriteChunks();
    }

    void splitFileAndWriteChunks() {
        setReplyWithBuffer();
        stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE,
            [this]( bool ok, Handle::Operation op ) {
                if ( !nodeFile.eof() ) {
                    splitFileAndWriteChunks();
                } else if ( !nodeids_vector.empty() ) {
                    nodeFile.close();
                    nodeids_vector.erase( nodeids_vector.begin() );
                    if ( !nodeids_vector.empty() ) {
                        writeNodeFile( nodeids_vector.front() );
                    } else {
                        finishIfDone();
                    }
                }
            } ) );
    }

    void setReplyWithBuffer() {
        // Fills read buffer
        nodeFile.read( buffer, chunk_size );
        // Prepare reply and start writing
        reply_.Clear();
        reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
    }

    // We wait until all incoming messages are received and all outgoing messages are sent
    // before we send the finish message.
    void finishIfDone() {
        stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::FINISH,
            [this]( bool ok, Handle::Operation /* op */ ) {
                if ( !ok ) [[unlikely]] {
                    LOG_DEBUG << "The FINISH request-operation failed." << endl;
                    cout << "The FINISH request-operation failed." << endl;
                }
            } ) );
    }
};
The idea with the code above is that the request is basically an array of string `ids` (e.g. "1", "2", "3", ...; by the way, it is defined as a stream in the protobuf), and each of those ids points to a small file stored on the server. Once the request is read in the RPC, it takes the first id, opens the file it points to, and starts writing that file in chunks to the client as a streaming response. When it finishes, it takes the second id from the array and does the same thing, over and over, until there are no ids left in the request's array.
From the client's perspective, the behavior is that a single client calls the RPC above passing this ids array with a size of probably 500 elements (different ids), and the call rate to this RPC should be around 1000 calls per second.
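To make that concrete, a single call from the client side looks roughly like the sketch below (a simplified synchronous version just for illustration: "Illuscio::AssetStreamer" is a placeholder for our generated service/stub name, and in production the ids list has around 500 entries):

// Simplified client-side sketch. "Illuscio::AssetStreamer" is a placeholder for
// the generated service/stub; message and field names match the server code above.
#include <grpcpp/grpcpp.h>
#include <memory>
#include <string>
#include <vector>

void callStreamAssetNodes( const std::shared_ptr<grpc::Channel>& channel,
                           const std::string& uuid,
                           const std::vector<std::string>& ids ) {
    auto stub = Illuscio::AssetStreamer::NewStub( channel );  // placeholder stub name
    grpc::ClientContext context;

    // Bidirectional stream: write one AssetNodeIds message, then read the chunks back.
    auto stream = stub->StreamAssetNodes( &context );

    Illuscio::AssetNodeIds request;
    request.set_uuid( uuid );
    for ( const auto& id : ids ) request.add_nodeids( id );

    stream->Write( request );
    stream->WritesDone();

    Illuscio::AssetNodeComponent chunk;
    while ( stream->Read( &chunk ) ) {
        // consume chunk.chunk_data() ...
    }

    grpc::Status status = stream->Finish();
}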
We are using a shared completion queue for all RPCs and we do not have a
multithreading approach.
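Concretely, that setup amounts to a single thread draining the shared queue, roughly like the sketch below (heavily simplified; dispatch( tag, ok ) stands in for what our Handle class actually does with each tag):

// Heavily simplified sketch of our current single-threaded event loop:
// one shared grpc::ServerCompletionQueue, drained by one thread.
#include <grpcpp/grpcpp.h>

void dispatch( void* tag, bool ok );  // placeholder for our Handle-based routing

void drainSharedQueue( grpc::ServerCompletionQueue& cq ) {
    void* tag = nullptr;
    bool  ok  = false;
    while ( cq.Next( &tag, &ok ) ) {  // blocks until the next completed operation
        dispatch( tag, ok );          // hands the event back to the lambda registered on the tag
    }
}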
With all of the above as background, we would like to know whether we can implement a more efficient approach for this RPC, probably based on multi-threading; that is our ultimate goal.
Questions:
1.- Can the gRPC team tell us how to approach this problem in detail, in particular in a way that lets us use a multi-threading strategy?
2.- How can we use completion queues more efficiently along with these new threads? (The sketch below shows the kind of setup we have in mind.)
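For example, would something along the lines of the sketch below be the right direction? It is only what we imagine (one completion queue per worker thread); the names and wiring are placeholders, not tested code:

// Sketch of the kind of multi-threaded setup we are considering:
// N completion queues, each polled by its own thread. Names are placeholders.
#include <grpcpp/grpcpp.h>
#include <memory>
#include <thread>
#include <vector>

// Minimal stand-in for our Handle/RequestBase tag dispatch (placeholder).
struct TagBase {
    virtual void proceed( bool ok ) = 0;
    virtual ~TagBase() = default;
};

void runServer( unsigned num_threads ) {
    grpc::ServerBuilder builder;
    builder.AddListeningPort( "0.0.0.0:50051", grpc::InsecureServerCredentials() );
    // builder.RegisterService( &service_ );  // our async service would be registered here

    std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> cqs;
    for ( unsigned i = 0; i < num_threads; ++i ) {
        cqs.push_back( builder.AddCompletionQueue() );
    }

    auto server = builder.BuildAndStart();

    std::vector<std::thread> workers;
    for ( auto& cq : cqs ) {
        grpc::ServerCompletionQueue* q = cq.get();
        workers.emplace_back( [q] {
            void* tag = nullptr;
            bool  ok  = false;
            while ( q->Next( &tag, &ok ) ) {
                static_cast<TagBase*>( tag )->proceed( ok );  // placeholder dispatch
            }
        } );
    }

    for ( auto& t : workers ) {
        t.join();
    }
}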
The reason we are asking these questions is that we feel we are not leveraging the real power of gRPC for this specific use case.
Please let us know if you need more details from us.
Thanks in advance.
Pedro Alfonso