Hi Pavel,

Sorry for taking a while to respond. I've been travelling and wanted to take the time to properly understand the concerns you've raised.

The reasoning you've stated for the current API design is all sound, and obviously the choice to go with that design and not others depends on the goals of the API. I don't think I'm in a position to disagree with the goals that you've set; I'm sure you've done a lot of research, talked to a lot of people and got a lot of feedback. So I'm happy to accept that the current API as it stands is a good one and should stay as is.

The question for me then is whether there is value in introducing an additional higher-level API (i.e., a Reactive Streams based API) for the purposes of integration with other libraries that provide RS support. Introducing another API, regardless of what that API is, obviously has drawbacks: it increases the surface area of the API, and thus complexity, and it potentially confuses application developers, who now have two different ways of achieving things and need to know which to use when. So the value of that API must be high enough to outweigh those drawbacks. I strongly believe that a Reactive Streams based API does add enough value to do that, and the post that I shared in my previous email demonstrates how an ecosystem where Reactive Streams is embraced can provide a lot of value to application developers. I'm happy to talk more about use cases and code samples if you don't agree.

To address the issues you raised: when providing Reactive Streams integration, I think we need to accept that it is a high-level API and that there can be costs associated with it, such as performance costs and, in some cases, functionality costs.
This is the cost of abstracting a problem away so that many different libraries can reliably integrate with each other. There are many libraries that work with reusable buffers, but there is no standard for how the lifecycle of those buffers is managed: in some cases they get reused immediately; in other cases, for example in Netty, they are reference counted and transparently returned to a pool when the reference count reaches zero. The Reactive Streams spec specifically decided not to deal with this side of integrating streaming libraries, which means either that some out-of-band mechanism needs to be provided for signalling the end of use of buffers (which puts constraints on interop, because not every RS sink/source that you might plumb in would support that out-of-band mechanism), or that reusable buffers cannot be used. In general, the latter would probably be preferred, as high-level concurrency integration code generally requires messages sent between different components to be immutable anyway. So I would say that a Reactive Streams interface would copy the reusable buffer into a read-only immutable buffer, and that this decreased performance would be a trade-off the application developer accepts when choosing between using the RS API and writing their own code against the proprietary API.

For signalling errors upstream: in all the cases that I've worked with, we've never found that sending the error downstream, and just cancelling upstream, is a problem. At the end of the day, there is only one thing that can fail, the TCP connection, and if it fails, both upstream and downstream will fail; there is no such thing as a half failure in TCP. So the error sent down and the error sent up will always represent the same failure of the TCP socket, and will probably always be exactly the same exception. So if you send errors both up and down, the application has a problem: which one should it handle?
Should it handle both? At a minimum, handling both will usually mean that every failure of a WebSocket connection appears twice in an application's logs, once for downstream and once for upstream, which is not helpful because there was actually only one error. At worst, if you have code to restart a WebSocket connection, for example, then without carefully written concurrent code you could end up restarting the streams twice. By only sending errors through the Subscriber interface, i.e. downstream, you simplify the application's error handling, because the application only needs to handle the error in one place, rather than having to choose between handling it in one place or another and getting that choice right.

You raised the issue that this would couple the subscriber and publisher sides. While that's correct, it's not the error handling that would cause the coupling. The publisher and subscriber sides are always intrinsically linked, because they are bound to the same TCP connection; they live and die with that TCP connection. If one side fails, so does the other side. If one side starts, so does the other side. If one side decides to restart, the other side must restart too. They are already coupled, and cannot be decoupled. Introducing coupling in the error handling simplifies the handling, because it exposes what is already true.

The creation of additional objects for representing message types is of course an overhead, but one that users of the high-level API would be happy to accept given the advantages of not having to implement their own integration code. The JVM is highly optimised for the creation and garbage collection of short-lived objects, which lets high-level APIs like Reactive Streams offer simpler programming models without forgoing a huge amount of performance.
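
As a rough illustration of the per-message allocation discussed above, here is a minimal sketch of what such message wrapper types might look like. The names `WsMessages`, `Message`, `Text` and `Binary` are hypothetical and are not part of the JDK or of any proposed API; the `Binary` constructor also shows the copy from a reusable buffer into a read-only buffer mentioned earlier.

```java
import java.nio.ByteBuffer;

// Hypothetical wrapper types for an RS-based WebSocket adapter.
// None of these names exist in the JDK; they are assumptions for illustration.
final class WsMessages {

    private WsMessages() {}

    /** Common supertype, so that a single stream can carry every message kind in order. */
    interface Message {}

    /** An immutable text message. */
    static final class Text implements Message {
        private final String data;

        Text(CharSequence data) {
            // The CharSequence may be backed by a reused buffer; toString() takes a snapshot.
            this.data = data.toString();
        }

        String data() { return data; }
    }

    /** An immutable binary message holding its own copy of the bytes. */
    static final class Binary implements Message {
        private final ByteBuffer data;

        Binary(ByteBuffer reusable) {
            // Copy the remaining bytes; duplicate() leaves the caller's position untouched.
            ByteBuffer copy = ByteBuffer.allocate(reusable.remaining());
            copy.put(reusable.duplicate());
            copy.flip();
            this.data = copy.asReadOnlyBuffer();
        }

        /** Returns a fresh read-only view, so consumers cannot disturb each other's position. */
        ByteBuffer data() { return data.duplicate(); }
    }
}
```

With a common `Message` supertype like this, a single publisher of `Message` elements could carry text and binary frames in the order they arrived, at the cost of one wrapper object (and, for binary data, one copy) per message.
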
Of course, that doesn't mean we should create objects needlessly, but in this case I believe the benefits of being able to seamlessly integrate with other streaming libraries outweigh the overhead of an allocation or two per message.

Having one stream per message type would cause problems. For example, the socket.io protocol encodes binary messages by first sending a text message containing metadata, which is then followed by one or more binary messages. Sending these in multiple different streams would make correlating those binary messages with the text header very difficult, if not impossible.

Regards,

James

On 12 February 2018 at 16:33, Pavel Rappo <pavel.ra...@oracle.com> wrote:

> Hello James,
>
> Thanks for the comprehensive reply to Chuck's email. Now regarding your own email.
>
> > On 10 Feb 2018, at 07:38, James Roper <ja...@lightbend.com> wrote:
> >
> > <snip>
> >
> > But that brings me to a problem that I'd like to give as feedback to the implementers - this API is not Reactive Streams, and so therefore can't take advantage of Reactive Streams implementations, and more problematic, can't interop with other Reactive Streams sinks/sources. If I wanted to stream a WebSocket into a message broker that supports Reactive Streams, I can't. I would definitely hope that Reactive Streams support could be added to this API, at a minimum as a wrapper, so that application developers can easily focus on their business problems, plumbing and transforming messages from one place to another, rather than having to deal with implementing concurrent code to pass messages.
> >
>
> I totally understand your concern over the lack of a Reactive Streams interface to this low-level API. All I can say is that it's not that we haven't tried. As usual the devil is in the detail. There were at least a couple of major issues we couldn't find a satisfactory solution to.
>
> The first one is how to communicate errors back to the user's code that publishes messages to WebSocket. This issue has been extensively discussed here [1]. The only signal the publisher can receive from a subscriber in the case of a failure is a cancellation signal. After this signal the subscription is considered cancelled. Now, there are different solutions to this problem, but none of the ones we looked at seemed comprehensive. For example, such errors can be propagated by the WebSocket to the user's subscriber receiving messages. In which case WebSocket input and output will become tied and the WebSocket client will seem to require both the publisher and the subscriber to be present at the same time.
>
> The second issue is how to communicate completion signals from processing individual messages. That might be handy in the case the implementation or the application decide to recycle buffers they use. With a naive RS interface to WebSocket all the user's publisher can receive from the WebSocket's subscriber is a request for a number of extra messages. Using only this number the publisher cannot deduce which of the previously published messages have been actually sent. This problem also has a number of solutions. One of which would be to use more streams. An extra publisher WebSocket would use to emit outcomes of sending messages and an extra subscriber WebSocket would use to receive signals from the user once a message has been received. To be honest it looks cumbersome.
>
> There are also a dozen of smaller issues that have to be resolved before creating a well-defined RS interface to WebSocket.
>
> > It may well require wrapping messages in a high level object - text, binary, ping, pong, etc, to differentiate between the message types.
> >
>
> We went great lengths in supporting different kinds of requirements in this API.
> One of the such requirements that arose from a number of discussions on this mailing list was not to force unnecessary allocations, garbage creation and copying. To this end we utilised completion stages for send/receive operations [2]. We abandoned types for messages and used a method-naming scheme instead (I believe with RS the equivalent would be to provide a stream per message type). Even WebSocket.Listener was designed such that one instance could (if required) service many WebSocket instances. That's the reason each method in Listener has a WebSocket argument.
>
> > <snip>
> >
> > https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html
> >
>
> Thanks for the link.
>
> That said, I think once we have satisfactory answers to the questions above, we might provide an RS adapter you were talking about to this low-level API.
>
> Meanwhile it is possible to built one already, on top of the existing API even though it is not the same as RS, semantically they are very similar.
>
> Thanks,
> -Pavel
>
> ---------------------------------------------------
> [1] https://github.com/reactive-streams/reactive-streams-jvm/issues/271
> [2] At one point we were even considering using callbacks similar to java.nio.channels.CompletionHandler instead of java.util.concurrent.CompletionStage for both sending and receiving messages. All this is for the sake of not creating extra objects when this is considered expensive. Who knows, we might come back to retrofit the API and add this capability later.

--
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>