James,
On 11/12/17 00:47, James Roper wrote:
> Hi Chris,
>
> This looks like a straightforward way to solve the problem with minimal
> disruption from the existing API. Can I make a few suggestions though?
Of course, your input here is much appreciated.
> We could add a contentLength parameter to fromPublisher, to allow
> Flow.Publishers where the content length is known to be easily converted
> to BodyPublisher:
>
> static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
>                                    long contentLength) {
> ...
> }
Good idea. Added (as can be seen below).
> This would mean if you were receiving a servlet request body and
> publishing it to another location, then you could do something like this
> (this uses a reactive streams implementation on top of the servlet API
> that I wrote):
>
> HttpServletRequest request = ...
> long contentLength = -1;
> if (request.getHeader("Content-Length") != null) {
> contentLength = Long.parseLong(request.getHeader("Content-Length"));
> }
> Publisher<ByteBuffer> publisher =
>     new RequestPublisher(request.startAsync(), 8192);
>
> HttpRequest clientRequest = HttpRequest.newBuilder(target)
>     .POST(BodyPublisher.fromPublisher(publisher, contentLength))
>     .build();
Nice.
> Perhaps the method could be overloaded for both supplying and not
> supplying a content length.
I think an overload is justified here. Added.
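For illustration, here is a minimal sketch of how the two overloads behave. It assumes the API as later standardized in java.net.http (JDK 11+), where these factories live on HttpRequest.BodyPublishers rather than directly on BodyPublisher as in this thread. The class name and the singleBuffer helper are hypothetical, and no request is actually sent.

```java
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Flow;

public class FromPublisherSketch {

    // Hypothetical helper, for illustration only: a simplified publisher that
    // emits one buffer on the first positive request and then completes.
    static Flow.Publisher<ByteBuffer> singleBuffer(byte[] bytes) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done || n <= 0) return;
                done = true;
                subscriber.onNext(ByteBuffer.wrap(bytes));
                subscriber.onComplete();
            }

            @Override public void cancel() { done = true; }
        });
    }

    // Overload with a content length: the adapter reports the given length.
    static long knownLength(byte[] payload) {
        HttpRequest.BodyPublisher body =
            HttpRequest.BodyPublishers.fromPublisher(singleBuffer(payload), payload.length);
        return body.contentLength();
    }

    // Overload without a content length: the adapter reports an unknown
    // (negative) length.
    static long unknownLength(byte[] payload) {
        HttpRequest.BodyPublisher body =
            HttpRequest.BodyPublishers.fromPublisher(singleBuffer(payload));
        return body.contentLength();
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println("known:   " + knownLength(payload));
        System.out.println("unknown: " + unknownLength(payload));
    }
}
```

With two overloads, a caller that has a Content-Length header can pass it straight through, and a caller that does not simply uses the single-argument form.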
> Similarly, I think a fromSubscriber API that accepted a
> CompletionStage<T> would be a little more fluent than having to supply
> it externally:
Daniel and I discussed this too, but I opted to leave it out initially
for simplicity. I think if we have two overloads, then the simple case
can still be supported with little ceremony, while allowing a more
powerful variant.
> public static <T> BodyHandler<T> fromSubscriber(
>     Subscriber<? super List<ByteBuffer>> subscriber,
>     CompletionStage<T> bodyFuture) {
> ...
> }
>
> Then you could have something like this:
>
> // accumulates bytes and transforms them into a CompletionStage<String>.
> TextSubscriber subscriber = ...;
> CompletionStage<String> result = subscriber.getTextResult();
>
> CompletableFuture<String> cf = client
> .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
> String text = cf.join();
>
> Likewise, this could be an overload of fromSubscriber if we want the
> option of not specifying a body future.
>
> One thing I think needs to be carefully specified is, if the method
> doesn't accept a CompletionStage, when/how the CompletionStage returned
> from send is redeemed.
Hmmm... this could be tricky. I think we can avoid the scenario
completely by accepting a finishing function that can generate/return
the value to use for completion, rather than the CF itself.
Here is an outline of all of the above:
public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {
/**
* Returns a request body publisher whose body is retrieved from the
* given {@code Flow.Publisher}. The returned request body publisher
* has an unknown content length.
*
* @apiNote This method can be used as an adapter between {@code
* BodyPublisher} and {@code Flow.Publisher}, where the amount of
* request body that the publisher will publish is unknown.
*
* @param publisher the publisher responsible for publishing the body
* @return a BodyPublisher
*/
static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher) { ... }
/**
* Returns a request body publisher whose body is retrieved from the
* given {@code Flow.Publisher}. The returned request body publisher
* has the given content length.
*
* <p> The given {@code contentLength} is a positive number that
* represents the exact number of bytes the {@code publisher} must
* publish.
*
* @apiNote This method can be used as an adapter between {@code
* BodyPublisher} and {@code Flow.Publisher}, where the amount of
* request body that the publisher will publish is known.
*
* @param publisher the publisher responsible for publishing the body
* @param contentLength a positive number representing the exact
* number of bytes the publisher will publish
* @throws IllegalArgumentException if the content length is
* non-positive
* @return a BodyPublisher
*/
static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
                                   long contentLength) { ... }
}
public interface BodyHandler<T> {
/**
* Returns a response body handler that returns a {@link BodySubscriber
* BodySubscriber}{@code <Void>} obtained from {@linkplain
* BodySubscriber#fromSubscriber(Subscriber)}, with the given
* {@code subscriber}.
*
* <p> The response body is not available through this, or the {@code
* HttpResponse} API, but instead the entire response body is forwarded
* to the given {@code subscriber}, which should make it available, if
* appropriate, through some other mechanism, e.g. an entry in a
* database, etc.
*
* @apiNote This method can be used as an adapter between {@code
* BodySubscriber} and {@code Flow.Subscriber}.
*
* <p> For example:
* <pre> {@code
* TextSubscriber subscriber = new TextSubscriber();
* HttpResponse<Void> response = client.sendAsync(request,
* BodyHandler.fromSubscriber(subscriber)).join();
* System.out.println(response.statusCode());
* }</pre>
*
* @param subscriber the subscriber
* @return a response body handler
*/
public static BodyHandler<Void>
fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) { ... }
/**
* Returns a response body handler that returns a {@link BodySubscriber
* BodySubscriber}{@code <T>} obtained from {@link
* BodySubscriber#fromSubscriber(Subscriber, Function)}, with the
* given {@code subscriber} and {@code finisher} function.
*
* <p> The given {@code finisher} function is applied after the given
* subscriber's {@code onComplete} has been invoked. The {@code finisher}
* function is invoked with the given subscriber, and returns a value
* that is set as the response's body.
*
* @apiNote This method can be used as an adapter between {@code
* BodySubscriber} and {@code Flow.Subscriber}.
*
* <p> For example:
* <pre> {@code
* // accumulates bytes and transforms them into a String.
* TextSubscriber subscriber = ...;
* HttpResponse<String> response = client.sendAsync(request,
*     BodyHandler.fromSubscriber(subscriber, TextSubscriber::getTextResult)).join();
* String text = response.body();
* }</pre>
*
* @param <S> the type of the Subscriber
* @param <T> the type of the response body
* @param subscriber the subscriber
* @param finisher a function to be applied after the subscriber
* has completed
* @return a response body handler
*/
public static <S extends Subscriber<? super List<ByteBuffer>>,T> BodyHandler<T>
fromSubscriber(S subscriber, Function<S,T> finisher) { ... }
}
// And a similar pair of BodySubscriber methods, omitted for brevity.
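To make the finisher variant concrete, here is a rough sketch rather than a definitive implementation. It assumes the API as later standardized in java.net.http (JDK 11+), where the BodySubscriber-level factory is HttpResponse.BodySubscribers.fromSubscriber. TextSubscriber is a hypothetical class written only for this example, and the adapter is driven by hand in place of a real HTTP client, so no network is involved.

```java
import java.io.ByteArrayOutputStream;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;

public class FromSubscriberSketch {

    // Hypothetical TextSubscriber: accumulates bytes and exposes them as a String.
    static final class TextSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }

        @Override public void onNext(List<ByteBuffer> buffers) {
            for (ByteBuffer buffer : buffers) {
                byte[] chunk = new byte[buffer.remaining()];
                buffer.get(chunk);
                bytes.write(chunk, 0, chunk.length);
            }
        }

        @Override public void onError(Throwable throwable) { }

        @Override public void onComplete() { }

        String getTextResult() {
            return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    static String demo() {
        TextSubscriber subscriber = new TextSubscriber();
        HttpResponse.BodySubscriber<String> adapter =
            HttpResponse.BodySubscribers.fromSubscriber(subscriber, TextSubscriber::getTextResult);

        // Stand in for the HTTP client: feed the adapter two chunks, then complete.
        adapter.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        adapter.onNext(List.of(ByteBuffer.wrap("hello, ".getBytes(StandardCharsets.UTF_8))));
        adapter.onNext(List.of(ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8))));
        adapter.onComplete();

        // The finisher has been applied by now, so the body stage is complete.
        return adapter.getBody().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the finisher is that the caller never has to create or complete a CompletionStage: the adapter completes the body with whatever the function returns once onComplete has been delivered.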
-Chris.