Please check the following code, it fixes the crash of the
route_guide_callback_server!
#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include "helper.h"
#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#ifdef BAZEL_BUILD
#include "examples/protos/route_guide.grpc.pb.h"
#else
#include "route_guide.grpc.pb.h"
#endif
using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::Status;
using routeguide::Feature;
using routeguide::Point;
using routeguide::Rectangle;
using routeguide::RouteGuide;
using routeguide::RouteNote;
using routeguide::RouteSummary;
using std::chrono::system_clock;
class RouteGuideImpl final : public RouteGuide::CallbackService {
public:
explicit RouteGuideImpl(const std::string& db) {
routeguide::ParseDb(db, &feature_list_);
}
grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
CallbackServerContext* context) override {
class Chatter : public grpc::ServerBidiReactor<RouteNote,
RouteNote> {
public:
Chatter(absl::Mutex* mu, std::vector<RouteNote>*
received_notes)
: mu_(mu), received_notes_(received_notes) {
StartRead(¬e_);
}
void OnDone() override { delete this; }
void OnReadDone(bool ok) override
{
if (ok)
{
// Unlike the other example in this directory that's not
using
// the reactor pattern, we can't grab a local lock to
secure the
// access to the notes vector, because the reactor will
most likely
// make us jump threads, so we'll have to use a
different locking
// strategy. We'll grab the lock locally to build a copy
of the
// list of nodes we're going to send, then we'll grab
the lock
// again to append the received note to the existing
vector.
mu_->Lock();
std::copy_if(received_notes_->begin(),
received_notes_->end(),
std::back_inserter(to_send_notes_),
[this](const RouteNote& note) {
return note.location().latitude() ==
note_.location().latitude() &&
note.location().longitude() ==
note_.location().longitude();
});
notes_iterator_ = to_send_notes_.begin();
mu_->Unlock();
NextWrite();
} else {
//std::cout << "some client finished" << std::endl;
Finish(Status::OK);
}
}
void OnWriteDone(bool ok) override
{
if (ok)
{
NextWrite();
}
else
{
std::cout << "some client finished write" << std::endl;
Finish(Status::OK);
}
}
private:
void NextWrite()
{
mu_->Lock();
if (notes_iterator_ != to_send_notes_.end()) {
StartWrite(&*notes_iterator_);
notes_iterator_++;
} else {
received_notes_->push_back(note_);
StartRead(¬e_);
}
mu_->Unlock();
}
RouteNote note_;
absl::Mutex* mu_;
std::vector<RouteNote>* received_notes_;
std::vector<RouteNote> to_send_notes_;
std::vector<RouteNote>::iterator notes_iterator_;
};
return new Chatter(&mu_, &received_notes_);
}
private:
std::vector<Feature> feature_list_;
absl::Mutex mu_;
std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
};
void RunServer(const std::string& db_path) {
std::string server_address("0.0.0.0:50051");
RouteGuideImpl service(db_path);
ServerBuilder builder;
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address <<
std::endl;
server->Wait();
}
int main(int argc, char** argv) {
// Expect only arg: --db_path=path/to/route_guide_db.json.
std::string db = routeguide::GetDbFileContent(argc, argv);
RunServer(db);
return 0;
}
среда, 22 марта 2023 г. в 02:15:55 UTC+3, Dmitry Gorelov:
> [image: 2023-03-22 02_14_54-Window.png]
>
> среда, 22 марта 2023 г. в 02:00:25 UTC+3, Dmitry Gorelov:
>
>> Both of them , the client and server, are from route_guide example. I
>> left only bidirectional part.
>> after some attemts to run client, either the client or the server crash.
>>
>> Please help to fix them!
>>
>>
>> среда, 22 марта 2023 г. в 01:59:12 UTC+3, Dmitry Gorelov:
>>
>>> //and this is client code
>>>
>>> /*
>>> *
>>> * Copyright 2021 gRPC authors.
>>> *
>>> * Licensed under the Apache License, Version 2.0 (the "License");
>>> * you may not use this file except in compliance with the License.
>>> * You may obtain a copy of the License at
>>> *
>>> * http://www.apache.org/licenses/LICENSE-2.0
>>> *
>>> * Unless required by applicable law or agreed to in writing, software
>>> * distributed under the License is distributed on an "AS IS" BASIS,
>>> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>> * See the License for the specific language governing permissions and
>>> * limitations under the License.
>>> *
>>> */
>>>
>>> #include <chrono>
>>> #include <condition_variable>
>>> #include <iostream>
>>> #include <memory>
>>> #include <mutex>
>>> #include <random>
>>>
>>> #include <string>
>>> #include <thread>
>>>
>>> #include "helper.h"
>>>
>>> #include <grpc/grpc.h>
>>> #include <grpcpp/alarm.h>
>>> #include <grpcpp/channel.h>
>>> #include <grpcpp/client_context.h>
>>> #include <grpcpp/create_channel.h>
>>> #include <grpcpp/security/credentials.h>
>>>
>>> #ifdef BAZEL_BUILD
>>> #include "examples/protos/route_guide.grpc.pb.h"
>>> #else
>>> #include "route_guide.grpc.pb.h"
>>> #endif
>>>
>>> using grpc::Channel;
>>> using grpc::ClientContext;
>>>
>>> using grpc::Status;
>>> using routeguide::Feature;
>>> using routeguide::Point;
>>> using routeguide::Rectangle;
>>> using routeguide::RouteGuide;
>>> using routeguide::RouteNote;
>>> using routeguide::RouteSummary;
>>>
>>> Point MakePoint(long latitude, long longitude) {
>>> Point p;
>>> p.set_latitude(latitude);
>>> p.set_longitude(longitude);
>>> return p;
>>> }
>>>
>>> Feature MakeFeature(const std::string& name, long latitude, long
>>> longitude) {
>>> Feature f;
>>> f.set_name(name);
>>> f.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
>>> return f;
>>> }
>>>
>>> RouteNote MakeRouteNote(const std::string& message, long latitude,
>>> long longitude) {
>>> RouteNote n;
>>> n.set_message(message);
>>> n.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
>>> return n;
>>> }
>>>
>>> class RouteGuideClient {
>>> public:
>>> RouteGuideClient(std::shared_ptr<Channel> channel, const std::string&
>>> db)
>>> : stub_(RouteGuide::NewStub(channel)) {
>>> routeguide::ParseDb(db, &feature_list_);
>>> }
>>>
>>> void RouteChat() {
>>> class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote>
>>> {
>>> public:
>>> explicit Chatter(RouteGuide::Stub* stub)
>>> : notes_{MakeRouteNote("First message", 0, 0),
>>> MakeRouteNote("Second message", 0, 1),
>>> MakeRouteNote("Third message", 1, 0),
>>> MakeRouteNote("Fourth message", 0, 0)},
>>> notes_iterator_(notes_.begin()) {
>>> stub->async()->RouteChat(&context_, this);
>>> NextWrite();
>>> StartRead(&server_note_);
>>> StartCall();
>>>
>>> }
>>> void OnWriteDone(bool /*ok*/) override { NextWrite(); }
>>> void OnReadDone(bool ok) override {
>>> if (ok) {
>>> std::cout << "Got message " << server_note_.message() << " at "
>>> << server_note_.location().latitude() << ", "
>>> << server_note_.location().longitude() << std::endl;
>>> StartRead(&server_note_);
>>> }
>>> }
>>> void OnDone(const Status& s) override {
>>> std::unique_lock<std::mutex> l(mu_);
>>> status_ = s;
>>> done_ = true;
>>> cv_.notify_one();
>>> }
>>> Status Await() {
>>> std::unique_lock<std::mutex> l(mu_);
>>> cv_.wait(l, [this] { return done_; });
>>> return std::move(status_);
>>> }
>>>
>>> private:
>>> void NextWrite() {
>>> if (notes_iterator_ != notes_.end()) {
>>> const auto& note = *notes_iterator_;
>>> std::cout << "Sending message " << note.message() << " at "
>>> << note.location().latitude() << ", "
>>> << note.location().longitude() << std::endl;
>>> StartWrite(¬e);
>>> notes_iterator_++;
>>> } else {
>>> StartWritesDone();
>>> }
>>> }
>>> ClientContext context_;
>>> const std::vector<RouteNote> notes_;
>>> std::vector<RouteNote>::const_iterator notes_iterator_;
>>> RouteNote server_note_;
>>> std::mutex mu_;
>>> std::condition_variable cv_;
>>> Status status_;
>>> bool done_ = false;
>>> };
>>>
>>> Chatter chatter(stub_.get());
>>> Status status = chatter.Await();
>>> if (!status.ok()) {
>>> std::cout << "RouteChat rpc failed." << std::endl;
>>> }
>>> }
>>>
>>> private:
>>>
>>> const float kCoordFactor_ = 10000000.0;
>>> std::unique_ptr<RouteGuide::Stub> stub_;
>>> std::vector<Feature> feature_list_;
>>>
>>> };
>>>
>>> int main(int argc, char** argv) {
>>> // Expect only arg: --db_path=path/to/route_guide_db.json.
>>> std::string db = routeguide::GetDbFileContent(argc, argv);
>>> RouteGuideClient guide(
>>> grpc::CreateChannel("localhost:50051",
>>> grpc::InsecureChannelCredentials()),
>>> db);
>>>
>>> std::cout << "-------------- RouteChat --------------" << std::endl;
>>> guide.RouteChat();
>>>
>>> return 0;
>>> }
>>>
>>> среда, 22 марта 2023 г. в 01:42:07 UTC+3, Dmitry Gorelov:
>>>
>>>> Oh man, it is not working even with *one *client! same problem in
>>>> proto_utils.h
>>>
>>>
>>>>
>>>> среда, 22 марта 2023 г. в 01:12:55 UTC+3, Dmitry Gorelov:
>>>>
>>>>> Hi All,
>>>>>
>>>>> please help to modify this peace of server code for bidirectional
>>>>> stream in order to make it work correclty with *multiple clients* at
>>>>> one time. Currently it crashes with segmentation fault in the
>>>>> proto_utils.h.
>>>>>
>>>>> class RouteGuideImpl final : public RouteGuide::CallbackService {
>>>>> public:
>>>>> explicit RouteGuideImpl(const std::string& db) {
>>>>> routeguide::ParseDb(db, &feature_list_);
>>>>> }
>>>>>
>>>>> grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
>>>>> CallbackServerContext* context) override {
>>>>> class Chatter : public grpc::ServerBidiReactor<RouteNote,
>>>>> RouteNote> {
>>>>> public:
>>>>> Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
>>>>> : mu_(mu), received_notes_(received_notes) {
>>>>> StartRead(¬e_);
>>>>> }
>>>>>
>>>>> void OnDone() override { delete this; }
>>>>> void OnReadDone(bool ok) override {
>>>>> if (ok) {
>>>>> // Unlike the other example in this directory that's not
>>>>> using
>>>>> // the reactor pattern, we can't grab a local lock to secure
>>>>> the
>>>>> // access to the notes vector, because the reactor will most
>>>>> likely
>>>>> // make us jump threads, so we'll have to use a different
>>>>> locking
>>>>> // strategy. We'll grab the lock locally to build a copy of
>>>>> the
>>>>> // list of nodes we're going to send, then we'll grab the
>>>>> lock
>>>>> // again to append the received note to the existing vector.
>>>>> mu_->Lock();
>>>>> std::copy_if(received_notes_->begin(),
>>>>> received_notes_->end(),
>>>>> std::back_inserter(to_send_notes_),
>>>>> [this](const RouteNote& note) {
>>>>> return note.location().latitude() ==
>>>>> note_.location().latitude() &&
>>>>> note.location().longitude() ==
>>>>> note_.location().longitude();
>>>>> });
>>>>> mu_->Unlock();
>>>>> notes_iterator_ = to_send_notes_.begin();
>>>>> NextWrite();
>>>>> } else {
>>>>> std::cout << "some client finished" << std::endl;
>>>>> Finish(Status::OK);
>>>>> }
>>>>> }
>>>>> void OnWriteDone(bool /*ok*/) override { NextWrite(); }
>>>>>
>>>>> private:
>>>>> void NextWrite() {
>>>>> if (notes_iterator_ != to_send_notes_.end()) {
>>>>> StartWrite(&*notes_iterator_);
>>>>> notes_iterator_++;
>>>>> } else {
>>>>> mu_->Lock();
>>>>> received_notes_->push_back(note_);
>>>>> mu_->Unlock();
>>>>> StartRead(¬e_);
>>>>> }
>>>>> }
>>>>> RouteNote note_;
>>>>> absl::Mutex* mu_;
>>>>> std::vector<RouteNote>* received_notes_;
>>>>> std::vector<RouteNote> to_send_notes_;
>>>>> std::vector<RouteNote>::iterator notes_iterator_;
>>>>> };
>>>>> return new Chatter(&mu_, &received_notes_);
>>>>> }
>>>>>
>>>>> private:
>>>>> std::vector<Feature> feature_list_;
>>>>> absl::Mutex mu_;
>>>>> std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
>>>>> };
>>>>>
>>>>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/367f8e8d-574d-486f-a9f2-d0c72787caf4n%40googlegroups.com.