James,

On 11/12/17 00:47, James Roper wrote:
> Hi Chris,
>
> This looks like a straight forward way to solve the problem with minimal
> disruption from the existing API. Can I make a few suggestions though?

Of course, your input here is much appreciated.

> We could add a contentLength parameter to fromPublisher, to allow
> Flow.Publishers where the content length is known to be easily converted
> to BodyPublisher:
>
> static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
> int contentLength) {
>      ...
> }

Good idea. Added ( as can be seen below ).

> This would mean if you were receiving a servlet request body and
> publishing it to another location, then you could do something like this
> (this uses a reactive streams implementation on top of the servlet API
> that I wrote):
>
> HttpServletRequest request = ...
> long contentLength = -1;
> if (request.getHeader("Content-Length") != null) {
>    contentLength = Long.parseLong(request.getHeader("Content-Length"));
> }
> Publisher<ByteBuffer> publisher = new
> RequestPublisher(request.startAsync(), 8192);
>
> HttpRequest clientRequest = HttpRequest.newBuilder(target)
>    .POST(BodyPublisher.fromPublisher(publisher, contentLength))
>    .build()

Nice.

> Perhaps the method could be overloaded for both supplying and not
> supplying a content length.

I think an overload is justified here. Added.

> Similarly, I think a fromSubscriber API that accepted a
> CompletionStage<T> would be a little more fluent than having to supply
> it externally:

Daniel and I discussed this too, but I opted to leave it out initially
for simplicity. I think if we have two overloads, then the simple case
can still be supported with little ceremony, while allowing a more
powerful variant.

> public static <T> BodyHandler<T> fromSubscriber(Subscriber<? super
> List<ByteBuffer>> subscriber, CompletionStage<T> bodyFuture) {
>    ...
> }
>
> Then you could have something like this:
>
> TextSubscriber subscriber = ...;  // accumulates bytes and transforms
> them into a CompletionStage<String>.
> CompletionStage<String> result = subscriber.getTextResult();
>
> CompletableFuture<String> cf =  client
>    .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
> String text = cf.join();
>
> Likewise, this could be an overload of fromSubscriber if we want the
> option of not specifying a body future.
>
> One thing I think needs to be carefully specified is, if the method
> doesn't accept a CompletionStage, when/how the CompletionStage returned
> from send is redeemed.

Hmmm... this could be tricky. I think we can avoid the scenario
completely by accepting a finishing function that can generate/return
the value to use for completion, rather than the CF itself.

Here is an outline of all of the above:

  public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {

    /**
     * Returns a request body publisher whose body is retrieved from the
     * given {@code Flow.Publisher}. The returned request body publisher
     * has an unknown content length.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodyPublisher} and {@code Flow.Publisher}, where the amount of
     * request body that the publisher will publish is unknown.
     *
     * @param publisher the publisher responsible for publishing the body
     * @return a BodyPublisher
     */
static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher) { ... }

    /**
     * Returns a request body publisher whose body is retrieved from the
     * given {@code Flow.Publisher}. The returned request body publisher
     * has the given content length.
     *
     * <p> The given {@code contentLength} is a positive number, that
     * represents the exact amount of bytes the {@code publisher} must
     * publish.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodyPublisher} and {@code Flow.Publisher}, where the amount of
     * request body that the publisher will publish is known.
     *
     * @param publisher the publisher responsible for publishing the body
     * @param contentLength a positive number representing the exact
     *                      amount of bytes the publisher will publish
     * @throws IllegalArgumentException if the content length is
     *                                  non-positive
     * @return a BodyPublisher
     */
static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
                                       long contentLength) { ... }


  public interface BodyHandler<T> {

    /**
     * Returns a response body handler that returns a {@link BodySubscriber
     * BodySubscriber}{@code <Void>} obtained from {@linkplain
     * BodySubscriber#fromSubscriber(Subscriber)}, with the given
     * {@code subscriber}.
     *
     * <p> The response body is not available through this, or the {@code
     * HttpResponse} API, but instead all response body is forwarded to the
     * given {@code subscriber}, which should make it available, if
     * appropriate, through some other mechanism, e.g. an entry in a
     * database, etc.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodySubscriber} and {@code Flow.Subscriber}.
     *
     * <p> For example:
     * <pre> {@code
     *  TextSubscriber subscriber = new TextSubscriber();
     *  HttpResponse<Void> response = client.sendAsync(request,
     *      BodyHandler.fromSubscriber(subscriber)).join();
     *  System.out.println(response.statusCode());
     * }</pre>
     *
     * @param subscriber the subscriber
     * @return a response body handler
     */
    public static BodyHandler<Void>
    fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { ... }

    /**
     * Returns a response body handler that returns a {@link BodySubscriber
     * BodySubscriber}{@code <S,T>} obtained from {@link
     * BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
     * given {@code subscriber} and {@code finisher} function.
     *
     * <p> The given {@code finisher} function is applied after the given
* subscriber's {@code onComplete} has been invoked. The {@code finisher}
     * function is invoked with the given subscriber, and returns a value
     * that is set as the response's body.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodySubscriber} and {@code Flow.Subscriber}.
     *
     * <p> For example:
     * <pre> {@code
* TextSubscriber subscriber = ...; // accumulates bytes and transforms them into a String.*
     * HttpResponse<String> response = client.sendAsync(request,
* BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
     * String text = response.body();
     * }</pre>
     *
     * @param <S> the type of the Subscriber
     * @param <T> the type of the response body
     * @param subscriber the subscriber
* @param finisher a function to be applied after the subscriber has completed
     * @return a response body handler
     */
public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
    fromSubscriber(S subscriber, Function<S,T> finisher) { ... }


  // And a similar pair of BodySubscriber methods, omitted for brevity.


-Chris.

Reply via email to