Thank you for your response. It helped me find the cause of the problem.
The main reason the 'receive' callback was not executed on the client side 
was the sleep loop, which starved the event loop.
In the Python client, control needs to be passed explicitly to the event loop 
by calling 'capnp.poll_once()'.
The following modification takes care of that:

    client = capnp.TwoPartyClient('127.0.0.1:11223')
    backend = client.bootstrap().cast_as(a.Backend)
    #backend = proxy.getBackend().backend

    receiver = ReceiverImpl(1000)

    backend.subscribe(receiver).wait()
    print("Subscribed to backend")
    while receiver.recv_cnt < 1000:
        #time.sleep(1)
        time.sleep(0.001)
        capnp.poll_once()

The promise also needs to be detached on the server side, as you suggested; 
otherwise the promise is destroyed and the call is cancelled, so it never resolves.
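
To illustrate why the sleep loop starved the callback, here is a minimal sketch that does not use pycapnp at all. `ToyEventLoop`, `Receiver`, `wait_sleep_only` and `wait_with_polling` are hypothetical names invented for this illustration; the toy loop only mimics the general idea that queued callbacks run when the waiting code hands control to the loop, which is what 'capnp.poll_once()' did for me in the real client.

```python
import time
from collections import deque


class ToyEventLoop:
    """Hypothetical stand-in for an RPC event loop (this is NOT pycapnp)."""

    def __init__(self):
        self._pending = deque()

    def enqueue(self, callback, *args):
        # Simulates an incoming call that is waiting to be dispatched.
        self._pending.append((callback, args))

    def poll_once(self):
        # Run at most one queued callback; return True if one ran.
        if not self._pending:
            return False
        callback, args = self._pending.popleft()
        callback(*args)
        return True


class Receiver:
    def __init__(self):
        self.recv_cnt = 0

    def receive(self, msg):
        self.recv_cnt += 1


def wait_sleep_only(loop, receiver, iterations):
    # Mirrors the original client: only sleeps, never yields to the loop.
    for _ in range(iterations):
        time.sleep(0.001)
    return receiver.recv_cnt


def wait_with_polling(loop, receiver, expected):
    # Mirrors the fixed client: sleeps briefly, then lets the loop run.
    while receiver.recv_cnt < expected:
        time.sleep(0.001)
        loop.poll_once()
    return receiver.recv_cnt


loop = ToyEventLoop()
receiver = Receiver()
for _ in range(5):
    loop.enqueue(receiver.receive, b"a" * 16)

starved = wait_sleep_only(loop, receiver, 5)      # nothing is dispatched
delivered = wait_with_polling(loop, receiver, 5)  # all five are dispatched
print(starved, delivered)
```

In the sleep-only variant the queued calls just sit there, which matches what I saw with the original `time.sleep(1)` loop.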

On Tuesday, May 10, 2022 at 9:00:29 AM UTC+2 lawe...@gmail.com wrote:

> In your send function you should call 
> request.send().detach([&](kj::Exception&& error) { /* handle returned error */ });
> If you don't, the promise is destructed and the call gets cancelled.
>
> On Friday, May 6, 2022 at 1:22:50 PM UTC+2 Gyorgy Miru wrote:
>
>> Hi all,
>>
>> I am trying to implement a simple architecture where various backend 
>> servers gather data (on Android devices) and push them to the subscribers. 
>> In the backend the data gatherer runs in a separate thread from the server. 
>> My current issue is when messages are sent to the subscriber, they are not 
>> received on the client side. I am not sure if I need to wait on the promise 
>> that is returned by the send() call in order to actually place the message 
>> on the wire or maybe the problem is completely unrelated.
>>
>> First, I tried using executor->executeSync() from the gatherer 
>> thread to send the messages, but then I can't wait on the promise 
>> because the code runs in an event callback.
>> I also tried to start a kj event loop in the gatherer thread and 
>> publish the messages from there, but then when I try to wait I get the 
>> following exception: "Nothing to wait for; this thread would hang forever."
>> Please find the relevant code samples below:
>>
>> *test.capnp*
>>
>> @0xf84f5237f35aec13;
>>
>> interface Proxy {
>>    getBackend @0 () -> (backend: Backend);
>> }
>>
>> interface Backend {
>>    consume @0 (msg :Message);
>>    subscribe @1 (receiver :Receiver);
>> }
>>
>> interface Receiver {
>>    receive @0 (msg: Message);
>> }
>>
>> struct Message {
>>    body @0 :Data;
>> }
>>
>> *backend.cpp*
>>
>> class BackendImpl final: public Backend::Server {
>>     private:
>>         bool first_received;
>>         bool receiver_ready;
>>         unsigned recv_count;
>>         unsigned count;
>>         unsigned size;
>>         chrono::time_point<chrono::high_resolution_clock> begin;
>>         Receiver::Client receiver;
>>     
>>     public:
>>         BackendImpl(unsigned cnt): first_received(false), receiver_ready(false), count(cnt), receiver(nullptr) {};
>>         kj::Promise<void> consume(ConsumeContext context) override {
>>             //... Benchmark tests are omitted
>>             return kj::READY_NOW;
>>         }
>>
>>         kj::Promise<void> subscribe(SubscribeContext context) override {
>>             this->receiver = context.getParams().getReceiver();
>>             cout << "Receiver received" << endl;
>>             this->receiver_ready = true;
>>             return kj::READY_NOW;
>>         }
>>
>>         //bool send(int cnt, int size, kj::WaitScope& ws) { // I have tried passing the wait scope
>>         bool send(int cnt, int size) {
>>             if (!this->receiver_ready) {
>>                 return false;
>>             }
>>             this->receiver_ready = false;
>>             cout << "Sending data" << endl;
>>             unsigned char* msg = new unsigned char[size];
>>             memset(msg, 'a', size);
>>
>>
>>             for (int i = 0; i < this->count; i++) {
>>                 
>>                 auto request = this->receiver.receiveRequest();
>>                 request.getMsg().setBody(capnp::Data::Reader(msg, size));
>>                 auto promise = request.send();
>>                 //auto promise = request.send().wait(ws);
>>
>>                 auto res = promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){
>>                     cout << result.totalSize().wordCount << endl << flush; // This never gets called
>>                     return result.totalSize().wordCount;
>>                     });
>>
>>             }
>>             delete[] msg;
>>             cout << "Sending done" << endl << flush;
>>             return true;
>>         }
>> };
>>
>> BackendImpl* backend;
>> void data_pusher(const kj::Executor* executor, int size, int cnt) {
>>
>>     //kj::EventLoop loop;
>>     //loop.run();
>>     //kj::WaitScope waitScope(loop);
>>     while (executor->isLive()) {
>>         this_thread::sleep_for(1s);
>>
>>         //backend->send(cnt, size, waitScope);
>>
>>         executor->executeSync( [backend=backend, cnt=cnt, size=size] {
>>                 backend->send(cnt, size);
>>             });
>>     }    
>> }
>>
>> int main(int argc, const char* argv[]) {
>>
>>   auto be = kj::heap<BackendImpl>(10000);
>>   backend = be.get(); // There must be a sexier way to do this
>>   
>>   // Set up a server.
>>   capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223");
>>
>>   kj::WaitScope* waitScope = &server.getWaitScope();
>>
>>   const kj::Executor& executor = kj::getCurrentThreadExecutor();
>>   std::thread thr(data_pusher, &executor, 4096, 10000);
>>   std::cout << "Backend Ready" << std::endl;
>>
>>   // Run forever, accepting connections and handling requests.
>>   kj::NEVER_DONE.wait(*waitScope);
>>   thr.join();
>> }
>>
>> *client.py*
>>
>> capnp.remove_import_hook()
>> a = capnp.load('./test.capnp')
>>
>> class ReceiverImpl(a.Receiver.Server):
>>
>>     def __init__(self, count):
>>         self.count = count
>>         self.has_received = False
>>         self.recv_cnt = 0
>>
>>     def receive(self, msg, _context, **kwargs):
>>         print("We get called, yay!")
>>         #... benchmarking code omitted
>>
>> def direct_subscribe():
>>
>>     client = capnp.TwoPartyClient('127.0.0.1:11223')
>>     backend = client.bootstrap().cast_as(a.Backend)
>>     #backend = proxy.getBackend().backend
>>
>>     receiver = ReceiverImpl(10000)
>>
>>     backend.subscribe(receiver).wait()
>>
>>     while receiver.recv_cnt < 10000:
>>         time.sleep(1)
>>
>> direct_subscribe()
>>
>> When I execute this code without any wait() calls in the publisher, the 
>> sending completes immediately and asynchronously, but the receive() function 
>> of the receiver never gets called.
>>
>> Thank you for your time and assistance.
>> -Gym
>>
>
