Hi Mark,

Usually, the best way to use Cap'n Proto is in a program that is 100%
single-threaded and event-driven. It's possible to integrate the KJ event
loop with other event loop frameworks such that they can cooperate within
the same thread -- see, for example, node-capnp, which integrates with
libuv (the event loop used by node.js).

However, if you have no knowledge of the event loop used by the application
that calls your library, then you will indeed probably need to use a
separate thread. One way to do this is to create a pipe,
where the main thread holds the write end of the pipe, and the capnp thread
holds the read end. In the capnp thread, use LowLevelAsyncIoProvider to
wrap the pipe FD so that you can listen on it asynchronously. Whenever an
RPC is initiated from the main thread, add it to a mutex-protected queue,
then write a byte to the pipe to wake up the capnp thread. The capnp thread
should then consume requests from the queue.
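
A rough sketch of that wake-up pattern, using only POSIX and the C++
standard library so it stands alone. Everything named here is an assumption
made for illustration: WorkerWakeupQueue is a hypothetical class, the jobs
are plain std::function<int()> callables rather than real RPC calls, and
the worker uses a blocking read() where a real capnp thread would wrap the
read end with LowLevelAsyncIoProvider::wrapInputFd() and read it
asynchronously on the KJ event loop. Results come back as std::future,
since that is what your API needs to expose.

```cpp
#include <unistd.h>  // pipe, read, write, close (POSIX)

#include <cassert>
#include <cerrno>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical helper for illustration; not part of Cap'n Proto or KJ.
class WorkerWakeupQueue {
public:
  WorkerWakeupQueue() {
    int fds[2];
    int rc = ::pipe(fds);
    assert(rc == 0 && "pipe() failed");
    (void)rc;
    readFd_ = fds[0];   // held by the worker ("capnp") thread
    writeFd_ = fds[1];  // held by the main thread
    worker_ = std::thread([this] { run(); });
  }

  ~WorkerWakeupQueue() {
    ::close(writeFd_);  // worker sees EOF once pending bytes are read
    worker_.join();
    ::close(readFd_);
  }

  // Main thread: push the job onto the mutex-protected queue, then write
  // one byte to the pipe to wake the worker.
  std::future<int> submit(std::function<int()> job) {
    std::packaged_task<int()> task(std::move(job));
    std::future<int> result = task.get_future();
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(task));
    }
    char byte = 0;
    ssize_t n = ::write(writeFd_, &byte, 1);
    (void)n;  // a real implementation should retry on EINTR and report errors
    return result;
  }

private:
  // Worker thread: sleep in read() until woken, then consume the queue.
  // With KJ, this blocking read would instead be an asynchronous read on
  // the wrapped pipe FD, so the event loop keeps servicing RPC traffic.
  void run() {
    char buf[64];
    for (;;) {
      ssize_t n = ::read(readFd_, buf, sizeof buf);
      if (n < 0 && errno == EINTR) continue;
      if (n <= 0) return;  // EOF (write end closed) or unrecoverable error
      drain();
    }
  }

  void drain() {
    for (;;) {
      std::packaged_task<int()> task;
      {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return;
        task = std::move(queue_.front());
        queue_.pop();
      }
      task();  // run outside the lock; this fulfils the caller's future
    }
  }

  int readFd_ = -1;
  int writeFd_ = -1;
  std::mutex mutex_;
  std::queue<std::packaged_task<int()>> queue_;
  std::thread worker_;
};
```

From the main thread you would call submit() with a callable and then call
get() on the returned future when you need the result. A wake-up byte that
arrives after its job was already drained is harmless: the worker simply
finds the queue empty and goes back to waiting.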

I do plan to add a nicer API for cross-thread events eventually, but it's
one of the long list of things that we haven't yet needed for Sandstorm and
so it hasn't been prioritized.

-Kenton

On Thu, Sep 8, 2016 at 2:27 PM, Mark Grimes <[email protected]> wrote:

> Arghh, forgot reply all for the group.  Message below...
>
> ---------- Forwarded message ----------
> From: Mark Grimes <[email protected]>
> Date: 8 September 2016 at 22:24
> Subject: Re: [capnproto] Storing interfaces for later use
> To: Kenton Varda <[email protected]>
>
>
> Hi,
> Coming back to this now that I'm ready for higher level tests.
>
> I'm a bit confused by newPromiseAndFulfiller.  Trying myself and also
> reading a few posts in this group, it's not possible to fulfil a promise
> from a different thread.  How is it possible to fulfil a promise if the
> thread that has to fulfil it is blocked in a wait()?  I guess it's in the
> event loop and not technically blocked, but as far as my code is concerned
> it's as good as blocked.  So I'm a bit confused by what its use is outside
> the EventLoop internals or "then()" blocks.
>
> What I currently have is an override of EventPort which is thread synched
> and takes requests to create (or fulfil) promises on the EventLoop thread.
> This seems to work fine for basic promises.  Coming to use RPC however, it
> looks like I'm going to have to implement everything the standard EventPort
> does, as well as carbon copies of every class from LowLevelAsyncIoProviderImpl
> up where the only difference is that it uses my EventPort instead of
> UnixEventPort.  This seems so unnecessarily complex I must be doing
> something wrong.  I'm happy to do this but wanted to make sure I hadn't
> missed something simpler.
>
> To describe my original problem, in case I've gone so far down the wrong
> road the original issue is lost:
> I need to create an API which gives results from RPC calls as std::futures
> (so far easy), but the server and client will be periodically replicating
> data in the background without input from my API's user.  This implies I
> need the Cap'n Proto client (Ez or otherwise) running on another thread,
> but also able to take RPC requests from the main thread.  What's the best
> way of doing this?
>
> Thanks,
>
> Mark.
>
> On 26 August 2016 at 23:22, Kenton Varda <[email protected]> wrote:
>
>> Hi Mark,
>>
>> Cap'n Proto is single-threaded. The event loop happens on the current
>> thread.
>>
>> The event loop will run whenever you call .wait() on a promise. What you
>> need to do is create a promise that completes when your test is "done",
>> then wait on that.
>>
>> One way to do this might be to use a promise/fulfiller pair. See
>> kj::newPromiseAndFulfiller<>(). This returns a pair of a promise and a
>> fulfiller, where the fulfiller can be called later in order to provide the
>> result which will then be returned by the promise. So, in your main
>> function, you would call .wait() on the promise, and then in some other
>> code, when you determine that it's time to exit, you would call
>> fulfiller->fulfill().
>>
>> If you don't have any particular way to detect "doneness" and instead
>> want to wait for some amount of time, you can use the timer:
>>
>>     ezRpcClient.getIoProvider().getTimer()
>>         .afterDelay(1 * kj::SECONDS).wait(ezRpcClient.getWaitScope());
>>
>> However, using time delays like this in tests leads to slow, flaky tests,
>> so I don't recommend it.
>>
>> -Kenton
>>
>> On Fri, Aug 26, 2016 at 10:50 AM, Mark Grimes <[email protected]> wrote:
>>
>>> Hi,
>>> As a temporary workaround I'm just making a request I don't need on the
>>> main capability, after all the calls that I actually want.  I then wait on
>>> that promise to clear the queue.  The problem is that the replication (i.e.
>>> publication) happens in the background so I can't wait on those promises,
>>> and the event loop needs to be entered for the handlers to run.
>>> That's good enough for now.  I'm not sure how to handle the event loop
>>> in the client once I get to production code, but I'll cross that bridge
>>> when I come to it.
>>>
>>> Thanks for the pointers.
>>>
>>>
>>>
>>> On 26 August 2016 at 01:25, Mark Grimes <[email protected]> wrote:
>>>
>>>> Hi,
>>>> Thanks for taking the time to respond.  I was doing all of those
>>>> things; simple communication works fine, but it breaks when trying to
>>>> pass a client capability to the server for it to use in the "publish"
>>>> phase.
>>>> I eventually figured out if I add in a
>>>>
>>>> kj::NEVER_DONE.wait( client.getWaitScope() );
>>>>
>>>> at the end everything works, but locks at the end of the test fixture
>>>> (as you would expect).  I think I fundamentally misunderstood where the
>>>> event loop processing is done. I thought this was performed on a separate
>>>> thread, so a simple "this_thread::sleep_for(...)" would allow all the
>>>> handlers in the queue to finish (not the most optimal I know but I wanted
>>>> to make sure everything worked first).  I now know (think?) there must be a
>>>> "wait()" _somewhere_ to enter the event loop again on the current thread
>>>> and clear the queue.
>>>>
>>>> Since I'm using this in test fixtures, NEVER_DONE is not an option
>>>> because I want to return control and carry on with other tests.
>>>>
>>>> - Is there a way with the ez-rpc interfaces to process everything
>>>> currently in the queue and then return control?  I see EventLoop has a
>>>> "run()" method but I don't see a way to get the EventLoop from
>>>> the EzRpcServer/Client.
>>>> - If not, is there something like "kj::DO_UNTIL.wait( seconds(1),
>>>> client.getWaitScope() )"; or a std::condition_variable style notification
>>>> to stop?
>>>> - With both client and server on the same event loop, there's no
>>>> difference between "client.getWaitScope()" and "server.getWaitScope()"
>>>> right?
>>>>
>>>> It does seem to be a great library by the way, the bits I've managed to
>>>> get working.
>>>>
>>>> Thanks,
>>>>
>>>> Mark.
>>>>
>>>> On 25 August 2016 at 18:38, Kenton Varda <[email protected]> wrote:
>>>>
>>>>> Hi Mark,
>>>>>
>>>>> Terminology note: Think of "capability" as meaning the same thing as
>>>>> "pointer", except it's a pointer that cannot be forged and without which
>>>>> you cannot otherwise access the target object. An "interface" describes
>>>>> the methods implemented by an object to which a capability points.
>>>>>
>>>>> What you describe should work, but there are a number of ways you
>>>>> might be getting it wrong. It's hard to say which without seeing your
>>>>> code, but here are some things to check:
>>>>>
>>>>> - Are you tearing down the RPC client too early? Once the network
>>>>> connection closes, all capabilities delivered through it become
>>>>> disconnected. Make sure you construct the RPC client (e.g. EzRpcClient)
>>>>> once, not every time you want to make a call.
>>>>>
>>>>> - Are you discarding promises before they complete? When you have a
>>>>> kj::Promise<T> representing an asynchronous task which hasn't completed
>>>>> yet, and you allow that promise to go out-of-scope (without calling
>>>>> .then() or anything else on it), the asynchronous task will be canceled.
>>>>> To prevent this, create a kj::TaskSet and add promises to the set -- the
>>>>> TaskSet ensures that the task runs to completion (unless you destroy the
>>>>> TaskSet, of course). For example:
>>>>>
>>>>>     void doThings(kj::TaskSet& tasks) {
>>>>>       kj::Promise<void> task1 = doSomething().then(...).then(...);
>>>>>       kj::Promise<void> task2 = doSomething().then(...).then(...);
>>>>>
>>>>>       tasks.add(kj::mv(task1));
>>>>>     }
>>>>>
>>>>> In the code above, only task1's .then()s will ever execute -- task2
>>>>> will be canceled when the function returns.
>>>>>
>>>>> - Another reason why a .then() continuation might not execute is if an
>>>>> exception was thrown. You can catch exceptions by providing a second
>>>>> callback to .then() which takes kj::Exception as the parameter. Note that
>>>>> when creating a kj::TaskSet, you will be forced to provide an error
>>>>> callback which will be used whenever a task throws an exception -- I
>>>>> usually do KJ_LOG(ERROR, exception) inside that callback.
>>>>>
>>>>> -Kenton
>>>>>
>>>>> On Thu, Aug 25, 2016 at 3:14 AM, <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm trying to develop an application that replicates remote objects
>>>>>> locally and keeps them updated.  This effectively boils down to a
>>>>>> publication-subscription model where the local proxy object sends an
>>>>>> interface (capability in capnp speak?) to the server that can be used to
>>>>>> send it notifications.  The server stores this interface pointer and
>>>>>> responds with an "okay", then later uses the interface to send the local
>>>>>> object notifications if the server-side object changes.
>>>>>> It doesn't work however.  If I use the interface pointer after the
>>>>>> original "subscription" call has resolved, the message is never received
>>>>>> on the client.  If I use the interface _during_ the subscription request
>>>>>> the message is received by the client, but the "then(...)" block on the
>>>>>> server is not executed.  I've looked through the calculator example, but
>>>>>> that uses the client-side interface immediately.
>>>>>>
>>>>>> - Can the interface pointers (capabilities right?) given in one call
>>>>>> be stored for later use?  I assume this must be the case but I can't be
>>>>>> doing it correctly.
>>>>>> - I'm using the ez-rpc interfaces.  Is this one of the use cases not
>>>>>> supported by them?
>>>>>> - As this is my most basic test case, both the client and server are
>>>>>> in the same compilation unit.  I don't do any thread manipulation myself
>>>>>> so I assume they're on the same event loop.  Is this an issue?  If so, I
>>>>>> want the single process use case for testing, so is starting one or the
>>>>>> other on a different thread sufficient?
>>>>>>
>>>>>> I can reduce the code to a simple example, but before going to the
>>>>>> effort I wanted to make sure I'm not misunderstanding something.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Mark.
>>>>>>
>>>>>> Cap'n Proto version 0.5.3
>>>>>>
>>>>>> --
>>>>>> You received this message because you are subscribed to the Google
>>>>>> Groups "Cap'n Proto" group.
>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>> send an email to [email protected].
>>>>>> Visit this group at https://groups.google.com/group/capnproto.
>>>>>>