// Server code for the bidirectional-streaming gRPC (RouteGuide) example.
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include "helper.h"
#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#ifdef BAZEL_BUILD
#include "examples/protos/route_guide.grpc.pb.h"
#else
#include "route_guide.grpc.pb.h"
#endif
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::Status;
using routeguide::Feature;
using routeguide::Point;
using routeguide::Rectangle;
using routeguide::RouteGuide;
using routeguide::RouteNote;
using routeguide::RouteSummary;
using std::chrono::system_clock;
// Callback-API implementation of the RouteGuide service. Only the
// bidirectional-streaming RouteChat method is overridden here.
//
// Every RouteChat stream shares one vector of previously received notes
// (received_notes_), protected by mu_.
class RouteGuideImpl final : public RouteGuide::CallbackService {
 public:
  // Hands `db` to routeguide::ParseDb, which fills feature_list_.
  explicit RouteGuideImpl(const std::string& db) {
    routeguide::ParseDb(db, &feature_list_);
  }

  // Creates the reactor that drives a single RouteChat stream.
  //
  // The returned Chatter is heap-allocated and deletes itself in OnDone(), so
  // the caller must not delete it. `context` is not used.
  grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
      CallbackServerContext* context) override {
    // Per-stream state machine: read one note, write back every previously
    // recorded note at the same location, record the new note, read again.
    class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
     public:
      // `mu` guards `*received_notes`; both are borrowed from the enclosing
      // service object and are not owned by the reactor.
      Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
          : mu_(mu), received_notes_(received_notes) {
        // Must be the last statement: once a read is started its completion
        // callback may touch the members.
        StartRead(&note_);
      }

      // The reactor was created with `new` in RouteChat; it frees itself here.
      void OnDone() override { delete this; }

      // Called when the read started by StartRead(&note_) completes.
      // ok == false: no note was read; the stream is finished with OK status.
      // ok == true: note_ holds the new note; start replying to it.
      void OnReadDone(bool ok) override {
        if (!ok) {
          std::cout << "some client finished" << std::endl;
          Finish(Status::OK);
          return;
        }
        // Unlike the other example in this directory that's not using the
        // reactor pattern, we can't hold one lock across the whole exchange,
        // because the reactor will most likely make us jump threads. Instead
        // we take the lock briefly to build a private copy of the notes we
        // are going to send, and take it again later (in NextWrite) to append
        // the received note to the shared vector.

        // Drop the replies of the previous read; otherwise they would be
        // sent again and the vector would grow without bound. No write is in
        // flight at this point, so nothing still points into the vector.
        to_send_notes_.clear();

        mu_->Lock();
        std::copy_if(received_notes_->begin(), received_notes_->end(),
                     std::back_inserter(to_send_notes_),
                     [this](const RouteNote& note) {
                       return note.location().latitude() ==
                                  note_.location().latitude() &&
                              note.location().longitude() ==
                                  note_.location().longitude();
                     });
        mu_->Unlock();

        notes_iterator_ = to_send_notes_.begin();
        NextWrite();
      }

      // Called when the write started by StartWrite completes.
      void OnWriteDone(bool ok) override {
        if (!ok) {
          // NOTE(review): per the gRPC callback API docs a failed write means
          // later writes will fail as well, so skip the remaining replies and
          // fall through to the next read -- confirm against the gRPC version
          // in use.
          notes_iterator_ = to_send_notes_.end();
        }
        NextWrite();
      }

     private:
      // Sends the next pending reply, or -- once all replies are sent --
      // records the received note in the shared vector and starts the next
      // read.
      void NextWrite() {
        if (notes_iterator_ != to_send_notes_.end()) {
          // Advance before starting the write: the completion callback may
          // run on another thread, so no member is touched after StartWrite.
          const RouteNote* reply = &*notes_iterator_;
          ++notes_iterator_;
          StartWrite(reply);
          return;
        }
        mu_->Lock();
        received_notes_->push_back(note_);
        mu_->Unlock();
        StartRead(&note_);
      }

      RouteNote note_;                          // Most recently read note.
      absl::Mutex* mu_;                         // Guards *received_notes_.
      std::vector<RouteNote>* received_notes_;  // Shared by all streams.
      std::vector<RouteNote> to_send_notes_;    // Replies to the current note.
      // Next element of to_send_notes_ to write.
      std::vector<RouteNote>::iterator notes_iterator_;
    };
    return new Chatter(&mu_, &received_notes_);
  }

 private:
  std::vector<Feature> feature_list_;
  absl::Mutex mu_;
  std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
};
void RunServer(const std::string& db_path) {
std::string server_address("0.0.0.0:50051");
RouteGuideImpl service(db_path);
ServerBuilder builder;
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
server->Wait();
}
// Program entry point. The only expected command-line argument is
// --db_path=path/to/route_guide_db.json; the arguments are given to
// routeguide::GetDbFileContent and its result is passed on to RunServer.
int main(int argc, char** argv) {
  RunServer(routeguide::GetDbFileContent(argc, argv));
  return 0;
}
среда, 22 марта 2023 г. в 01:42:07 UTC+3, Dmitry Gorelov:
> Oh man, it is not working even with *one* client! Same problem in
> proto_utils.h
>
>
> среда, 22 марта 2023 г. в 01:12:55 UTC+3, Dmitry Gorelov:
>
>> Hi All,
>>
>> Please help me modify this piece of server code for a bidirectional stream
>> so that it works correctly with *multiple clients* at the same time.
>> Currently it crashes with a segmentation fault in proto_utils.h.
>>
>> class RouteGuideImpl final : public RouteGuide::CallbackService {
>> public:
>> explicit RouteGuideImpl(const std::string& db) {
>> routeguide::ParseDb(db, &feature_list_);
>> }
>>
>> grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
>> CallbackServerContext* context) override {
>> class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
>> public:
>> Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
>> : mu_(mu), received_notes_(received_notes) {
>> StartRead(&note_);
>> }
>>
>> void OnDone() override { delete this; }
>> void OnReadDone(bool ok) override {
>> if (ok) {
>> // Unlike the other example in this directory that's not using
>> // the reactor pattern, we can't grab a local lock to secure the
>> // access to the notes vector, because the reactor will most
>> likely
>> // make us jump threads, so we'll have to use a different
>> locking
>> // strategy. We'll grab the lock locally to build a copy of the
>> // list of nodes we're going to send, then we'll grab the lock
>> // again to append the received note to the existing vector.
>> mu_->Lock();
>> std::copy_if(received_notes_->begin(), received_notes_->end(),
>> std::back_inserter(to_send_notes_),
>> [this](const RouteNote& note) {
>> return note.location().latitude() ==
>> note_.location().latitude() &&
>> note.location().longitude() ==
>> note_.location().longitude();
>> });
>> mu_->Unlock();
>> notes_iterator_ = to_send_notes_.begin();
>> NextWrite();
>> } else {
>> std::cout << "some client finished" << std::endl;
>> Finish(Status::OK);
>> }
>> }
>> void OnWriteDone(bool /*ok*/) override { NextWrite(); }
>>
>> private:
>> void NextWrite() {
>> if (notes_iterator_ != to_send_notes_.end()) {
>> StartWrite(&*notes_iterator_);
>> notes_iterator_++;
>> } else {
>> mu_->Lock();
>> received_notes_->push_back(note_);
>> mu_->Unlock();
>> StartRead(&note_);
>> }
>> }
>> RouteNote note_;
>> absl::Mutex* mu_;
>> std::vector<RouteNote>* received_notes_;
>> std::vector<RouteNote> to_send_notes_;
>> std::vector<RouteNote>::iterator notes_iterator_;
>> };
>> return new Chatter(&mu_, &received_notes_);
>> }
>>
>> private:
>> std::vector<Feature> feature_list_;
>> absl::Mutex mu_;
>> std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
>> };
>>
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/f0b42126-0064-40d3-8f92-b8974a062f91n%40googlegroups.com.