Hi,
Thanks a lot for the pointers, they saved me a lot of work.  In the end I
managed it with the (high-level) AsyncIoProvider, so I should get
portability for free if wider system support is added later.  In case
anybody googles this later, a rough outline of what I did is below.


(slightly from memory)

kj::Own<kj::AsyncOutputStream> threadContactStream; // to send instructions to capnpThread
std::deque<std::function<void()>> queue; // queue of commands (RPC requests or whatever) to run on capnpThread
std::mutex queueMutex;
MyRPC::Client rpcInterface = nullptr; // the remote capability, not actually used in this example

std::thread capnpThread( [&]() {
  capnp::EzRpcClient client(URI);
  rpcInterface = client.getMain<MyRPC>();

  // Get a pipe so that the main thread can tell this thread when to wake up.
  auto myPipe = client.getIoProvider().newOneWayPipe();
  threadContactStream = std::move(myPipe.out);

  // Need a std::condition_variable::notify_all (or equivalent) here to tell the
  // main thread everything has been constructed and it's safe to continue.

  char buffer = 1; // anything other than 0 to get started
  while( buffer != 0 )
  {
    // Wait for a single byte to be sent on the pipe.
    myPipe.in->read( &buffer, 1 ).wait( client.getWaitScope() );

    // Move all current requests out of the shared deque, so that
    // the lock is not held while the functions are running.
    std::deque<std::function<void()>> copiedRequests;
    { // block to limit the lock's scope
      std::lock_guard<std::mutex> lock(queueMutex);
      copiedRequests.swap(queue);
    }
    for( auto& function : copiedRequests )
    {
      if( function ) function();
    }
  }

  rpcInterface = nullptr; // This is pretty important: release the capability
                          // on this thread; there seem to be issues otherwise.
} );

// Need a std::condition_variable::wait (or equivalent) here to make sure the
// other thread has had time to start and initialise the shared objects.

// Push some commands to the other thread.
{ // block to limit the lock's scope
  std::lock_guard<std::mutex> lock(queueMutex);
  queue.push_back( [](){ std::cout << "This runs on the client's EventLoop thread!" << std::endl; } );
}
// Tell the capnp thread to wake up and check the queue.  (Note that write()
// returns a kj::Promise, which should be kept alive until the write completes.)
const char wakeSignal = 1; // anything other than 0
threadContactStream->write( &wakeSignal, 1 );

<do other stuff>

// Tell the capnp thread to quit, then wait for it.
const char quitSignal=0;
threadContactStream->write( &quitSignal, 1 );
capnpThread.join();


At the risk of stating the blindingly obvious: if you're testing on a
single machine, the server's event loop doesn't run while the client is
blocked in "wait()", which probably means starting the server on yet
another thread and giving it a kj::NEVER_DONE.  Not that I got stuck on
that point for half a day with a non-responsive server or anything...
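For completeness, the server half of that might look roughly like this (from memory and untested; MyRPCImpl and the port number are placeholders for whatever your server actually uses):

```cpp
// Run the server's event loop on its own thread, forever.
std::thread serverThread( [&]() {
  capnp::EzRpcServer server( kj::heap<MyRPCImpl>(), "localhost", 12345 );
  kj::NEVER_DONE.wait( server.getWaitScope() ); // never returns
} );
```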


On 8 September 2016 at 23:44, Kenton Varda <[email protected]> wrote:

> Hi Mark,
>
> Usually, the best way to use Cap'n Proto is in a program that is 100%
> single-threaded and event-driven. It's possible to integrate the KJ event
> loop with other event loop frameworks such that they can cooperate within
> the same thread -- see, for example, node-capnp, which integrates with
> libuv (the event loop used by node.js).
>
> However, if you have no knowledge of the event loop used by the
> application that calls your library, then you will indeed probably need to
> use a separate thread. Probably what you will need to do here is create a
> pipe, where the main thread holds the write end of the pipe, and the capnp
> thread holds the read end. In the capnp thread, use LowLevelAsyncIoProvider
> to wrap the pipe FD so that you can listen on it asynchronously. Whenever
> an RPC is initiated from the main thread, add it to a mutex-protected
> queue, then write a byte to the pipe to wake up the capnp thread. The capnp
> thread should then consume requests from the queue.
>
> I do plan to add a nicer API for cross-thread events eventually, but it's
> one of the long list of things that we haven't yet needed for Sandstorm and
> so it hasn't been prioritized.
>
> -Kenton
>
> On Thu, Sep 8, 2016 at 2:27 PM, Mark Grimes <[email protected]> wrote:
>
>> Arghh, forgot reply all for the group.  Message below...
>>
>> ---------- Forwarded message ----------
>> From: Mark Grimes <[email protected]>
>> Date: 8 September 2016 at 22:24
>> Subject: Re: [capnproto] Storing interfaces for later use
>> To: Kenton Varda <[email protected]>
>>
>>
>> Hi,
>> Coming back to this now that I'm ready for higher level tests.
>>
>> I'm a bit confused by newPromiseAndFulfiller.  Trying myself and also
>> reading a few posts in this group, it's not possible to fulfil a promise
>> from a different thread.  How is it possible to fulfil a promise if the
>> thread that has to fulfil it is blocked in a wait()?  I guess it's in the
>> event loop and not technically blocked, but as far as my code is concerned
>> it's as good as blocked.  So I'm a bit confused by what its use is outside
>> the EventLoop internals or "then()" blocks.
>>
>> What I currently have is an override of EventPort which is thread synched
>> and takes requests to create (or fulfil) promises on the EventLoop thread.
>> This seems to work fine for basic promises.  Coming to use RPC however, it
>> looks like I'm going to have to implement everything the standard EventPort
>> does, as well as carbon copies of every class from 
>> LowLevelAsyncIoProviderImpl
>> up where the only difference is that it uses my EventPort instead of
>> UnixEventPort.  This seems so unnecessarily complex I must be doing
>> something wrong.  I'm happy to do this but wanted to make sure I hadn't
>> missed something simpler.
>>
>> To describe my original problem, in case I've gone so far down the wrong
>> road the original issue is lost:
>> I need to create an API which gives results from RPC calls as
>> std::futures (so far easy), but the server and client will be periodically
>> replicating data in the background without input from my API's user.  This
>> implies I need the Cap'n Proto client (Ez or otherwise) running on
>> another thread, but also able to take RPC requests from the main thread.
>> What's the best way of doing this?
>>
>> Thanks,
>>
>> Mark.
>>
>> On 26 August 2016 at 23:22, Kenton Varda <[email protected]> wrote:
>>
>>> Hi Mark,
>>>
>>> Cap'n Proto is single-threaded. The event loop happens on the current
>>> thread.
>>>
>>> The event loop will run whenever you call .wait() on a promise. What you
>>> need to do is create a promise that completes when your test is "done",
>>> then wait on that.
>>>
>>> One way to do this might be to use a promise/fulfiller pair. See
>>> kj::newPromiseAndFulfiller<>(). This returns a pair of a promise and a
>>> fulfiller, where the fulfiller can be called later in order to provide the
>>> result which will then be returned by the promise. So, in your main
>>> function, you would call .wait() on the promise, and then in some other
>>> code, when you determine that it's time to exit, you would call
>>> fulfiller->fulfill().
>>>
>>> If you don't have any particular way to detect "doneness" and instead
>>> want to wait for some amount of time, you can use the timer:
>>>
>>>     ezRpcClient.getIoProvider().getTimer().afterDelay(1 * kj::SECONDS)
>>>         .wait(ezRpcClient.getWaitScope());
>>>
>>> However, using time delays like this in tests leads to slow, flaky
>>> tests, so I don't recommend it.
>>>
>>> -Kenton
>>>
>>> On Fri, Aug 26, 2016 at 10:50 AM, Mark Grimes <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>> As a temporary workaround I'm just making a request I don't need on the
>>>> main capability, after all the calls that I actually want.  I then wait on
>>>> that promise to clear the queue.  The problem is that the replication (i.e.
>>>> publication) happens in the background so I can't wait on those promises,
>>>> and the event loop needs to be entered for the handlers to run.
>>>> That's good enough for now.  I'm not sure how to handle the event loop
>>>> in the client once I get to production code, but I'll cross that bridge
>>>> when I come to it.
>>>>
>>>> Thanks for the pointers.
>>>>
>>>>
>>>>
>>>> On 26 August 2016 at 01:25, Mark Grimes <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>> Thanks for taking the time to respond.  I was doing all of those
>>>>> things, simple communication works fine but it breaks when trying to pass 
>>>>> a
>>>>> client capability to the server for it to use in the "publish" phase.
>>>>> I eventually figured out if I add in a
>>>>>
>>>>> kj::NEVER_DONE.wait( client.getWaitScope() );
>>>>>
>>>>> at the end everything works, but locks at the end of the test fixture
>>>>> (as you would expect).  I think I fundamentally misunderstood where the
>>>>> event loop processing is done. I thought this was performed on a separate
>>>>> thread, so a simple "this_thread::sleep_for(...)" would allow all the
>>>>> handlers in the queue to finish (not the most optimal I know but I wanted
>>>>> to make sure everything worked first).  I now know (think?) there must be 
>>>>> a
>>>>> "wait()" _somewhere_ to enter the event loop again on the current thread
>>>>> and clear the queue.
>>>>>
>>>>> Since I'm using this in test fixtures, NEVER_DONE is not an option
>>>>> because I want to return control and carry on with other tests.
>>>>>
>>>>> - Is there a way with the ez-rpc interfaces to process everything
>>>>> currently in the queue and then return control?  I see EventLoop has a
>>>>> "run()" method but I don't see a way to get the EventLoop from
>>>>> the EzRpcServer/Client.
>>>>> - If not, is there something like "kj::DO_UNTIL.wait( seconds(1),
>>>>> client.getWaitScope() )"; or a std::condition_variable style notification
>>>>> to stop?
>>>>> - With both client and server on the same event loop, there's no
>>>>> difference between "client.getWaitScope()" and "server.getWaitScope()"
>>>>> right?
>>>>>
>>>>> It does seem to be a great library by the way, the bits I've managed
>>>>> to get working.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Mark.
>>>>>
>>>>> On 25 August 2016 at 18:38, Kenton Varda <[email protected]> wrote:
>>>>>
>>>>>> Hi Mark,
>>>>>>
>>>>>> Terminology note: Think of "capability" as meaning the same thing as
>>>>>> "pointer", except it's a pointer that cannot be forged and without which
>>>>>> you cannot otherwise access the target object. An "interface" describes 
>>>>>> the
>>>>>> methods implemented by an object to which a capability points.
>>>>>>
>>>>>> What you describe should work, but there are a number of ways you
>>>>>> might be getting it wrong. It's hard to say which without seeing your 
>>>>>> code,
>>>>>> but here are some things to check:
>>>>>>
>>>>>> - Are you tearing down the RPC client too early? Once the network
>>>>>> connection closes, all capabilities delivered through it become
>>>>>> disconnected. Make sure you construct the RPC client (e.g. EzRpcClient)
>>>>>> once, not every time you want to make a call.
>>>>>>
>>>>>> - Are you discarding promises before they complete? When you have a
>>>>>> kj::Promise<T> representing an asynchronous task which hasn't completed
>>>>>> yet, and you allow that promise to go out-of-scope (without calling 
>>>>>> .then()
>>>>>> or anything else on it), the asynchronous task will be canceled. To 
>>>>>> prevent
>>>>>> this, create a kj::TaskSet and add promises to the set -- the TaskSet
>>>>>> ensures that the task runs to completion (unless you destroy the TaskSet,
>>>>>> of course). For example:
>>>>>>
>>>>>>     void doThings(kj::TaskSet& tasks) {
>>>>>>       kj::Promise<void> task1 = doSomething().then(...).then(...);
>>>>>>       kj::Promise<void> task2 = doSomething().then(...).then(...);
>>>>>>
>>>>>>       tasks.add(kj::mv(task1));
>>>>>>     }
>>>>>>
>>>>>> In the code above, only task1's .then()s will ever execute -- task2
>>>>>> will be canceled when the function returns.
>>>>>>
>>>>>> - Another reason why a .then() continuation might not execute is if
>>>>>> an exception was thrown. You can catch exceptions by providing a second
>>>>>> callback to .then() which takes kj::Exception as the parameter. Note that
>>>>>> when creating a kj::TaskSet, you will be forced to provide an error
>>>>>> callback which will be used whenever a task throws an exception -- I
>>>>>> usually do KJ_LOG(ERROR, exception) inside that callback.
>>>>>>
>>>>>> -Kenton
>>>>>>
>>>>>> On Thu, Aug 25, 2016 at 3:14 AM, <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I'm trying to develop an application that replicates remote objects
>>>>>>> locally and keeps them updated.  This effectively boils down to a
>>>>>>> publication-subscription model where the local proxy object sends an
>>>>>>> interface (capability in capnp speak?) to the server that can be used to
>>>>>>> send it notifications.  The server stores this interface pointer and
>>>>>>> responds with an "okay", then later uses the interface to send the local
>>>>>>> object notifications if the server-side object changes.
>>>>>>> It doesn't work however.  If I use the interface pointer after the
>>>>>>> original "subscription" call has resolved, the message is never 
>>>>>>> received on
>>>>>>> the client.  If I use the interface _during_ the subscription request 
>>>>>>> the
>>>>>>> message is received by the client, but the "then(...)" block on the 
>>>>>>> server
>>>>>>> is not executed.  I've looked through the calculator example, but that 
>>>>>>> uses
>>>>>>> the client-side interface immediately.
>>>>>>>
>>>>>>> - Can the interface pointers (capabilities right?) given in one call
>>>>>>> be stored for later use?  I assume this must be the case but I can't be
>>>>>>> doing it correctly.
>>>>>>> - I'm using the ez-rpc interfaces.  Is this one of the use cases not
>>>>>>> supported by them?
>>>>>>> - As this is my most basic test case, both the client and server are
>>>>>>> in the same compilation unit.  I don't do any thread manipulation 
>>>>>>> myself so
>>>>>>> I assume they're on the same event loop.  Is this an issue?  If so, I 
>>>>>>> want
>>>>>>> the single process use case for testing, so is starting one or the 
>>>>>>> other on
>>>>>>> a different thread sufficient?
>>>>>>>
>>>>>>> I can reduce the code to a simple example, but before going to the
>>>>>>> effort I wanted to make sure I'm not misunderstanding something.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Mark.
>>>>>>>
>>>>>>> Cap'n Proto version 0.5.3
>>>>>>>
>>>>>>> --
>>>>>>> You received this message because you are subscribed to the Google
>>>>>>> Groups "Cap'n Proto" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>>> send an email to [email protected].
>>>>>>> Visit this group at https://groups.google.com/group/capnproto.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>>
>>
>
