Hi Jige,

First of all—you're most welcome. Thanks for your insightful questions.

>Temptation for Race Semantics: The beauty of mapConcurrent() integrating with 
>the Stream API means developers will naturally be drawn to use it for 
>race-like scenarios. Operations like findFirst() or limit(N) to get the first 
>few completed results are very intuitive combinations.

It's important to distinguish between spatial (encounter) order and temporal 
(availability) order.

If we look at `Stream::findFirst()` we see:

«Returns an 
Optional<https://docs.oracle.com/en/java/javase/23/docs/api/java.base/java/util/Optional.html>
 describing the first element of this stream, or an empty Optional if the 
stream is empty. If the stream has no encounter order, then any element may be 
returned.» - 
https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#findFirst()

And if we look at `Stream::limit(long)` we see:

«While limit() is generally a cheap operation on sequential stream pipelines, 
it can be quite expensive on ordered parallel pipelines, especially for large 
values of maxSize, since limit(n) is constrained to return not just any n 
elements, but the first n elements in the encounter order. » (emphasis mine) - 
https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#limit(long)

So, we can conclude that "first" and "limit" are about encounter order, and we 
can conclude that the presence of parallel does not change that—only 
"unorderedness" may change that.

>Surprise for Race Semantics Users: Following from the above, it could be 
>surprising for developers when they realize that the inherent input ordering 
>of mapConcurrent() means it's not optimized for these race scenarios. The 
>expectation would be that findFirst() returns as soon as any task completes, 
>but ordering can delay this if an earlier task (in input order) is slower.

This should be addressed above.

>Ordering Assumption in Concurrency: My experience is that ordering is not 
>typically a default assumption when dealing with operations explicitly marked 
>as "parallel" or "concurrent." For instance, Stream.forEach() on a parallel 
>stream does not guarantee encounter order, presumably for performance reasons 
>– a similar trade-off to what's being discussed for mapConcurrent(). 
>Developers often consult documentation for ordering guarantees in concurrent 
>contexts rather than assuming them.

This should also be addressed above.

>Expectation of "True" Concurrency: When I see an API like 
>mapConcurrent(maxConcurrency, mapper), my mental model is that if 
>maxConcurrency permits, new tasks should be initiated as soon as a slot is 
>free.

This is interesting, because this is how mapConcurrent used to work. It only 
placed the limit of concurrent work in progress and not work not yet possible 
to propagate downstream. This was changed, because a delayed initial (in 
encounter order) item may let subsequent (completed) work queue up indefinitely.

So in conclusion, there's still room for a different take on 
"mapConcurrentReorder" (name of course left up to the reader), and the good 
news is that such a Gatherer can be implemented, evaluated, hardened, etc 
outside of the JDK—and potentially some day something like it ends up in the 
JDK.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

________________________________
From: Jige Yu <yuj...@gmail.com>
Sent: Wednesday, 4 June 2025 16:20
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?


Hi Viktor,

Thank you for sharing that the general feedback on mapConcurrent() has been 
positive and for the insights into the ongoing enhancements, especially around 
interruption handling and work-in-progress tracking.

To clarify my own position, I am also extremely enthusiastic about the 
mapConcurrent() API overall. It offers an elegant and straightforward way to 
manage homogenous, I/O-intensive concurrent tasks within a structured 
concurrency model, which is a significant improvement and a much-needed 
addition. My feedback on ordering is aimed at maximizing its potential.

I'd like to elaborate on a few specific scenarios and expectations that inform 
my perspective on the ordering:

  1.  Temptation for Race Semantics: The beauty of mapConcurrent() integrating 
with the Stream API means developers will naturally be drawn to use it for 
race-like scenarios. Operations like findFirst() or limit(N) to get the first 
few completed results are very intuitive combinations. For example:

Java

// Hypothetical use case: find the fastest responding service
Optional<Result> fastestResult = serviceUrls.stream()
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, url -> fetch(url)))
    .findFirst();


  2.  Surprise for Race Semantics Users: Following from the above, it could be 
surprising for developers when they realize that the inherent input ordering of 
mapConcurrent() means it's not optimized for these race scenarios. The 
expectation would be that findFirst() returns as soon as any task completes, 
but ordering can delay this if an earlier task (in input order) is slower.

  3.  Ordering Assumption in Concurrency: My experience is that ordering is not 
