James,

Thanks for taking the time to look at this, and sending your thoughts.

On 08/12/17 00:30, James Roper wrote:
> Hi all,
>
> I wanted to start a discussion about the use of Flow.Subscriber and
> Flow.Publisher in JEP 321 (HTTP Client API).
>
> It seems that users are required to implement their own publishers and
> subscribers, that is, they can't take a Flow.Publisher or
> Flow.Subscriber provided by another reactive streams implementation, and
> pass it on to the HttpClient API. The reason for this is that the
> HttpClient API doesn't accept Flow.Publisher/Flow.Subscriber, rather it
> extends them in HttpRequest.BodyPublisher and
> HttpResponse.BodySubscriber, and then requires the user to return
> instances of those sub interfaces from their BodyHandlers. ...

Great point. I think we can address this with straight forward adapters.
For example:

  public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {

     /**
     * Returns a request body publisher whose body is retrieved from the
     * given {@code Flow.Publisher}. The returned request body publisher
     * has an unknown content length.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodyPublisher} and {@code Flow.Publisher}.
     *
     * @param publisher the publisher responsible for publishing the body
     * @return a BodyPublisher
     */
static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher) {
        ...
    }

    ...

public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);

     /**
* Returns a response body handler that returns a {@link BodySubscriber
      * BodySubscriber}{@code <Void>} obtained from {@link
      * BodySubscriber#fromSubscriber(Subscriber)}.
      *
      * @apiNote This method can be used as an adapter between {@code
      * BodySubscriber} and {@code Flow.Subscriber}.
      *
      * <p> For example:
      * <pre> {@code
* TextSubscriber subscriber = ...; // accumulates bytes and transforms them into a String.
      * Supplier<String> result = subscriber::getTextResult;
      *
      * CompletableFuture<String> cf =  client
      *         .sendAsync(request, BodyHandler.fromSubscriber(subscriber))
      *         .thenApply((response -> result.get()));
      * String text = cf.join();
      * }</pre>
      *
      * @param subscriber the subscriber
      * @return a response body handler
      */
public static BodyHandler<Void> fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber) {
         ...
     }

     // Add an equivalent BodySubscriber ...


This would allow the API to retain its Flow specific types ( that add
additional HTTP specific and API behavior ), while interacting, without
much fuss, with regular Publishers and Subscribers.

-Chris.

Reply via email to