I've been trying (and struggling) to create a type of discussion between 
the Server and Client, where the Client sends the initial request, and in 
some cases, the Server will require additional information from the Client, 
before it can complete the overall request. 

Something like...
Client: Hello
Server: Name?
Client: John
Server: Age?
Client: 36
Server: Here's the collated data.

Currently, I'm able to get the Server to send its first prompt (Name?), but 
when it sends the subsequent prompt (Age?), it aborts abruptly. The idea here 
is that occasionally the server may need more information from the client, 
which we cannot send across in the original message. Furthermore, while the 
data on the client side is hardcoded here, that's not the case in the actual 
implementation. I'm just trying to simplify it here. 

Any help is definitely appreciated, and let me know if anything is not 
clear. 

Code below...

*PROTO*
// Conversation service: SayHello is a bidirectional-streaming RPC, so both
// sides can exchange several messages within one call.
service HelloConversationSupplierService {
// Client streams HelloRequest messages; server streams HelloResponse messages.
rpc SayHello(stream HelloRequest) returns (stream HelloResponse) {}
}

// Message sent from the client to the server during the SayHello conversation.
message HelloRequest {
  // Which step of the conversation this message belongs to.
  MethodName Name = 1;
  // Filled in when answering a GetName prompt.
  string UserName = 2;
  // Filled in when answering a GetAge prompt.
  // Protobuf has no `int` scalar type; `int32` is the 32-bit signed integer.
  int32 Age = 3;
}

// Message sent from the server to the client during the SayHello conversation.
message HelloResponse {
 // Either a prompt for more data (GetName / GetAge) or Complete.
 MethodName Name = 1;
 // The collated result. The message type declared in this file is `Person`
 // (there is no `PersonData`), so the field must use that name to compile.
 Person Person = 2;
}

// Collated person data returned to the client inside HelloResponse.
message Person {
 string UserName = 1;
 // Protobuf has no `int` scalar type; `int32` is the 32-bit signed integer.
 int32 Age = 2;
}


// Identifies the step of the SayHello conversation a message belongs to.
enum MethodName {
 // Set by the client on the request that opens the conversation.
 Start = 0; 
 // Set by the server to ask for the user's name, and by the client on its answer.
 GetName = 1;
 // Set by the server to ask for the user's age, and by the client on its answer.
 GetAge = 2;
 // Set by the server on the response that carries the collated Person data.
 Complete = 3;
} 


*CLIENT*

// Runs one SayHello conversation with the server over a synchronous
// bidirectional stream.
//
// The client opens with a Start request and then answers every prompt the
// server sends (GetName / GetAge) until the server replies with Complete,
// whose payload is converted to a PersonRef and returned. If the server closes
// the stream before sending Complete, a value-initialized PersonRef is
// returned.
//
// NOTE(review): _sayHelloStub is assumed to be created and initialized before
// this function is called, as stated in the original snippet.
PersonRef SayHello()
{
    // A ClientContext must not be reused across calls, so it is a local.
    grpc::ClientContext context;
    std::shared_ptr<grpc::ClientReaderWriter<HelloRequest, HelloResponse>> stream(
        _sayHelloStub->SayHello(&context));

    // Half-closes the stream, collects the final status and logs a failure.
    auto finish = [&stream]() {
        stream->WritesDone();
        const grpc::Status status = stream->Finish();
        if (!status.ok())
            OutputDebugString("ERROR: SayHello rpc Failed");
    };

    // Opening message: only the step is set because there is no other data yet.
    HelloRequest request;
    request.set_name(MethodName::Start);
    if (!stream->Write(request))
    {
        // The call is already dead; Finish() reports why.
        finish();
        return PersonRef{};
    }

    HelloResponse response;
    while (stream->Read(&response))
    {
        switch (response.name())
        {
            case MethodName::GetName:
            {
                HelloRequest getName;
                getName.set_name(MethodName::GetName);
                getName.set_username("Bill");
                stream->Write(getName);
                break;
            }

            case MethodName::GetAge:
            {
                HelloRequest getAge;
                getAge.set_name(MethodName::GetAge);
                getAge.set_age(36);  // Age is an integer field, not a string.
                stream->Write(getAge);
                break;
            }

            // Any unrecognized step is treated like Complete, as in the
            // original snippet, so the conversation always terminates.
            case MethodName::Complete:
            default:
            {
                PersonRef person;
                person.Name = response.person().username();
                person.Age = response.person().age();

                finish();
                return person;
            }
        }
    }

    // Read() returned false: the server ended the stream without Complete.
    finish();
    return PersonRef{};
}


