The paf.promise.fork() was, I think, a copy-paste error from trying to
follow what TwoPartyServer is doing.

So here's something interesting. When I close one of the remote peers
connected to KJ (the remote peer for stream2 in this example), none of the
error handlers even run. If I use joinPromises, then on Windows the shutdown
is never processed. On Android a kj::Exception of type DISCONNECTED is
raised instead, which is a weird platform difference (either the Windows
backend has a bug or it's just an inevitable OS difference that can't be
abstracted properly).

Here's the KJ loop:

kj::Canceler stillRunning;

auto stream1 = ioContext.lowLevelProvider->wrapSocketFd(
            rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);

try {
  stillRunning.wrap(listener->accept().then(
      [&](kj::Own<kj::AsyncIoStream>&& stream2) mutable {
    kj::Vector<kj::Promise<void>> pumped;
    // pumpTo() takes the output stream by reference, so dereference the Own<>.
    pumped.add(stream1->pumpTo(*stream2)
      .ignoreResult()
      .then(
        [] { std::cerr << "stream1 finished pumping" << std::endl; },
        [] (kj::Exception&& e) { std::cerr << "stream1 pumping error" << std::endl; }));
    pumped.add(stream2->pumpTo(*stream1)
      .ignoreResult()
      .then(
        [] { std::cerr << "stream2 finished pumping" << std::endl; },
        [] (kj::Exception&& e) { std::cerr << "stream2 pumping error" << std::endl; }));
    // Keep both streams alive until both pumps have resolved.
    return kj::joinPromises(pumped.releaseAsArray())
      .attach(kj::mv(stream1), kj::mv(stream2));
  })).wait(waitScope);
} catch (const kj::Exception& e) {
  std::cerr << "Bridge failed: " << kj::str(e).cStr() << std::endl;
}

My test code (which runs on the main thread, whereas the KJ code runs in its
own dedicated thread) manually connects two regular blocking BSD sockets,
one to each KJ socket (i.e. they are the peers for stream1 and stream2). I
then send 1 MB via the peer endpoint of stream2 and shut that socket down
with SD_BOTH. Next I recv the 1 MB with MSG_WAITALL on the peer of stream1,
which seems to work OK. Finally I try to recv a second time, expecting that
one of the pumps would have detected some problem by then, but instead I run
into the situation outlined at the top.
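
For reference, here is a minimal sketch of that send / shutdown / recv /
recv sequence with KJ taken out of the picture. It rests on several
assumptions that are not in my actual test: POSIX rather than Winsock (so
SHUT_RDWR stands in for SD_BOTH), a single AF_UNIX socketpair rather than two
TCP connections bridged by KJ, and a 4 KiB payload rather than 1 MB so the
send cannot block with no reader running. The names HalfCloseResult and
sendShutdownThenRecvTwice are made up for the sketch.

```cpp
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cassert>
#include <cstddef>
#include <vector>

// Return values of the two recv() calls made after the sender shut down.
struct HalfCloseResult {
  ssize_t firstRecv;   // MSG_WAITALL read of the whole payload
  ssize_t secondRecv;  // read attempted once the payload has been drained
};

// Hypothetical helper: send a payload, shut the sender down, read twice.
HalfCloseResult sendShutdownThenRecvTwice(size_t payloadSize) {
  int fds[2];
  int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
  assert(rc == 0);
  (void)rc;

  // Sender side: write the payload, then shut down both directions.
  std::vector<char> payload(payloadSize, 'x');
  ssize_t sent = send(fds[0], payload.data(), payload.size(), 0);
  assert(sent == static_cast<ssize_t>(payload.size()));
  (void)sent;
  shutdown(fds[0], SHUT_RDWR);

  // Receiver side: drain the payload, then try to read once more.
  std::vector<char> buf(payloadSize);
  HalfCloseResult result;
  result.firstRecv = recv(fds[1], buf.data(), buf.size(), MSG_WAITALL);
  result.secondRecv = recv(fds[1], buf.data(), buf.size(), 0);

  close(fds[0]);
  close(fds[1]);
  return result;
}
```

On a POSIX system, calling sendShutdownThenRecvTwice(4096) should give a
first recv of 4096 bytes and a second recv of 0. In other words, at the raw
socket level a peer shutdown shows up as a clean end-of-stream rather than as
an error. This says nothing about what the Windows backend does.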

Am I doing something wrong, or do I have incorrect expectations about the
error handlers for the pump?

I haven't yet tried this code on stock Linux/macOS.

Appreciate any insights you may have! In the short term I think I'll just
use `exclusiveJoin`, since I can't see a scenario where any of the handlers
for pumping between two network sockets would ever end up getting called.

On Mon, Jul 13, 2020 at 5:01 PM Kenton Varda <[email protected]> wrote:

> Hmm I don't understand what you're doing with paf.promise.fork(), it looks
> like you're merely attaching it to another promise, not actually waiting on
> it or anything.
>
> I am a little concerned about your use of shutdownWrite() / abortRead() in
> a way that assumes these functions will cause any concurrent write or read
> to throw an exception. That actually isn't quite the intent here. These
> methods are intended to inform the *other side* of the connection that the
> connection has ended; they weren't designed to cancel operations on your own
> end. I think that the way they are implemented for native streams today
> might produce the behavior you want mostly by accident.
>
> The "proper" thing to do would be to cancel any concurrent operation first
> by destroying the associated promise. So before calling abortRead(), make
> sure to destroy any promises that represent a read() operation.
>
> Unfortunately, this brings us back to the problem with joinPromises().
> What you really want is for joinPromises() to fail fast if either branch
> fails, but otherwise wait for both to finish.
>
> I guess one rather-ugly way you could do that today is to create a
> PromiseAndFulfiller for cancellation purposes. Use
> joinPromises().exclusiveJoin(paf.promise). Then reject the fulfiller if you
> catch an exception on either side. But that's pretty ugly.
>
> We need a joinFailfast(), I think...
>
> -Kenton
>
> On Mon, Jul 6, 2020 at 10:13 AM Vitali Lovich <[email protected]> wrote:
>
>>
>>
>> On Mon, Jul 6, 2020 at 7:41 AM Vitali Lovich <[email protected]> wrote:
>>
>>>
>>> On Mon, Jul 6, 2020 at 7:26 AM Kenton Varda <[email protected]>
>>> wrote:
>>>
>>>> Almost. Two issues I can think of:
>>>>
>>>> 1. After each pump completes, you probably want to call shutdownWrite()
>>>> to propagate the EOF.
>>>> 2. `joinPromises()` always waits for all promises to resolve. I've
>>>> often found that this turns out to be the wrong behavior when one of the
>>>> joined promises throws an exception. Usually you want the other tasks
>>>> canceled in that case. I think that might be the case here -- if you get an
>>>> I/O error in one direction, you probably want to kill the whole stream.
>>>> Then again, probably that'll happen anyway in most cases. (Whereas, EOF is
>>>> not an error, so you do want to wait for the other promise in that case.)
>>>>
>>>
>>> So more like this?
>>>
>>> return stream1.pumpTo(stream2).ignoreResult().then(
>>>   [&] { stream2.shutdownWrite(); },
>>>   [&](kj::Exception&& e) {
>>>     stream1.shutdownWrite();
>>>     stream1.abortRead();
>>>     stream2.shutdownWrite();
>>>     stream2.abortRead();
>>>   }).exclusiveJoin(
>>>     stream2.pumpTo(stream1).ignoreResult().then(
>>>       [&] { stream1.shutdownWrite(); },
>>>       [&](kj::Exception&& e) {
>>>         stream1.shutdownWrite();
>>>         stream1.abortRead();
>>>         stream2.shutdownWrite();
>>>         stream2.abortRead();
>>>       }));
>>>
>> Actually, I think this is too hand-wavy. Also I think the original
>> inclusive join is actually correct because I want to ensure that both sides
>> finish any I/O that may be in flight. Otherwise I may end the stream
>> prematurely just because one end finished (e.g. one end sends some data and
>> then closes because it's done, and the peer won't receive all the data).
>>
>> My current code looks something like:
>>
>> void completelyClose(kj::AsyncIoStream& stream) {
>>     stream.shutdownWrite();
>>     stream.abortRead();
>> };
>>
>> kj::Canceler stillRunning;
>> auto stream1 = ioContext.lowLevelProvider->wrapSocketFd(
>>             rawSocket, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
>>
>> stillRunning.wrap(listener->accept().then(
>>     [&](kj::Own<kj::AsyncIoStream>&& stream2) mutable {
>>   auto paf = kj::newPromiseAndFulfiller<void>();
>>   auto unsafeStream2 = stream2.get();
>>
>>   kj::Vector<kj::Promise<void>> pumped;
>>   pumped.add(stream1->pumpTo(*stream2)
>>     .ignoreResult()
>>     .then(
>>       [stream2 = stream2.get()] { stream2->shutdownWrite(); },
>>       [&stream1, stream2 = stream2.get()] (kj::Exception&& e) {
>>         completelyClose(*stream1);
>>         completelyClose(*stream2);
>>       }));
>>   pumped.add(unsafeStream2->pumpTo(*stream1)
>>     .ignoreResult()
>>     .then(
>>       [&stream1] { stream1->shutdownWrite(); },
>>       [&stream1, stream2 = unsafeStream2] (kj::Exception&& e) {
>>         completelyClose(*stream1);
>>         completelyClose(*stream2);
>>       }));
>>   return kj::joinPromises(pumped.releaseAsArray())
>>     .attach(
>>       paf.promise.fork(),
>>       // AcceptedConnection simply fulfills on destruction.
>>       kj::heap<AcceptedConnection>(kj::mv(stream2), kj::mv(paf.fulfiller)));
>> })).wait(waitScope);
>>
>> The fulfiller stuff is another place I'm pretty sure I haven't done
>> right. I was just going off of what's happening under the hood when you
>> wait on the promise that TwoPartyServer returns when it listens.
>>
>>
>>>
>>>> On another note, a warning: I'm likely to change the AsyncOutputStream
>>>> interface significantly in the future, in order to replace
>>>> `shutdownWrite()` with something that returns a promise, and to make it so
>>>> that if you don't explicitly shut down a stream, then it's treated as an
>>>> error. Currently, AsyncOutputStream's destructor implicitly sends a clean
>>>> EOF, but that's the wrong thing to do when the sender terminated
>>>> prematurely due to an exception. So, your code will need some updating when
>>>> that happens.
>>>>
>>>
>>> Yeah, as long as any change needed to make previously correct code
>>> correct again shows up as a hard compilation error, that's fine
>>> (+ maybe documentation in the release notes on how to migrate). Can just
>>> treat that as the cost of updating to a new version of the library. More
>>> challenging would be any implicit behavioural changes that don't change the
>>> API. Those can be harder to catch even with unit tests.
>>>
>>> Also while you're at it, does it make sense to have 1 function that does
>>> both shutdownWrite/abortRead so that if both are returning futures, joining
>>> them correctly can be hidden as an implementation detail? I'm finding the
>>> promises stuff to be straightforward for simple cases but anything more
>>> custom is harder to reason about in terms of making sure they compose
>>> correctly - I never feel quite comfortable if I've written the promise code
>>> correctly (especially with all the different types of promises, Canceler &
>>> TaskSet). Hopefully it's easy to integrate C++20 coroutine support to make
>>> things read more linearly again. Not sure when I'll get to use C++20 though.
>>>
>>>
>>>>
>>>> -Kenton
>>>>
>>>> On Sun, Jul 5, 2020 at 8:13 PM Vitali Lovich <[email protected]> wrote:
>>>>
>>>>> I was wondering what would be the best way to bridge 2
>>>>> kj::AsyncIoStreams to each other (read to write/write to read) so that
>>>>> they act as pass-through? I'm assuming something like:
>>>>>
>>>>> auto pumped = kj::heapArrayBuilder<kj::Promise<void>>(2);
>>>>> pumped.add(stream1.pumpTo(stream2).ignoreResult());
>>>>> pumped.add(stream2.pumpTo(stream1).ignoreResult());
>>>>> return kj::joinPromises(pumped.finish()).ignoreResult();
>>>>>
>>>>> Thanks,
>>>>> Vitali
>>>>>
>>>>> --
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Cap'n Proto" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To view this discussion on the web visit
>>>>> https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com
>>>>> <https://groups.google.com/d/msgid/capnproto/CAF8PYMh%3DKr9Yzmz9on4Cxprb0irNOGpV0MUtBxdGitbOgkjiEg%40mail.gmail.com?utm_medium=email&utm_source=footer>
>>>>> .
>>>>>
>>>>

