James,
On 11/12/17 00:47, James Roper wrote:
> Hi Chris,
>
> This looks like a straight forward way to solve the problem with minimal
> disruption from the existing API. Can I make a few suggestions though?
Of course, your input here is much appreciated.
> We could add a contentLength parameter to fromPublisher, to allow
> Flow.Publishers where the content length is known to be easily converted
> to BodyPublisher:
>
> static BodyPublisher fromPublisher(Flow.Publisher publisher,
> int contentLength) {
> ...
> }
Good idea. Added ( as can be seen below ).
> This would mean if you were receiving a servlet request body and
> publishing it to another location, then you could do something like this
> (this uses a reactive streams implementation on top of the servlet API
> that I wrote):
>
> HttpServletRequest request = ...
> long contentLength = -1;
> if (request.getHeader("Content-Length") != null) {
>contentLength = Long.parseLong(request.getHeader("Content-Length"));
> }
> Publisher publisher = new
> RequestPublisher(request.startAsync(), 8192);
>
> HttpRequest clientRequest = HttpRequest.newBuilder(target)
>.POST(BodyPublisher.fromPublisher(publisher, contentLength))
>.build()
Nice.
> Perhaps the method could be overloaded for both supplying and not
> supplying a content length.
I think an overload is justified here. Added.
> Similarly, I think a fromSubscriber API that accepted a
> CompletionStage would be a little more fluent than having to supply
> it externally:
Daniel and I discussed this too, but I opted to leave it out initially
for simplicity. I think if we have two overloads, then the simple case
can still be supported with little ceremony, while allowing a more
powerful variant.
> public static BodyHandler fromSubscriber(Subscriber List> subscriber, CompletionStage bodyFuture) {
>...
> }
>
> Then you could have something like this:
>
> TextSubscriber subscriber = ...; // accumulates bytes and transforms
> them into a CompletionStage.
> CompletionStage result = subscriber.getTextResult();
>
> CompletableFuture cf = client
>.sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
> String text = cf.join();
>
> Likewise, this could be an overload of fromSubscriber if we want the
> option of not specifying a body future.
>
> One thing I think needs to be carefully specified is, if the method
> doesn't accept a CompletionStage, when/how the CompletionStage returned
> from send is redeemed.
Hmmm... this could be tricky. I think we can avoid the scenario
completely by accepting a finishing function that can generate/return
the value to use for completion, rather than the CF itself.
Here is an outline of all of the above:
public interface BodyPublisher extends Flow.Publisher {
/**
* Returns a request body publisher whose body is retrieved from the
* given {@code Flow.Publisher}. The returned request body publisher
* has an unknown content length.
*
* @apiNote This method can be used as an adapter between {@code
* BodyPublisher} and {@code Flow.Publisher}, where the amount of
* request body that the publisher will publish is unknown.
*
* @param publisher the publisher responsible for publishing the body
* @return a BodyPublisher
*/
static BodyPublisher fromPublisher(Flow.PublisherByteBuffer> publisher) { ... }
/**
* Returns a request body publisher whose body is retrieved from the
* given {@code Flow.Publisher}. The returned request body publisher
* has the given content length.
*
* The given {@code contentLength} is a positive number, that
* represents the exact amount of bytes the {@code publisher} must
* publish.
*
* @apiNote This method can be used as an adapter between {@code
* BodyPublisher} and {@code Flow.Publisher}, where the amount of
* request body that the publisher will publish is known.
*
* @param publisher the publisher responsible for publishing the body
* @param contentLength a positive number representing the exact
* amount of bytes the publisher will publish
* @throws IllegalArgumentException if the content length is
* non-positive
* @return a BodyPublisher
*/
static BodyPublisher fromPublisher(Flow.PublisherByteBuffer> publisher,
long contentLength) { ... }
public interface BodyHandler {
/**
* Returns a response body handler that returns a {@link BodySubscriber
* BodySubscriber}{@code } obtained from {@linkplain
* BodySubscriber#fromSubscriber(Subscriber)}, with the given
* {@code subscriber}.
*
* The response body is not available through this, or the {@code
* HttpResponse} API, but instead all response body is forwarded to the
* given {@code subscriber}, which should make it available, if
* appropriate, through some o