Sorry, did the PR stop using Semaphore? I had naively thought that mapConcurrent() would keep a buffer of the Futures of all currently running concurrent tasks (it could be a ConcurrentMap<TaskKey, Future> if we don't have to preserve FIFO order).
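A rough sketch of the idea, in case it helps. This is not the actual mapConcurrent() implementation: the class TrackedTasks and its methods are hypothetical names, a Long sequence number stands in for TaskKey, the ExecutorService is passed in (a virtual-thread-per-task executor would be the natural choice on a JDK that has one), and wrapping the interruption in IllegalStateException is only a placeholder for whatever exception would end up being propagated.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper for illustration; not how the JDK implements mapConcurrent(). */
final class TrackedTasks<R> {
    private final ExecutorService executor;
    // Futures of the tasks that have been submitted and are not yet done.
    private final ConcurrentMap<Long, Future<R>> running = new ConcurrentHashMap<>();
    private final AtomicLong nextKey = new AtomicLong();

    TrackedTasks(ExecutorService executor) {
        this.executor = executor;
    }

    /** Starts a task and tracks its Future until the task is done. */
    Future<R> submit(Callable<R> task) {
        final long key = nextKey.getAndIncrement();
        FutureTask<R> future = new FutureTask<R>(task) {
            @Override
            protected void done() {
                // Called on normal completion, failure, or cancellation.
                running.remove(key);
            }
        };
        running.put(key, future); // register before the task can finish
        executor.execute(future);
        return future;
    }

    /** Cancels every task that is still tracked, interrupting those that are running. */
    void cancelAll() {
        for (Future<R> future : running.values()) {
            future.cancel(true);
        }
    }

    int runningCount() {
        return running.size();
    }

    /** Waits for one result; if the caller is interrupted, cancels the rest first. */
    R await(Future<R> future) throws ExecutionException {
        try {
            return future.get();
        } catch (InterruptedException e) {
            cancelAll();
            Thread.currentThread().interrupt(); // restore the caller's interrupt flag
            // Placeholder: which exception to propagate is an open question.
            throw new IllegalStateException("interrupted while waiting for a task", e);
        }
    }
}
```

The optional join step is left out of the sketch: Future.cancel(true) only requests interruption and returns immediately, so waiting for the cancelled tasks' threads to actually exit would need extra bookkeeping.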
Upon interruption, the main thread can call .cancel(true) on all pending Futures, optionally join the VTs (if we need to block until all VTs exit), and then propagate the exception. Upon completion, each task just removes itself from the ConcurrentMap.

Just in case it adds anything.

On Thu, Feb 6, 2025 at 6:47 AM Viktor Klang <viktor.kl...@oracle.com> wrote:

> After some more investigation it seems tractable to propagate interruption
> of the caller in sequential mode, but parallel mode will require much
> bigger considerations.
>
> I made a comment to that effect on the JBS issue:
> https://bugs.openjdk.org/browse/JDK-8349462?focusedId=14750017&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14750017
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Viktor Klang <viktor.kl...@oracle.com>
> *Sent:* Thursday, 6 February 2025 11:51
> *To:* Jige Yu <yuj...@gmail.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> I think alignment in behavior between parallel Stream and mapConcurrent in
> terms of how interruptions are handled is a possible path forward.
>
> I decided to close the PR for now as I realized my parallel Stream example
> had misled me regarding its exception throwing, so I'll need to go back and
> refine the solution.
>
> It still seems solvable though.
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Wednesday, 5 February 2025 19:20
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Oh good call!
>
> I forgot to check what parallel streams do upon interruption (didn't think
> they do any blocking calls, but at least the main thread must block).
>
> On Wed, Feb 5, 2025 at 8:18 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> Hi Jige,
>
> I opened an issue to track the concern, and I have proposed a change which
> seems to align well with how parallel streams behave under caller thread
> interruption.
>
> I've opened the following PR for review:
> https://github.com/openjdk/jdk/pull/23467
>
> If you are able to make a local OpenJDK build with that solution you could
> check if it addresses your use-cases (or not).
>
> 8349462: Gatherers.mapConcurrent could support async interrupts by
> viktorklang-ora · Pull Request #23467 · openjdk/jdk
> This change is likely going to need some extra verbiage in the spec for
> mapConcurrent, and thus a CSR. This behavior aligns mapConcurrent with how
> parallel streams work in conjunction with interrup...
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Wednesday, 5 February 2025 16:24
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Thanks Viktor!
>
> I understand the problem.
>
> The main reason I asked is that I want to understand how the core Java
> team thinks of throwing an unchecked exception.
>
> As explained above, I consider losing cancellability a big deal, a deal
> breaker even. And I thought throwing unchecked is more acceptable, because
> the most common reason the mapConcurrent() VT can be interrupted is
> cancellation from a parent mapConcurrent(), or a parent Structured
> Concurrency scope. The cancellation could be either from an organic
> exception, or from the downstream not needing more elements, like maybe due
> to findFirst() already getting an element.
>
> In both cases, since the concurrent operation is already cancelled (result
> ignored), what exception pops up to the top level isn't that big of a deal
> (perhaps only a log record will be seen?).
>
> But if the core Java team considers it a bad idea, I would love to learn
> and adjust.
>
> On Tue, Feb 4, 2025 at 4:41 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> Hi,
>
> The problem is that mapConcurrent cannot throw InterruptedException
> because that is a checked exception, so we cannot clear the interrupted
> flag and throw that exception.
>
> So the updated semantics is to not cut the stream short but instead run to
> completion, restoring the interruption flag.
>
> There exist a couple of alternatives to this approach which I am
> contemplating, but they need to be further explored before I consider
> moving forward with any of them.
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Monday, 27 January 2025 17:00
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: mapConcurrent() with InterruptedException
>
> Thanks Viktor!
>
> It looks like the current fix ignores interruption.
>
> I want to make sure my concern of it defeating cancellation is heard and
> understood.
>
> The scenario I worry about is a mapConcurrent() that fans out to
> another method call, which internally calls mapConcurrent() as an
> implementation detail.
>
> An example:
>
> List<RefundResponse> refundHelper(transaction) {
>   return transaction.creditCardAccounts.stream()
>       .gather(mapConcurrent(acct -> service.refund(acct)))
>       .toList();
> }
>
> transactions.stream()
>     .gather(mapConcurrent(transaction -> refundHelper(transaction)));
>
> It seems undesirable that in such a case all the service.refund() calls
> become non-cancellable, because the only way the outer mapConcurrent()
> cancels the refundHelper() calls is through Thread.interrupt() on the
> virtual threads that call refundHelper(), which would then be disabled by
> the inner mapConcurrent().
>
> Does this example make sense to you? I can further explain if anything
> isn't clear.
>
> But I want to make sure the decision to disable interruption is a
> deliberate judgement call that such nested mapConcurrent() is unlikely, or
> not important.
>
> Cheers,
>
> On Mon, Jan 27, 2025 at 6:11 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> Hi!
>
> Please see: https://github.com/openjdk/jdk/pull/23100
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Sunday, 26 January 2025 23:03
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* [External] : Re: mapConcurrent() with InterruptedException
>
> Checking in on what you've found out, Viktor.
>
> From where we left off, I understand that you were looking at alternatives
> instead of silent truncation?
>
> Have you reached any conclusion?
>
> We touched on disallowing interruption during mapConcurrent(). I still
> have concerns with disabling cancellation, because it basically undoes this
> API note from the javadoc
> <https://cr.openjdk.org/~alanb/sc-20240503/java.base/java/util/stream/Gatherers.html#mapConcurrent(int,java.util.function.Function)>:
>
> API Note: In progress tasks will be attempted to be cancelled, on a
> best-effort basis, in situations where the downstream no longer wants to
> receive any more elements.
>
> In reality, people will use mapConcurrent() to fan out rpcs. Sometimes
> these rpcs are just a single blocking call; yet sometimes they may
> themselves be a Structured Concurrency scope, with 2 or 3 rpcs that
> constitute a single logical operation. Under two conditions, cancellation
> is imho an important semantic:
>
>    1. The downstream code uses filter().findFirst(), and when it sees an
>    element, it will return and no longer needs the other pending rpcs to
>    complete. If cancellation is disabled, these unnecessary rpcs will waste
>    system resources.
>    2. One of the rpcs throws and the Stream pipeline needs to propagate
>    the exception. Again, if the other rpcs cannot be cancelled, we'll have
>    many zombie rpcs.
>
> Zombie rpcs may or may not be a deal breaker, depending on the specific
> use case. But for a JDK library, losing cancellation would have a negative
> impact on usability.
>
> My 2c,
>
> On Fri, Jan 3, 2025 at 9:18 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> Hi Ben,
>
> Thanks for raising these questions; getting feedback is crucial in the
> Preview stage of features.
>
> I wrote a reply to the Reddit thread so I'll just summarize here:
>
> It is important to note that *mapConcurrent()* is not a part of the
> Structured Concurrency JEPs, so it is not designed to join SC scopes.
>
> I'm currently experimenting with ignoring-but-restoring interrupts on the
> "calling thread" for *mapConcurrent()*, as well as capping
> work-in-progress to *maxConcurrency* (not only capping the concurrency
> but also the amount of completed-but-yet-to-be-pushed work). Both of these
> adjustments should increase predictability of behavior in the face of
> blocking operations with variable delays.
>
> Another adjustment I'm looking at right now is to harden/improve the
> cleanup to wait for concurrent tasks to acknowledge cancellation, so that
> once the finisher is done executing the VTs are known to have terminated.
>
> As for not preserving the encounter order, that would be a completely
> different thing, and I'd encourage you to experiment with that if that
> functionality would be interesting for your use-case(s).
>
> Again, thanks for your feedback!
>
> Cheers,
> √
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of Jige
> Yu <yuj...@gmail.com>
> *Sent:* Friday, 3 January 2025 17:53
> *To:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* mapConcurrent() with InterruptedException
>
> Hi Java Experts,
>
> I sent this email incorrectly to loom-dev@ and was told on Reddit that
> core-libs-dev is the right list.
>
> The question is about the behavior of mapConcurrent() when the thread is
> interrupted.
>
> Currently mapConcurrent()'s finisher phase will re-interrupt the thread,
> then stop at whatever element has already been processed and return.
>
> This strikes me as a surprising behavior, because for example if I'm
> running:
>
>    Stream.of(1, 2, 3)
>        .gather(mapConcurrent(i -> i * 2))
>        .toList()
>
> and the thread is being interrupted, the result could be any of [2], [2,
> 4] or [2, 4, 6].
>
> Since thread interruption is cooperative, there is no guarantee that the
> thread being interrupted will just abort. It's quite possible that it'll
> keep going and then will use for example [2] as the result of doubling the
> list of [1, 2, 3], which is imho incorrect.
>
> In the Reddit thread
> <https://www.reddit.com/r/java/comments/1hr8xyu/observations_of_gatherersmapconcurrent/?utm_source=share&utm_medium=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button>,
> someone argued that interruption rarely happens so it's more of a
> theoretical issue. But interruption can easily happen in Structured
> Concurrency or in mapConcurrent() itself if any subtask has failed in order
> to cancel/interrupt the other ongoing tasks.
>
> There had been discussion about alternative strategies:
>
>    1. Don't respond to interruption and just keep running to completion.
>    2. Re-interrupt thread and wrap the InterruptedException in a standard
>    unchecked exception (StructuredConcurrencyInterruptedException?).
>
> I have concerns with option 1 because it disables cancellation propagation
> when mapConcurrent() itself is used in a subtask of a parent
> mapConcurrent() or in a StructuredTaskScope.
>
> Both the equivalent Future-composition async code and C++'s fiber trees
> support cancellation propagation, and imho it's a critical feature;
> otherwise it's possible that a zombie thread is still sending RPCs long
> after the main thread has exited (failed, or fallen back to some default
> action).
>
> My arguments for option 2:
>
>    1. InterruptedException is more error prone than traditional checked
>    exceptions for *users* to catch and handle. They can forget to
>    re-interrupt the thread. It's so confusing that even seasoned programmers
>    may not know they are *supposed to* re-interrupt the thread.
>    2. With Stream API using functional interfaces like Supplier,
>    Function, the option of just tacking on "throws IE" isn't available to many
>    users.
>    3. With Virtual Threads, it will be more acceptable, or even become
>    common to do blocking calls from a stream operation (including but
>    not limited to mapConcurrent()). So the chance users are forced to deal
>    with IE will become substantially higher.
>    4. Other APIs such as the Structured Concurrency API have already
>    started wrapping system checked exceptions like ExecutionException,
>    TimeoutException in unchecked exceptions (join()
>    <https://download.java.net/java/early_access/loom/docs/api/java.base/java/util/concurrent/StructuredTaskScope.html#join()>
>    for example).
>    5. Imho, exceptions that we'd rather users not catch and handle but
>    instead should mostly just propagate up as is, should be unchecked.
>
> There is also a side discussion
> <https://www.reddit.com/r/java/comments/1hr8xyu/comment/m4z4f8c/?utm_source=share&utm_medium=web3x&utm_name=web3xcss&utm_term=1&utm_content=share_button>
> about whether mapConcurrent() is better off preserving input order or
> pushing each element downstream as soon as it is computed. I'd love to
> discuss that topic too but maybe it's better to start a separate thread?
>
> Thank you and cheers!
>
> Ben Yu