I've been trying (and struggling) to create a type of discussion between
the Server and Client, where the Client sends the initial request, and in
some cases, the Server will require additional information from the Client,
before it can complete the overall request.
Something like...
Client: Hello
Server: Name?
Client: John
Server: Age?
Client: 36
Server: Here's the collated data.
Currently, I'm able to get the Server to respond initially (Name?), but on
the subsequent response, it aborts abruptly (Age?). The idea here is that
occasionally the server may need more information from the client, which we
cannot send across in the original message. Furthermore, while the data on the
client side is hardcoded here, that's not the case in the actual implementation.
I'm just trying to simplify it here.
Any help is definitely appreciated, and let me know if anything is not
clear.
Code below...
*PROTO*
// Service exposing a single conversation-style RPC.
service HelloConversationSupplierService {
// Bidirectional streaming: the client sends a stream of HelloRequest messages
// and the server answers with a stream of HelloResponse messages on the same call.
rpc SayHello(stream HelloRequest) returns (stream HelloResponse) {}
}
// Message sent from the client to the server on the SayHello stream.
message HelloRequest {
// Which step of the conversation this message belongs to.
MethodName Name = 1;
// The user's name; only meaningful when answering a GetName question.
string UserName = 2;
// The user's age; only meaningful when answering a GetAge question.
// Protocol Buffers has no plain `int` scalar type, so this is declared int32.
int32 Age = 3;
}
// Message sent from the server to the client on the SayHello stream.
message HelloResponse {
// Which step of the conversation this message belongs to (a question for the
// client, or Complete when the final data is attached).
MethodName Name = 1;
// The collated person data, carried in a PersonData message.
PersonData Person = 2;
}
// Collated data about one person, returned inside HelloResponse.
// Named PersonData because that is the type HelloResponse.Person refers to.
message PersonData {
// The person's name.
string UserName = 1;
// The person's age. Protocol Buffers has no plain `int` scalar type, so this
// is declared int32.
int32 Age = 2;
}
// Identifies the step of the conversation a message belongs to.
enum MethodName {
// Initial request sent by the client to open the conversation.
Start = 0;
// The server asks for the user's name; the client echoes this value in its answer.
GetName = 1;
// The server asks for the user's age; the client echoes this value in its answer.
GetAge = 2;
// The server has everything it needs and the response carries the final data.
Complete = 3;
}
*CLIENT*
// Runs one SayHello conversation with the server and returns the collated
// person data.
//
// The client opens the bidirectional stream with a Start request and then
// answers every question the server sends (GetName / GetAge) until the server
// replies with Complete. The answers are hardcoded here for simplicity.
//
// Returns the PersonRef built from the final response, or a
// default-constructed PersonRef when the server closes the stream before
// sending Complete.
PersonRef SayHello()
{
    // Each RPC needs its own context, so it is created per conversation.
    grpc::ClientContext context;

    HelloRequest request;
    request.set_name(MethodName::Start); // only field we set because we don't have any other data

    // _sayHelloStub is created and initialized already at this point.
    std::shared_ptr<grpc::ClientReaderWriter<HelloRequest, HelloResponse>> stream(
        _sayHelloStub->SayHello(&context));
    stream->Write(request);

    HelloResponse response;
    while (stream->Read(&response))
    {
        switch (response.name())
        {
            case MethodName::GetName:
            {
                HelloRequest getName;
                getName.set_name(MethodName::GetName);
                getName.set_username("Bill");
                stream->Write(getName);
                break;
            }
            case MethodName::GetAge:
            {
                HelloRequest getAge;
                getAge.set_name(MethodName::GetAge);
                getAge.set_age(36); // Age is an integer field, not a string.
                stream->Write(getAge);
                break;
            }
            // Anything that is not a question is treated as the final answer.
            case MethodName::Complete:
            default:
            {
                stream->WritesDone();
                grpc::Status status = stream->Finish();
                if (!status.ok())
                    OutputDebugString("ERROR: SayHello rpc Failed");
                PersonRef person;
                person.Name = response.person().username();
                person.Age = response.person().age();
                return person;
            }
        }
    }

    // Read() returned false: the server ended the stream without ever sending
    // Complete. Close our side and collect the final status so the failure is
    // reported instead of falling off the end of the function.
    stream->WritesDone();
    grpc::Status status = stream->Finish();
    if (!status.ok())
        OutputDebugString("ERROR: SayHello rpc Failed");
    return PersonRef();
}
*SERVER*
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll communicate
with
// clients. In this case it corresponds to an *asynchronous* service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous
communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Class encompasing the state and logic needed to serve a request.
class CallData : ICallback
{
public:
// Take in the "service" instance (in this case representing an
asynchronous
// server) and the completion queue "cq" used for asynchronous
communication
// with the gRPC runtime.
CallData(HelloConversationSupplierService::AsyncService* service,
ServerCompletionQueue* cq)
: service_(service), cq_(cq), stream_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed(ModelManagerPartBrepCreationEngineImpl* brepCreationEngine)
{
switch (status_)
{
case START:
{
service_->RequestSayHello(&ctx_, &stream_, cq_, cq_, this);
status_ = PROCESS;
break;
}
case PROCESS:
{
new CallData(service_, cq_);
stream_.Read(&request_, this);
status_ = READ;
break;
}
case READ:
{
if(request_.name() == MethodName::Start)
Start(&response, this);
stream_.Write(_response, this);
status_ = WRITE;
break;
}
case WRITE:
{
stream_.Finish(Status::OK, this);
status_ = FINISH;
break;
}
case FINISH:
{
delete this;
}
}
}
void Start(SayHelloResponse response, CallData* calldata)
{
PersonRef person = engine_->Start(calldata); // I know "engine_ isn't
defined elsewhere, but for simplicity, it is a separate process which
handles all the heavy work. GRPC is just used to talk back and forth.
CallData implements the callbacks the engine may use. (Occasionally, engine
won't need anymore data and will return a fully defined PersonRef on its
own)
response.set_username(person.userName);
response.set_age(person.Age);
response.set_name(MethodName::Complete);
}
private:
HelloConversationSupplierService::AsyncService* service_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
// The means to get back to the client.
ServerAsyncReaderWriter<HelloRequest,HelloReply> stream_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { START, PROCESS, READ, WRITE, FINISH };
CallStatus status_; // The current serving state.
// ICallback Methods. These are callback methods which will be called
from the Server-side engine.
CallData::Name(string* name)
{
_response.Clear();
_response.set_name(MethodName::GetName); // here, we're telling the
client that we need to get the user's name
_stream.Write(_response, this);
void* tag;
bool ok;
while (true)
{
SayHelloRequest request;
_cq->Next(&tag, &ok);
_stream.Read(&request, this);
if(request.name() == MethodName::GetName)
{
name = request.user_name(); *// I can successfully get the hardcoded
"BILL" from the client here*
break;
}
}
}
CallData::Age(int* age)
{
_response.Clear();
_response.set_name(MethodName::GetAge);
_stream.Write(_response, this);
void* tag;
bool ok;
while (true)
{
SayHelloRequest request;
_cq->Next(&tag, &ok);
_stream.Read(&request, this); // *THIS IS WHERE IT ABORTS *
if(request.name() == MethodName::GetAge)
{
name = request.age();
break;
}
}
}
};
// This can be run in multiple threads if needed.
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return
value
// tells us whether there is any kind of event or cq_ is shutting
down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
HelloConversationSupplierService::AsyncService service_;
std::unique_ptr<Server> server_;
};
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/2bce23ca-fbe8-4f80-ac9a-afaad3818c57%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.