typically a default assumption when dealing with operations explicitly marked 
as "parallel" or "concurrent." For instance, Stream.forEach() on a parallel 
stream does not guarantee encounter order, presumably for performance reasons – 
a similar trade-off to what's being discussed for mapConcurrent(). Developers 
often consult documentation for ordering guarantees in concurrent contexts 
rather than assuming them.

  4.  Expectation of "True" Concurrency: When I see an API like 
mapConcurrent(maxConcurrency, mapper), my mental model is that if 
maxConcurrency permits, new tasks should be initiated as soon as a slot is 
free. For example, with maxConcurrency=2:

     *   Task 1 starts.
     *   Task 2 starts.
     *   If Task 2 finishes while Task 1 is still running, I would expect Task 
3 to run concurrently alongside task 1, because the max concurrency is 2, not 
1. The current ordered behavior, where Task 3 might have to wait for Task 1 to 
complete before its result can be processed (even if Task 3 itself could have 
started and finished), can feel a bit counterintuitive to the notion of 
maximizing concurrency up to the specified limit. It almost feels like not a 
"max concurrency", but "max buffer size".

These points are offered to highlight potential areas where the current default 
could lead to subtle surprises or suboptimal performance for useful concurrent 
patterns.

Thanks again for the open discussion and for your work on these valuable 
additions to the JDK.

Best regards,

On Tue, Jun 3, 2025 at 2:13 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
The general feedback received thus far has been primarily positive. There have 
been a few behavior-related enhancements over the previews to better handle 
interruption (there's still room to improve there, as per our concurrent 
conversation) as well as some improvements to work-in-progress tracking.

It will be interesting to see which Gatherer-based operations will be devised 
by Java developers in the future.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Monday, 2 June 2025 18:54
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>
Cc: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?


Hi Viktor,

Thanks for your reply and for sharing your experience regarding user 
preferences. I appreciate that perspective.

You're right, if an unordered version of mapConcurrent proves to be widely 
beneficial and is implemented and adopted by the community, it could certainly 
make a strong case for future inclusion in the JDK.

I wanted to clarify a nuance regarding user preference that I might not have 
articulated clearly before. If the question is simply "ordered or unordered?", 
in isolation, I can see why many, myself included, might lean towards "ordered" 
as a general preference.

However, the decision becomes more complex when the associated trade-offs are 
considered. If the question were phrased more like, "Do you prefer an ordered 
mapConcurrent by default, even if it entails potential performance overhead and 
limitations for certain use cases like race() operations, versus an unordered 
version that offers higher throughput and broader applicability in such 
scenarios?" my (and perhaps others') answer might differ. The perceived cost 
versus benefit of ordering changes significantly when these factors are 
explicit.

My initial suggestion stemmed from the belief that the performance and 
flexibility gains of an unordered approach for I/O-bound tasks would, in many 
practical situations, outweigh the convenience of default ordering, especially 
since ordering can be reintroduced relatively easily, and explicitly, when 
needed.

Thanks again for the discussion.

Best regards,

On Mon, Jun 2, 2025 at 8:51 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
>My perspective is that strict adherence to input order for mapConcurrent() 
>might not be the most common or beneficial default behavior for users.

If there is indeed a majority who would benefit from an unordered version of 
mapConcurrent (my experience is that the majority prefer ordered) then, since 
it is possible to implement such a Gatherer outside of the JDK, this is 
something which will be constructed, widely used, and someone will then propose 
to add something similar to the JDK.

>While re-implementing the gatherer is a possibility, the existing 
>implementation is non-trivial, and creating a custom, robust alternative 
>represents a significant undertaking.

The existing version needs to maintain order, which adds to the complexity of 
the implementation. Implementing an unordered version would likely look 
different.
I'd definitely encourage taking the opportunity to attempt to implement it.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle

________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Monday, 2 June 2025 17:05
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>
Cc: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Re: [External] : Re: Should mapConcurrent() respect time order instead 
of input order?


Thank you for your response and for considering my feedback on the 
mapConcurrent() gatherer. I understand and respect that the final decision 
rests with the JDK maintainers.

I would like to offer a couple of further points for consideration. My 
perspective is that strict adherence to input order for mapConcurrent() might 
not be the most common or beneficial default behavior for users. I'd be very 
interested to see any research or data that suggests otherwise, as that would 
certainly inform my understanding.

