Hi Jige,

I opened an issue to track the concern, and I have proposed a change which 
seems to align well with how parallel streams behave under caller thread 
interruption.

I've opened the following PR for review:
https://github.com/openjdk/jdk/pull/23467

If you are able to make a local OpenJDK build with that solution, you could 
check whether it addresses your use cases (or not).
8349462: Gatherers.mapConcurrent could support async interrupts by 
viktorklang-ora · Pull Request #23467 · openjdk/jdk
This change is likely going to need some extra verbiage in the spec for 
mapConcurrent, and thus a CSR. This behavior aligns mapConcurrent with how 
parallel streams work in conjunction with interrup...


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com>
Sent: Wednesday, 5 February 2025 16:24
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: mapConcurrent() with InterruptedException

Thanks Viktor!

I understand the problem.

The main reason I asked is that I want to understand how the core Java team 
thinks about throwing an unchecked exception.

As explained above, I consider losing cancellability a big deal, even a deal 
breaker. And I thought throwing an unchecked exception is more acceptable, 
because the most common reason a mapConcurrent() virtual thread gets 
interrupted is cancellation from a parent mapConcurrent() or a parent 
Structured Concurrency scope. The cancellation could come either from an 
organic exception or from the downstream no longer needing elements, for 
example because findFirst() has already received one.

In both cases, since the concurrent operation is already cancelled (its result 
is ignored), which exception pops up to the top level isn't that big of a deal 
(perhaps only a log record will be seen?).

But if the core Java team considers it a bad idea, I would love to learn and 
adjust.

On Tue, Feb 4, 2025 at 4:41 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
Hi,

The problem is that mapConcurrent cannot throw InterruptedException because 
that is a checked exception, so we cannot clear the interrupted flag and throw 
that exception.

So the updated semantics are to not cut the stream short, but instead to run 
to completion and restore the interrupted flag.
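For readers less familiar with the pattern, "run to completion, restore the interrupted flag" can be sketched with a plain Future. This is a minimal illustration only, not the mapConcurrent implementation; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class IgnoreButRestore {
  private IgnoreButRestore() {}

  /**
   * Waits for the future to finish even if the calling thread is interrupted
   * meanwhile. The interrupt is not acted upon; it is only re-asserted before
   * returning, so code further up the call stack can still observe it.
   */
  static <T> T awaitIgnoringInterrupts(Future<T> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          // get() cleared the interrupt status: remember it and keep waiting.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the interrupt status
      }
    }
  }
}
```

The trade-off discussed in this thread is visible here: the caller always gets a complete result, but whoever interrupted the thread cannot stop the wait.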

There are a couple of alternatives to this approach that I am contemplating, 
but they need to be explored further before I consider moving forward with 
any of them.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com>
Sent: Monday, 27 January 2025 17:00
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: mapConcurrent() with InterruptedException

Thanks Viktor!

It looks like the current fix ignores interruption.

I want to make sure my concern about it defeating cancellation is heard and 
understood.

The scenario I worry about is a mapConcurrent() that fans out to another 
method, which internally calls mapConcurrent() as an implementation detail.

An example:


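import static java.util.stream.Gatherers.mapConcurrent;

List<RefundResponse> refundHelper(Transaction transaction) {
  // The maxConcurrency argument (4) is illustrative; mapConcurrent() requires one.
  return transaction.creditCardAccounts.stream()
    .gather(mapConcurrent(4, acct -> service.refund(acct)))
    .toList();
}

// Outer fan-out: each refundHelper() call runs on its own virtual thread.
List<List<RefundResponse>> refunds = transactions.stream()
    .gather(mapConcurrent(4, transaction -> refundHelper(transaction)))
    .toList();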

It seems undesirable that in such a case all the service.refund() calls become 
non-cancellable, because the only way the outer mapConcurrent() can cancel the 
refundHelper() calls is by calling Thread.interrupt() on the virtual threads 
that run refundHelper(), and that interrupt would then be swallowed by the 
inner mapConcurrent().
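A minimal sketch of the concern, using a fixed thread pool instead of mapConcurrent() and virtual threads. innerFanOut() is a hypothetical stand-in for refundHelper(), and its sleeping subtasks stand in for service.refund(). The outer caller cancels it the only way it can, by interrupting its thread; whether the subtasks actually stop depends entirely on how the inner layer reacts to that interrupt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class NestedCancellation {
  private NestedCancellation() {}

  /** Fans out n slow subtasks and waits for all of them (stand-in for refundHelper()). */
  static void innerFanOut(int n, boolean propagateInterrupt, AtomicInteger completed) {
    ExecutorService pool = Executors.newFixedThreadPool(n);
    List<Future<?>> subtasks = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      subtasks.add(pool.submit(() -> {
        Thread.sleep(300); // simulated blocking RPC
        completed.incrementAndGet();
        return null;
      }));
    }
    boolean interrupted = false;
    try {
      for (Future<?> subtask : subtasks) {
        while (true) {
          try {
            subtask.get();
            break;
          } catch (InterruptedException e) {
            if (propagateInterrupt) {
              subtasks.forEach(s -> s.cancel(true)); // interrupt the in-flight subtasks
              Thread.currentThread().interrupt();
              return;
            }
            interrupted = true; // ignore for now, restore once everything has finished
          } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
          }
        }
      }
    } finally {
      pool.shutdown();
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Runs innerFanOut on its own thread, cancels it via interrupt after 50 ms, returns completions. */
  static int cancelAfter50ms(int n, boolean propagateInterrupt) throws InterruptedException {
    AtomicInteger completed = new AtomicInteger();
    Thread worker = new Thread(() -> innerFanOut(n, propagateInterrupt, completed));
    worker.start();
    Thread.sleep(50);
    worker.interrupt(); // how an outer fan-out cancels this call
    worker.join();
    Thread.sleep(500); // leave time for any subtask that was not cancelled to finish
    return completed.get();
  }
}
```

With propagateInterrupt set to false (the run-to-completion behavior), every subtask finishes even though its caller was cancelled; with true, none of them do.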

Does this example make sense to you? I can further explain if anything isn't 
clear.

But I want to make sure the decision to disable interruption is a deliberate 
judgement call that such nested mapConcurrent() usage is unlikely or 
unimportant.

Cheers,



On Mon, Jan 27, 2025 at 6:11 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
Hi!

Please see: 
https://github.com/openjdk/jdk/pull/23100

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com>
Sent: Sunday, 26 January 2025 23:03
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
Subject: [External] : Re: mapConcurrent() with InterruptedException

Checking in on what you've found out, Viktor.

From where we left off, I understand that you were looking at alternatives 
instead of silent truncation?

Have you reached any conclusion?

We touched on disallowing interruption during mapConcurrent(). I still have 
concerns with disabling cancellation, because it basically undoes this API note 
from the 
javadoc<https://cr.openjdk.org/~alanb/sc-20240503/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)>:


API Note:

In progress tasks will be attempted to be cancelled, on a best-effort basis, in 
situations where the downstream no longer wants to receive any more elements.

In reality, people will use mapConcurrent() to fan out rpcs. Sometimes these 
rpcs are just a single blocking call; yet sometimes they may themselves be a 
Structured Concurrency scope, with 2 or 3 rpcs that constitute a single logical 
operation. Under two conditions, cancellation is imho an important semantic:

  1.  The downstream code uses filter().findFirst(), and when it sees an 
element, it will return and no longer needs the other pending rpcs to complete. 
If cancellation is disabled, these unnecessary rpcs will waste system resources.
  2.  One of the rpcs throws and the Stream pipeline needs to propagate the 
exception. Again, if the other rpcs cannot be cancelled, we'll have many zombie 
rpcs.
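Both conditions come down to cancelling still-pending work once the outcome is decided. A minimal sketch with a plain ExecutorService, assuming one thread per input; the helper CancellingFindFirst.findFirst() and its signature are hypothetical, not a JDK API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;

final class CancellingFindFirst {
  private CancellingFindFirst() {}

  /**
   * Maps inputs concurrently, scans results in encounter order and returns the
   * first match. Whether it returns a match (condition 1) or throws because a
   * task failed (condition 2), every still-pending task is cancelled on exit.
   */
  static <T, R> Optional<R> findFirst(List<T> inputs, Function<T, R> mapper, Predicate<R> predicate)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, inputs.size()));
    List<Future<R>> futures = new ArrayList<>();
    try {
      for (T input : inputs) {
        futures.add(pool.submit(() -> mapper.apply(input)));
      }
      for (Future<R> future : futures) {
        R result = future.get(); // a failed task surfaces here as ExecutionException
        if (predicate.test(result)) {
          return Optional.of(result);
        }
      }
      return Optional.empty();
    } finally {
      futures.forEach(f -> f.cancel(true)); // no-op for tasks that already finished
      pool.shutdown();
    }
  }
}
```

Without the cancel(true) calls in the finally block, the slow tasks would keep running after the answer is known, which is exactly the "zombie rpcs" situation described above.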

Zombie rpcs may or may not be a deal breaker, depending on the specific use 
case. But for a JDK library, losing cancellation would have a negative impact 
on usability.

My 2c,


On Fri, Jan 3, 2025 at 9:18 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
Hi Ben,

Thanks for raising these questions—getting feedback is crucial in the Preview 
stage of features.

I wrote a reply to the Reddit thread so I'll just summarize here:

It is important to note that mapConcurrent() is not a part of the Structured 
Concurrency JEPs, so it is not designed to join SC scopes.

I'm currently experimenting with ignoring-but-restoring interrupts on the 
"calling thread" for mapConcurrent(), as well as capping work-in-progress to 
maxConcurrency (not only capping the concurrency but also the amount of 
completed-but-yet-to-be-pushed work). Both of these adjustments should increase 
predictability of behavior in the face of blocking operations with variable 
delays.

Another adjustment I'm looking at right now is to harden/improve the cleanup to 
wait for concurrent tasks to acknowledge cancellation, so that once the 
finisher is done executing the VTs are known to have terminated.
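Both adjustments can be sketched outside of Gatherers, assuming a fixed thread pool instead of virtual threads and a List instead of a stream; this is not the mapConcurrent implementation, and BoundedOrderedMap is a hypothetical name. A sliding window of at most maxConcurrency futures caps running plus completed-but-not-yet-pushed work, results come out in encounter order, and the cleanup cancels leftover tasks and then waits for the workers to terminate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class BoundedOrderedMap {
  private BoundedOrderedMap() {}

  /**
   * Maps inputs in encounter order while keeping at most maxConcurrency tasks
   * that are either running or completed-but-not-yet-emitted.
   */
  static <T, R> List<R> map(List<T> inputs, int maxConcurrency, Function<T, R> mapper)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
    Deque<Future<R>> window = new ArrayDeque<>();
    List<R> out = new ArrayList<>();
    try {
      for (T input : inputs) {
        if (window.size() == maxConcurrency) {
          // Window full: emit the oldest result before admitting new work.
          out.add(window.removeFirst().get());
        }
        window.addLast(pool.submit(() -> mapper.apply(input)));
      }
      while (!window.isEmpty()) {
        out.add(window.removeFirst().get());
      }
      return out;
    } finally {
      // Only non-empty if we exit early (task failure or interrupt).
      window.forEach(f -> f.cancel(true));
      pool.shutdownNow();
      // Wait for workers to acknowledge cancellation, so none outlive this call.
      // Sketch simplification: if interrupted here, this throws and masks any earlier exception.
      pool.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}
```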

As for not preserving the encounter order, that would be a completely different 
thing, and I'd encourage you to experiment with that if that functionality 
would be interesting for your use-case(s).
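As a starting point for such an experiment, java.util.concurrent.ExecutorCompletionService already hands back futures in completion order rather than submission order. A minimal sketch; CompletionOrderMap is a hypothetical name, and note that this version does not cap completed-but-not-yet-consumed results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class CompletionOrderMap {
  private CompletionOrderMap() {}

  /** Maps inputs with up to maxConcurrency threads, emitting each result as soon as it is ready. */
  static <T, R> List<R> map(List<T> inputs, int maxConcurrency, Function<T, R> mapper)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
    CompletionService<R> completion = new ExecutorCompletionService<>(pool);
    try {
      for (T input : inputs) {
        completion.submit(() -> mapper.apply(input));
      }
      List<R> out = new ArrayList<>();
      for (int i = 0; i < inputs.size(); i++) {
        out.add(completion.take().get()); // whichever task finishes next
      }
      return out;
    } finally {
      pool.shutdownNow(); // interrupts anything still running if we exit early
    }
  }
}
```

With inputs whose processing time differs, results typically arrive fastest-first, though the exact order is timing-dependent.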

Again, thanks for your feedback!

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of 
Jige Yu <yuj...@gmail.com>
Sent: Friday, 3 January 2025 17:53
To: core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
Subject: mapConcurrent() with InterruptedException

Hi Java Experts,

I sent this email incorrectly to loom-dev@ and was told on Reddit that 
core-libs-dev is the right list.

The question is about the behavior of mapConcurrent() when the thread is 
interrupted.

Currently, mapConcurrent()'s finisher phase will re-interrupt the thread, then 
stop at whatever elements have already been processed and return.

This strikes me as a surprising behavior, because for example if I'm running:

    Stream.of(1, 2, 3)
        .gather(mapConcurrent(3, i -> i * 2)) // maxConcurrency of 3 is illustrative
        .toList();

and the thread is being interrupted, the result could be any of [2], [2, 4] or 
[2, 4, 6].

Since thread interruption is cooperative, there is no guarantee that the thread 
being interrupted will just abort. It's quite possible that it'll keep going 
and then use, for example, [2] as the result of doubling the list [1, 2, 3], 
which is imho incorrect.

In the 
Reddit<https://www.reddit.com/r/java/comments/1hr8xyu/observations_of_gatherersmapconcurrent/>
 thread, someone argued that interruption rarely happens, so it's more of a 
theoretical issue. But interruption can easily happen in Structured Concurrency, 
or in mapConcurrent() itself: if any subtask fails, the other ongoing tasks are 
cancelled by interrupting them.

There has been discussion about alternative strategies:

  1.  Don't respond to interruption and just keep running to completion.
  2.  Re-interrupt the thread and wrap the InterruptedException in a standard 
unchecked exception (StructuredConcurrencyInterruptedException?).
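Option 2 could look roughly like this in a simplified finisher loop over already-submitted futures. UncheckedInterruptedException and InterruptAwareFinisher are hypothetical names (not a JDK API); the point is the order of operations: cancel in-flight work, restore the interrupt status, then throw unchecked instead of returning a truncated result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical unchecked wrapper for InterruptedException. */
final class UncheckedInterruptedException extends RuntimeException {
  UncheckedInterruptedException(InterruptedException cause) {
    super(cause);
  }
}

final class InterruptAwareFinisher {
  private InterruptAwareFinisher() {}

  /** Collects results in order; on interrupt, cancels pending work instead of truncating. */
  static <R> List<R> collectOrThrow(List<Future<R>> futures) throws ExecutionException {
    List<R> results = new ArrayList<>();
    for (Future<R> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException e) {
        futures.forEach(f -> f.cancel(true)); // propagate cancellation to in-flight tasks
        Thread.currentThread().interrupt();   // keep the interrupt visible to callers
        throw new UncheckedInterruptedException(e);
      }
    }
    return results;
  }
}
```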

I have concerns with option 1 because it disables cancellation propagation when 
mapConcurrent() itself is used in a subtask of a parent mapConcurrent() or in a 
StructuredTaskScope.

Both equivalent Future-composition async code and C++'s fiber trees support 
cancellation propagation, and imho it's a critical feature; otherwise a zombie 
thread may still be sending RPCs long after the main thread has exited (failed, 
or fallen back to some default action).

My arguments for option 2:

  1.  InterruptedException is more error-prone than traditional checked 
exceptions for users to catch and handle. They can forget to re-interrupt the 
thread. It's so confusing that even seasoned programmers may not know they are 
supposed to re-interrupt the thread.
  2.  With the Stream API using functional interfaces like Supplier and 
Function, the option of just tacking on "throws IE" isn't available to many 
users.
  3.  With Virtual Threads, it will become more acceptable, or even common, to 
make blocking calls from a stream operation (including but not exclusive to 
mapConcurrent()). So the chance that users are forced to deal with IE will 
become substantially higher.
  4.  Other APIs such as the Structured Concurrency API have already started 
wrapping system checked exceptions like ExecutionException and TimeoutException 
in unchecked exceptions (
join()<https://download.java.net/java/early_access/loom/docs/api/java.base/java/util/concurrent/StructuredTaskScope.html#join()>
 for example).
  5.  Imho, exceptions that we'd rather users not catch and handle, but instead 
mostly let propagate up as is, should be unchecked.
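Arguments 1 and 2 can be made concrete: a lambda passed as a java.util.function.Function cannot declare throws InterruptedException, so every call site has to catch it and remember to re-interrupt. A small adapter could centralize that. InterruptibleFunction, unchecked(), slowDouble() and the exception type below are all hypothetical, not JDK APIs.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class Interruptibly {
  private Interruptibly() {}

  /** Like Function, but allowed to throw InterruptedException (hypothetical). */
  @FunctionalInterface
  interface InterruptibleFunction<T, R> {
    R apply(T t) throws InterruptedException;
  }

  /** Hypothetical unchecked wrapper so the interrupt can propagate through stream lambdas. */
  static final class UncheckedInterruptedException extends RuntimeException {
    UncheckedInterruptedException(InterruptedException cause) {
      super(cause);
    }
  }

  /** Adapts an interruptible function to Function, re-interrupting before rethrowing unchecked. */
  static <T, R> Function<T, R> unchecked(InterruptibleFunction<T, R> fn) {
    return t -> {
      try {
        return fn.apply(t);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // the step that is easy to forget (argument 1)
        throw new UncheckedInterruptedException(e);
      }
    };
  }

  /** Stand-in for a blocking call made from a stream operation (argument 3). */
  static Integer slowDouble(Integer i) throws InterruptedException {
    Thread.sleep(10);
    return i * 2;
  }

  static List<Integer> doubleAll(List<Integer> inputs) {
    Function<Integer, Integer> doubler = unchecked(Interruptibly::slowDouble);
    return inputs.stream().map(doubler).collect(Collectors.toList());
  }
}
```

If the calling thread is interrupted mid-pipeline, the stream aborts with the unchecked exception and the interrupt status is still set for code further up the stack.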

There is also a side 
discussion<https://www.reddit.com/r/java/comments/1hr8xyu/comment/m4z4f8c/>
 about whether mapConcurrent() is better off preserving input order or pushing 
to downstream as soon as an element is computed. I'd love to discuss that topic 
too, but maybe it's better to start a separate thread?

Thank you and cheers!

Ben Yu
