Hello James,

Thanks for the comprehensive reply to Chuck's email. Now regarding your own
email.

> On 10 Feb 2018, at 07:38, James Roper <ja...@lightbend.com> wrote:
>
> <snip>
>
> But that brings me to a problem that I'd like to give as feedback to the
> implementers - this API is not Reactive Streams, and so therefore can't take
> advantage of Reactive Streams implementations, and more problematic, can't
> interop with other Reactive Streams sinks/sources. If I wanted to stream a
> WebSocket into a message broker that supports Reactive Streams, I can't. I
> would definitely hope that Reactive Streams support could be added to this
> API, at a minimum as a wrapper, so that application developers can easily
> focus on their business problems, plumbing and transforming messages from one
> place to another, rather than having to deal with implementing concurrent
> code to pass messages.
>

I totally understand your concern over the lack of a Reactive Streams
interface to this low-level API. All I can say is that it's not that we
haven't tried. As usual, the devil is in the detail. There were at least a
couple of major issues we couldn't find a satisfactory solution to.

The first one is how to communicate errors back to the user's code that
publishes messages to WebSocket. This issue has been extensively discussed
here [1]. The only signal the publisher can receive from a subscriber in the
case of a failure is a cancellation signal. After this signal the
subscription is considered cancelled. Now, there are different solutions to
this problem, but none of the ones we looked at seemed comprehensive. For
example, such errors can be propagated by the WebSocket to the user's
subscriber receiving messages. In that case WebSocket input and output
become tied, and the WebSocket client will seem to require both the
publisher and the subscriber to be present at the same time.

The second issue is how to communicate completion signals from processing
individual messages.
That might be handy in case the implementation or the application decides
to recycle the buffers it uses. With a naive RS interface to WebSocket, all
the user's publisher can receive from the WebSocket's subscriber is a
request for a number of extra messages. Using only this number, the
publisher cannot deduce which of the previously published messages have
actually been sent. This problem also has a number of solutions. One would
be to use more streams: an extra publisher that WebSocket would use to emit
the outcomes of sending messages, and an extra subscriber that WebSocket
would use to receive signals from the user once a message has been
received. To be honest, it looks cumbersome.

There are also a dozen smaller issues that have to be resolved before
creating a well-defined RS interface to WebSocket.

> It may well require wrapping messages in a high level object - text, binary,
> ping, pong, etc, to differentiate between the message types.
>

We went to great lengths to support different kinds of requirements in
this API. One such requirement, which arose from a number of discussions on
this mailing list, was not to force unnecessary allocations, garbage
creation and copying. To this end we utilised completion stages for
send/receive operations [2]. We abandoned types for messages and used a
method-naming scheme instead (I believe with RS the equivalent would be to
provide a stream per message type). Even WebSocket.Listener was designed
such that one instance could (if required) service many WebSocket
instances. That's the reason each method in Listener has a WebSocket
argument.

> <snip>
>
> https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html
>

Thanks for the link. That said, I think once we have satisfactory answers
to the questions above, we might provide the RS adapter you were talking
about for this low-level API.
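
To make the idea of such an adapter a bit more concrete, here is a rough
sketch of its receiving half only. It assumes the API in the shape of
java.net.http.WebSocket from JDK 11 (which may differ from the version under
discussion here), and the class name TextMessagePublisher is made up for the
illustration. It deliberately sidesteps the two issues above: it says nothing
about sending, and its back-pressure is crude.

```java
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * Hypothetical adapter (not part of any API): exposes whole incoming text
 * messages as a Flow.Publisher by delegating to a SubmissionPublisher.
 */
final class TextMessagePublisher implements WebSocket.Listener, Flow.Publisher<String> {

    private final SubmissionPublisher<String> delegate = new SubmissionPublisher<>();

    // Accumulates the parts of a message; this relies on the listener's
    // methods not being invoked concurrently.
    private final StringBuilder partial = new StringBuilder();

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        delegate.subscribe(subscriber);
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        webSocket.request(1); // ask for the first message part
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        partial.append(data);
        if (last) {
            String whole = partial.toString();
            partial.setLength(0);
            // submit() blocks while a subscriber's buffer is full, which
            // delays the request(1) below: a very crude form of back-pressure.
            delegate.submit(whole);
        }
        webSocket.request(1); // ask for the next part
        return null;          // null means the data has been fully processed
    }

    @Override
    public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
        delegate.close(); // signals onComplete to the subscribers
        return null;
    }

    @Override
    public void onError(WebSocket webSocket, Throwable error) {
        delegate.closeExceptionally(error); // signals onError to the subscribers
    }
}
```

An instance would be passed as the listener when building the WebSocket and
then subscribed to like any other Flow.Publisher. Note that messages that
arrive before anyone subscribes are simply dropped by SubmissionPublisher.
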
Meanwhile, it is possible to build one already on top of the existing API:
even though it is not the same as RS, semantically they are very similar.

Thanks,
-Pavel

---------------------------------------------------
[1] https://github.com/reactive-streams/reactive-streams-jvm/issues/271
[2] At one point we were even considering using callbacks similar to
    java.nio.channels.CompletionHandler instead of
    java.util.concurrent.CompletionStage for both sending and receiving
    messages. All this is for the sake of not creating extra objects when
    this is considered expensive. Who knows, we might come back to retrofit
    the API and add this capability later.
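
P.S. As an illustration of why per-message completion signals matter for
buffer recycling, here is a small sketch of how the completion stage returned
from a send can be used to return a buffer to a pool. It assumes the API in
the shape of java.net.http.WebSocket from JDK 11, where sendBinary returns a
CompletableFuture; RecyclingSender and its methods are hypothetical names,
not part of the API.

```java
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical helper (not part of any API): hands out pooled buffers and
 * takes each one back only after the send that used it has completed.
 */
final class RecyclingSender {

    private final WebSocket webSocket;
    private final BlockingQueue<ByteBuffer> pool;

    RecyclingSender(WebSocket webSocket, int buffers, int capacity) {
        this.webSocket = webSocket;
        this.pool = new ArrayBlockingQueue<>(buffers);
        for (int i = 0; i < buffers; i++) {
            pool.add(ByteBuffer.allocate(capacity));
        }
    }

    /** Returns a free buffer, or null if all buffers are still in flight. */
    ByteBuffer acquire() {
        return pool.poll();
    }

    /** Sends the buffer as a whole binary message and recycles it afterwards. */
    CompletableFuture<WebSocket> send(ByteBuffer buffer) {
        return webSocket.sendBinary(buffer, true)
                .whenComplete((ws, error) -> {
                    // The send has finished (normally or exceptionally), so
                    // the buffer is no longer in use and can be reused.
                    buffer.clear();
                    pool.offer(buffer);
                });
    }
}
```

The point is that the signal is tied to one particular message. A bare
request(n) from a subscriber would not tell the sender which buffer is free.
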