From my experience, a more common need is for higher throughput in 
I/O-intensive operations. The ability to support use cases like race()—where 
the first successfully completed operation determines the outcome—also seems 
like a valuable capability that is currently infeasible due to the ordering 
constraint.

As I see it, if a developer specifically requires the input order to be 
preserved, this can be achieved with relative ease by applying a subsequent 
sorting operation. For instance:

.gather(mapConcurrent(...))
.sorted(Comparator.comparing(Result::getInputSequenceId))


The primary challenge in these scenarios is typically the efficient fan-out and 
execution of concurrent tasks, not the subsequent sorting of results.

Conversely, as you've noted, there isn't a straightforward way to modify the 
current default ordered behavior to achieve the higher throughput or race() 
semantics that an unordered approach would naturally provide.

While re-implementing the gatherer is a possibility, the existing 
implementation is non-trivial, and creating a custom, robust alternative 
represents a significant undertaking. My hope was that an unordered option 
could be a valuable addition to the standard library, benefiting a wider range 
of developers.

Thank you again for your time and consideration.


On Mon, Jun 2, 2025 at 7:48 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
>Even if it by default preserves input order, when I explicitly called 
>stream.unordered(), could mapConcurrent() respect that and in return achieve 
>higher throughput with support for race?

The Gatherer doesn't know whether the Stream is unordered or ordered. The 
operation should be semantically equivalent anyway.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Monday, 2 June 2025 16:29
To: Viktor Klang <viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>>; 
core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: [External] : Re: Should mapConcurrent() respect time order instead of 
input order?

Sorry. Forgot to copy to the mailing list.

On Mon, Jun 2, 2025 at 7:27 AM Jige Yu 
<yuj...@gmail.com<mailto:yuj...@gmail.com>> wrote:
Thanks Viktor!

I was thinking from my own experience that I wouldn't have automatically 
assumed that a concurrent fanout library would by default preserve input order.

And I think wanting high throughput with real-life utilities like race would be 
more commonly useful.

But I could be wrong.

Regardless, mapConcurrent() can do both, no?

Even if it by default preserves input order, when I explicitly called 
stream.unordered(), could mapConcurrent() respect that and in return achieve 
higher throughput with support for race?



On Mon, Jun 2, 2025 at 2:33 AM Viktor Klang 
<viktor.kl...@oracle.com<mailto:viktor.kl...@oracle.com>> wrote:
Hi!

In a similar vein to the built-in Collectors,
the built-in Gatherers provide solutions to common stream-related problems, but 
also, they also serve as "inspiration" for developers for what is possible to 
implement using Gatherers.

If someone, for performance reasons, and with a use-case which does not require 
encounter-order, want to take advantage of that combination of circumstances, 
it is definitely possible to implement your own Gatherer which has that 
behavior.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev 
<core-libs-dev-r...@openjdk.org<mailto:core-libs-dev-r...@openjdk.org>> on 
behalf of Jige Yu <yuj...@gmail.com<mailto:yuj...@gmail.com>>
Sent: Sunday, 1 June 2025 21:08
To: core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org> 
<core-libs-dev@openjdk.org<mailto:core-libs-dev@openjdk.org>>
Subject: Should mapConcurrent() respect time order instead of input order?

It seems like for most people, input order isn't that important for concurrent 
work, and concurrent results being in non-deterministic order is often expected.

If mapConcurrent() just respect output encounter order:

It'll be able to achieve higher throughput if an early task is slow, For 
example, with concurrency=2, and if the first task takes 10 minutes to run, 
mapConcurrent() would only be able to process 2 tasks within the first 10 
minutes; whereas with encounter order, the first task being slow doesn't block 
the 3rd - 100th elements from being processed and output.

mapConcurrent() can be used to implement useful concurrent semantics, for 
example to support race semantics. Imagine if I need to send request to 10 
candidate backends and take whichever that succeeds first, I'd be able to do:

backends.stream()
    .gather(mapConcurrent(
        backend -> {
          try {
            return backend.fetchOrder();
           } catch (RpcException e) {
             return null; // failed to fetch but not fatal
           }
        })
        .filter(Objects::notNull)
        .findFirst(); // first success then cancel the rest

Cheers,

Reply via email to