On Fri, 21 Feb 2025 11:20:09 GMT, Volkan Yazici <vyaz...@openjdk.org> wrote:

>> Hi - Please find here a PR that improves streaming strategy in the 
>> HttpClient.
>> 
>> The HttpClient currently waits until the full request body has been sent 
>> before starting to listen for a response. This is not optimal, in particular 
>> in cases where the server sends back e.g. 500 without even reading the body. 
>> It also prevents starting to stream the response body from the server before 
>> having sent the full request body, which prevents the server from streaming 
>> back the request body to the client without reading it fully first.
>> 
>> While writing a test to verify the fix, I also noticed a few places where 
>> additional tasks needed to be performed asynchronously (= delegated to the 
>> executor) to support this.
>
> src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java line 428:
> 
>> 426:         var sendBody = exchImpl.sendBodyAsync();
>> 427:         CompletableFuture<Response> cf = 
>> exchImpl.getResponseAsync(executor);
>> 428:         sendBody.exceptionally((t) -> {
> 
> Why don't we mirror this as in `cf.exceptionally(t -> ...)`? That is, what if 
> receive-response-body fails while send-request-body is in flight? (Thinking 
> of push promises or alike early responses that server delivers without fully 
> consuming the request.)

If we get an error while receiving the response headers or the response body, 
the exchange will be cancelled (with cancelImpl), which will stop/cancel the 
sending of the request body.
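
To illustrate the idea, here is a minimal, self-contained sketch. It is not 
the actual Exchange/ExchangeImpl code: FakeExchange, its fields and its 
cancel() method are hypothetical stand-ins, with cancel() playing the role of 
cancelImpl. It only shows that a failure on the response side, routed through 
a single cancel path, also terminates the pending request-body future.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class CancelOnResponseFailure {

    // Hypothetical stand-in for an exchange; not the real ExchangeImpl API.
    static final class FakeExchange {
        final CompletableFuture<Void> sendBody = new CompletableFuture<>();
        final CompletableFuture<String> response = new CompletableFuture<>();
        final AtomicReference<Throwable> cancelCause = new AtomicReference<>();

        // Plays the role of cancelImpl: the first cause wins and both
        // directions of the exchange are terminated with it.
        void cancel(Throwable cause) {
            if (cancelCause.compareAndSet(null, cause)) {
                sendBody.completeExceptionally(cause);
                response.completeExceptionally(cause);
            }
        }
    }

    // Fails the response while the request body is still "in flight".
    static FakeExchange failResponseWhileSending() {
        FakeExchange exch = new FakeExchange();
        // Any failure on the receiving side cancels the whole exchange.
        exch.response.whenComplete((r, t) -> {
            if (t != null) exch.cancel(t);
        });
        exch.response.completeExceptionally(new IOException("response failed"));
        return exch;
    }

    public static void main(String[] args) {
        FakeExchange exch = failResponseWhileSending();
        System.out.println("sendBody failed: "
                + exch.sendBody.isCompletedExceptionally());
        System.out.println("cancel cause: " + exch.cancelCause.get());
    }
}
```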

> src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java line 
> 406:
> 
>> 404:                         .thenAccept(this::cancelIfFailed)
>> 405:                         .thenAcceptAsync((s) -> requestMoreBody(),
>> 406:                                 exchange.executor().safeDelegate());
> 
> Shall we document the need to switch to this particular executor? (I can 
> guess it from the context of the PR, yet it is not apparent to me in 
> isolation.)

If we don't spawn a new task for executing requestMoreBody() here, the 
PostFromGet test will get wedged. The regular executor attempts to execute as 
many tasks as it can in the current thread to minimize context switches. This 
usually works well, but we're doing something a bit unusual in the PostFromGet 
test. We're using InputStream for reading and posting, which means we risk 
blocking within a reactive stream pipeline. Spawning a new task here ensures 
we don't block in the pipeline.
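
The difference between running a dependent stage inline and handing it to an 
executor can be seen with plain CompletableFuture, independently of the 
HttpClient internals. The sketch below is only an illustration: the class and 
method names are made up, and the single-thread pool stands in for whatever 
safeDelegate() returns. A stage attached with thenApply to an already 
completed future runs in the calling thread, whereas thenApplyAsync with an 
explicit executor runs it as a separate task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InlineVersusDelegated {

    // The dependent stage is attached to an already completed future,
    // so it executes immediately in the calling thread.
    static boolean inlineStageRunsOnCaller() {
        Thread caller = Thread.currentThread();
        Thread ranOn = CompletableFuture.completedFuture("sent")
                .thenApply(s -> Thread.currentThread())
                .join();
        return ranOn == caller;
    }

    // The dependent stage is submitted to the given executor as a new task,
    // so a blocking stage cannot stall the calling thread.
    static boolean asyncStageRunsOnCaller(Executor executor) {
        Thread caller = Thread.currentThread();
        Thread ranOn = CompletableFuture.completedFuture("sent")
                .thenApplyAsync(s -> Thread.currentThread(), executor)
                .join();
        return ranOn == caller;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("inline on caller: " + inlineStageRunsOnCaller());
            System.out.println("async on caller:  " + asyncStageRunsOnCaller(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```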

> src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java line 
> 703:
> 
>> 701:                     bodySentCF.completeAsync(() -> this, exec);
>> 702:                 } else {
>> 703:                     exec.ensureExecutedAsync(this::requestMoreBody);
> 
> This means that _"the immediate run shortcut for non-selector threads"_ 
> facilitated by `DelegatingExecutor::execute` will be skipped. Why?

Same answer as above. Our subscribers/publishers used in the PostFromGet test 
are based on InputStream, which is blocking. If we execute request() in the 
current thread, we risk running the synchronous scheduler loop in that thread, 
which might block due to the nature of the subscribers/publishers. What we're 
doing here is just ensuring that we are not going to block from within the 
pipeline.

> src/java.net.http/share/classes/jdk/internal/net/http/Stream.java line 1113:
> 
>> 1111:                 // decremented on the HttpClient.
>> 1112:                 cancelImpl(ex);
>> 1113:             }
> 
> Curious: This _fix_ is not specific to this PR, that is, this should have 
> been fixed even in the absence of the feature this PR is delivering, right?

Kind of. Now we're calling request() in a separate thread, so if it throws 
(which it must not, but still might, since it's potentially implemented by 
user code), then the exception won't get caught by the caller. So the fact 
that we execute request() in a separate thread means we need to protect here 
against any stray exception it might throw.
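
As a rough illustration of that guard (not the actual Stream.java code), the 
sketch below wraps a user-supplied callback in a try/catch inside the task 
that is handed to the executor. The class name, the requestGuarded method and 
the use of a CompletableFuture to record the outcome are all hypothetical; 
recording the throwable stands in for calling cancelImpl(ex).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GuardAsyncRequest {

    // Runs userRequest on the executor. A try/catch around execute() in the
    // submitting thread would not see an exception thrown by the task, so
    // the guard has to live inside the task itself.
    static Throwable requestGuarded(Executor executor, Runnable userRequest) {
        CompletableFuture<Throwable> outcome = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                userRequest.run();
                outcome.complete(null);   // no failure to report
            } catch (Throwable t) {
                outcome.complete(t);      // stand-in for cancelImpl(t)
            }
        });
        return outcome.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Throwable failure = requestGuarded(pool, () -> {
                throw new IllegalStateException("thrown by user code");
            });
            System.out.println("captured: " + failure);
        } finally {
            pool.shutdown();
        }
    }
}
```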

> src/java.net.http/share/classes/jdk/internal/net/http/Stream.java line 1124:
> 
>> 1122:             if (debug.on())
>> 1123:                 debug.log("RequestSubscriber: onSubscribe, request 1");
>> 1124:             
>> exchange.executor().safeDelegate().execute(this::tryRequestMore);
> 
> I'd appreciate it if we can document the need to execute this asynchronously 
> using this particular executor – while it is not ultimate measure, 
> nevertheless, it is not apparent to me. 🙈

PostFromGet will/might get wedged if we execute inline at this point. See my 
previous explanations.

-------------

PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965417642
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965430555
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965435443
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965439082
PR Review Comment: https://git.openjdk.org/jdk/pull/23716#discussion_r1965441720
