I filed the following JIRA issue to track this discussion
and proposal.

  https://bugs.openjdk.java.net/browse/JDK-8193365

I'll start the process of bringing it into 10, unless there
are any final comments. FTR, I'm happy with where we ended up
on this.

-Chris.

On 11/12/17 15:48, Chris Hegarty wrote:
James,

On 11/12/17 00:47, James Roper wrote:
 > Hi Chris,
 >
 > This looks like a straightforward way to solve the problem with minimal
 > disruption to the existing API. Can I make a few suggestions though?

Of course, your input here is much appreciated.

 > We could add a contentLength parameter to fromPublisher, to allow
 > Flow.Publishers where the content length is known to be easily converted
 > to BodyPublisher:
 >
 > static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
 >                                    long contentLength) {
 >      ...
 > }

Good idea. Added ( as can be seen below ).

 > This would mean if you were receiving a servlet request body and
 > publishing it to another location, then you could do something like this
 > (this uses a reactive streams implementation on top of the servlet API
 > that I wrote):
 >
 > HttpServletRequest request = ...
 > long contentLength = -1;
 > if (request.getHeader("Content-Length") != null) {
 >    contentLength = Long.parseLong(request.getHeader("Content-Length"));
 > }
 > Publisher<ByteBuffer> publisher =
 >    new RequestPublisher(request.startAsync(), 8192);
 >
 > HttpRequest clientRequest = HttpRequest.newBuilder(target)
 >    .POST(BodyPublisher.fromPublisher(publisher, contentLength))
 >    .build();

Nice.

 > Perhaps the method could be overloaded for both supplying and not
 > supplying a content length.

I think an overload is justified here. Added.
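
To make the two overloads concrete, here is a minimal sketch. It assumes
the API as it was later standardized in java.net.http (Java 11), where
the factories live on HttpRequest.BodyPublishers rather than directly on
BodyPublisher as proposed in this thread. The class name and the
singleBuffer helper are hypothetical and exist only for illustration.

```java
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Flow;

public class FromPublisherSketch {

    // Hypothetical publisher that emits one buffer and then completes.
    // It stands in for any Flow.Publisher<ByteBuffer>, e.g. one backed
    // by a servlet request body.
    public static Flow.Publisher<ByteBuffer> singleBuffer(byte[] bytes) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(
                        new IllegalArgumentException("non-positive request: " + n));
                    return;
                }
                subscriber.onNext(ByteBuffer.wrap(bytes));
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Overload without a length: the content length is unknown.
    public static BodyPublisher adapt(Flow.Publisher<ByteBuffer> publisher) {
        return BodyPublishers.fromPublisher(publisher);
    }

    // Overload with a length: the publisher promises exactly this many bytes.
    public static BodyPublisher adapt(Flow.Publisher<ByteBuffer> publisher,
                                      long contentLength) {
        return BodyPublishers.fromPublisher(publisher, contentLength);
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // A negative content length means "unknown".
        System.out.println(adapt(singleBuffer(body)).contentLength());
        System.out.println(adapt(singleBuffer(body), body.length).contentLength());
    }
}
```

With the length-less overload the returned publisher reports a negative
(unknown) content length; with the second overload it reports the value
that was passed in, and a non-positive value is rejected with an
IllegalArgumentException.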

 > Similarly, I think a fromSubscriber API that accepted a
 > CompletionStage<T> would be a little more fluent than having to supply
 > it externally:

Daniel and I discussed this too, but I opted to leave it out initially
for simplicity. I think if we have two overloads, then the simple case
can still be supported with little ceremony, while allowing a more
powerful variant.

 > public static <T> BodyHandler<T> fromSubscriber(
 >      Subscriber<? super List<ByteBuffer>> subscriber,
 >      CompletionStage<T> bodyFuture) {
 >    ...
 > }
 >
 > Then you could have something like this:
 >
 > // Accumulates bytes and transforms them into a CompletionStage<String>.
 > TextSubscriber subscriber = ...;
 > CompletionStage<String> result = subscriber.getTextResult();
 >
 > CompletableFuture<String> cf =  client
 >    .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
 > String text = cf.join();
 >
 > Likewise, this could be an overload of fromSubscriber if we want the
 > option of not specifying a body future.
 >
 > One thing I think needs to be carefully specified is, if the method
 > doesn't accept a CompletionStage, when/how the CompletionStage returned
 > from send is redeemed.

Hmmm... this could be tricky. I think we can avoid the scenario
completely by accepting a finishing function that can generate/return
the value to use for completion, rather than the CF itself.

Here is an outline of all of the above:

   public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {

     /**
      * Returns a request body publisher whose body is retrieved from the
      * given {@code Flow.Publisher}. The returned request body publisher
      * has an unknown content length.
      *
      * @apiNote This method can be used as an adapter between {@code
      * BodyPublisher} and {@code Flow.Publisher}, where the amount of
      * request body that the publisher will publish is unknown.
      *
      * @param publisher the publisher responsible for publishing the body
      * @return a BodyPublisher
      */
     static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher) { ... }

     /**
      * Returns a request body publisher whose body is retrieved from the
      * given {@code Flow.Publisher}. The returned request body publisher
      * has the given content length.
      *
      * <p> The given {@code contentLength} must be a positive number that
      * represents the exact number of bytes the {@code publisher} must
      * publish.
      *
      * @apiNote This method can be used as an adapter between {@code
      * BodyPublisher} and {@code Flow.Publisher}, where the amount of
      * request body that the publisher will publish is known.
      *
      * @param publisher the publisher responsible for publishing the body
      * @param contentLength a positive number representing the exact
      *                      number of bytes the publisher will publish
      * @throws IllegalArgumentException if the content length is
      *                                  non-positive
      * @return a BodyPublisher
      */
     static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
                                        long contentLength) { ... }
   }


   public interface BodyHandler<T> {

     /**
      * Returns a response body handler that returns a {@link BodySubscriber
      * BodySubscriber}{@code <Void>} obtained from {@linkplain
      * BodySubscriber#fromSubscriber(Subscriber)}, with the given
      * {@code subscriber}.
      *
      * <p> The response body is not available through this, or the {@code
      * HttpResponse} API, but instead all response body is forwarded to the
      * given {@code subscriber}, which should make it available, if
      * appropriate, through some other mechanism, e.g. an entry in a
      * database, etc.
      *
      * @apiNote This method can be used as an adapter between {@code
      * BodySubscriber} and {@code Flow.Subscriber}.
      *
      * <p> For example:
      * <pre> {@code
      *  TextSubscriber subscriber = new TextSubscriber();
      *  HttpResponse<Void> response = client.sendAsync(request,
      *      BodyHandler.fromSubscriber(subscriber)).join();
      *  System.out.println(response.statusCode());
      * }</pre>
      *
      * @param subscriber the subscriber
      * @return a response body handler
      */
     public static BodyHandler<Void>
     fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { ... }

     /**
      * Returns a response body handler that returns a {@link BodySubscriber
      * BodySubscriber}{@code <T>} obtained from {@link
      * BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
      * given {@code subscriber} and {@code finisher} function.
      *
      * <p> The given {@code finisher} function is applied after the given
      * subscriber's {@code onComplete} has been invoked. The {@code finisher}
      * function is invoked with the given subscriber, and returns a value
      * that is set as the response's body.
      *
      * @apiNote This method can be used as an adapter between {@code
      * BodySubscriber} and {@code Flow.Subscriber}.
      *
      * <p> For example:
      * <pre> {@code
      * // Accumulates bytes and transforms them into a String.
      * TextSubscriber subscriber = ...;
      * HttpResponse<String> response = client.sendAsync(request,
      *     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
      * String text = response.body();
      * }</pre>
      *
      * @param <S> the type of the Subscriber
      * @param <T> the type of the response body
      * @param subscriber the subscriber
      * @param finisher a function to be applied after the subscriber has
      *                 completed
      * @return a response body handler
      */
     public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
     fromSubscriber(S subscriber, Function<S,T> finisher) { ... }
   }


   // And a similar pair of BodySubscriber methods, omitted for brevity.
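
As a rough illustration of how the finisher variant behaves, the sketch
below drives the subscriber-level adapter by hand, without any network
traffic. It assumes the later standardized java.net.http API (Java 11),
where the method is HttpResponse.BodySubscribers.fromSubscriber rather
than the names used in the outline above. TextSubscriber here is a
hypothetical stand-in for the class mentioned in the examples, and the
feed helper is likewise made up for the demonstration.

```java
import java.io.ByteArrayOutputStream;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;

public class FinisherSketch {

    // Hypothetical subscriber that accumulates all received bytes.
    static final class TextSubscriber
            implements Flow.Subscriber<List<ByteBuffer>> {

        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }

        @Override
        public void onNext(List<ByteBuffer> buffers) {
            for (ByteBuffer buffer : buffers) {
                byte[] chunk = new byte[buffer.remaining()];
                buffer.get(chunk);
                bytes.write(chunk, 0, chunk.length);
            }
        }

        @Override
        public void onError(Throwable throwable) { }

        @Override
        public void onComplete() { }

        // The finisher: turns the accumulated bytes into the body value.
        String getTextResult() {
            return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Plays the role of the HTTP client: pushes chunks into the adapter,
    // completes it, and returns the body produced by the finisher.
    public static String feed(String... chunks) {
        TextSubscriber subscriber = new TextSubscriber();
        BodySubscriber<String> body =
            BodySubscribers.fromSubscriber(subscriber, TextSubscriber::getTextResult);

        body.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        for (String chunk : chunks) {
            body.onNext(List.of(
                ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8))));
        }
        body.onComplete(); // the finisher is applied after this signal

        return body.getBody().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(feed("Hello, ", "world"));
    }
}
```

The point of the design is that the caller never has to create or
complete a CompletionStage itself: the adapter completes the body once
the wrapped subscriber has seen onComplete, using whatever the finisher
returns.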


-Chris.