*SERVER*
class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  // There is no shutdown handling in this code.
  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, 
grpc::InsecureServerCredentials());
    // Register "service_" as the instance through which we'll communicate 
with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous 
communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
  }

 private:
  // Class encompasing the state and logic needed to serve a request.
  class CallData : ICallback
 {
   public:
    // Take in the "service" instance (in this case representing an 
asynchronous
    // server) and the completion queue "cq" used for asynchronous 
communication
    // with the gRPC runtime.
    CallData(HelloConversationSupplierService::AsyncService* service, 
ServerCompletionQueue* cq)
        : service_(service), cq_(cq), stream_(&ctx_), status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed(ModelManagerPartBrepCreationEngineImpl* brepCreationEngine)
    {
      switch (status_)
      {
        case START:
        {
          service_->RequestSayHello(&ctx_, &stream_, cq_, cq_, this);
          status_ = PROCESS;
          break;
        }
        case PROCESS:
        {
          new CallData(service_, cq_);

          stream_.Read(&request_, this);
          status_ = READ;
          break;
        }
        case READ:
        {
          if(request_.name() == MethodName::Start)
            Start(&response, this);

          stream_.Write(_response, this);

          status_ = WRITE;
          break;
        }
        case WRITE:
        {

          stream_.Finish(Status::OK, this);
          status_ = FINISH;
          break;
        }
        case FINISH:
        {
          delete this;
        }
      }
    }

   void Start(SayHelloResponse response, CallData* calldata)
{
    PersonRef person = engine_->Start(calldata); // I know "engine_ isn't 
defined elsewhere, but for simplicity, it is a separate process which 
handles all the heavy work. GRPC is just used to talk back and forth. 
CallData implements the callbacks the engine may use. (Occasionally, engine 
won't need anymore data and will return a fully defined PersonRef on its 
own) 
    response.set_username(person.userName);
    response.set_age(person.Age);
    response.set_name(MethodName::Complete);
}

   private:
    HelloConversationSupplierService::AsyncService* service_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;

    // What we get from the client.
    HelloRequest request_;
    // What we send back to the client.
    HelloReply reply_;

    // The means to get back to the client.
    ServerAsyncReaderWriter<HelloRequest,HelloReply> stream_;

    // Let's implement a tiny state machine with the following states.
    enum CallStatus { START, PROCESS, READ, WRITE, FINISH };
    CallStatus status_;  // The current serving state.


  // ICallback Methods. These are callback methods which will be called 
from the Server-side engine. 
   CallData::Name(string* name) 
   {
      _response.Clear();
      _response.set_name(MethodName::GetName); // here, we're telling the 
client that we need to get the user's name
      _stream.Write(_response, this);

      void* tag;
      bool ok;
      while (true)
      {
        SayHelloRequest request;
        _cq->Next(&tag, &ok);
        _stream.Read(&request, this);

 

        if(request.name() == MethodName::GetName)

{ 

 name = request.user_name(); *// I can successfully get the hardcoded 
"BILL" from the client here*

break;

}

       } 

  }

  CallData::Age(int* age)
   {
      _response.Clear();
      _response.set_name(MethodName::GetAge);
      _stream.Write(_response, this);

      void* tag;
      bool ok;
      while (true)
      {
        SayHelloRequest request;
        _cq->Next(&tag, &ok);
        _stream.Read(&request, this);  // *THIS IS WHERE IT ABORTS *

 

        if(request.name() == MethodName::GetAge)

{ 

 name = request.age();

break;

}

       } 
  }

  };

  // This can be run in multiple threads if needed.
  void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return 
value
      // tells us whether there is any kind of event or cq_ is shutting 
down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  HelloConversationSupplierService::AsyncService service_;
  std::unique_ptr<Server> server_;
};

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/2bce23ca-fbe8-4f80-ac9a-afaad3818c57%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to