Hi Viktor,

Thank you for the detailed explanation and the links to the Javadoc for
findFirst() and limit(). You're absolutely correct in your characterization
of these operations – they are indeed concerned with encounter order, and
parallel execution doesn't inherently change that unless the stream is
explicitly unordered. I want to clarify a few of my points that I didn't
explain clearly (see below).


On Wed, Jun 4, 2025 at 7:38 AM Viktor Klang <viktor.kl...@oracle.com> wrote:

> Hi Jige,
>
> First of all—you're most welcome. Thanks for your insightful questions.
>
> >*Temptation for Race Semantics:* The beauty of mapConcurrent() integrating
> with the Stream API means developers will naturally be drawn to use it for
> race-like scenarios. Operations like findFirst() or limit(N) to get the
> first few completed results are very intuitive combinations.
>
> It's important to distinguish between spatial (encounter) order and
> temporal (availability) order.
>
> If we look at `Stream::findFirst()` we see:
>
> «Returns an Optional
> <https://docs.oracle.com/en/java/javase/23/docs/api/java.base/java/util/Optional.html>
>  describing
> the first element of this stream, or an empty Optional if the stream is
> empty. If the stream has no encounter order, then any element may be
> returned.» -
> https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#findFirst()
>
> And if we look at `Stream::limit(long)` we see:
>
> *«While limit() is generally a cheap operation on sequential stream
> pipelines, it can be quite expensive on ordered parallel pipelines,
> especially for large values of maxSize, since limit(n) is constrained to
> return not just any n elements, but **the first n elements in the
> encounter order**. » (emphasis mine) -
> https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#limit(long)
> <https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Stream.html#limit(long)>*
>
> So, we can conclude that "first" and "limit" are about *encounter order*,
> and we can conclude that the presence of parallel does not change that—only
> "unorderedness" *may* change that.
>


My apologies if my previous example using findFirst() wasn't as clear as it
could have been. I'd like to clarify how I see the interaction with
mapConcurrent() playing out:

   1.

   *mapConcurrent()'s Influence on Encounter Order:* When I mentioned using
   findFirst() (or limit()) for race semantics, the implicit assumption was
   how mapConcurrent() would feed elements into these terminal operations.
   The "encounter order" that findFirst() sees is precisely what
   mapConcurrent() emits. If mapConcurrent() emits results strictly
   according to input order, then findFirst() will respect that. However,
   if an unordered mapConcurrent() were to emit results in their temporal
   order of completion, then findFirst() would naturally pick the first one
   that *actually finished*, effectively achieving a race. My point is that
   mapConcurrent() is in a position to define the encounter order that
   downstream operations like findFirst() will act upon. The surprise isn't
   with findFirst()'s definition, but with how a strictly ordered
   mapConcurrent() might prevent it from being used effectively for
   "first-to-finish" logic.
   2.

   *The Role of findAny():* You're right that my example using findFirst()
   was more nuanced than I intuitively assumed. A developer well-versed in the
   Stream API nuances might indeed opt for findAny() to implement race
   semantics, especially given its Javadoc explicitly mentioning suitability
   for "maximal performance in parallel operations." findAny() is designed
   to return *any* element, which aligns well with picking the temporally
   first available result from an unordered concurrent process.
   3.

   *Aligning Spec with Intuition:* My broader concern is about the
   intuitive use of these powerful new tools. If mapConcurrent() maintains
   strict input ordering, it's a perfectly valid and understandable
   specification by itself. However, the potential for surprise arises when
   developers, especially those less acquainted with every fine print of
   stream ordering, attempt to combine mapConcurrent() with findFirst() or
   findAny() to build common concurrent utilities like a race. They might
   intuitively expect that "concurrent mapping" followed by "find first/any"
   would yield the result that completes earliest. While developers *should*
   read all related documentation, there's also a design ideal where the API's
   behavior aligns closely with reasonable developer intuition, especially for
   common patterns. An unordered mapConcurrent() (or an option for it)
   would, in my view, better support this intuitive use for race-like
   patterns. Whereas, I have my doubts that developers commonly would
   intuitively assume a concurrent operation to stick to strict ordering (it's
   not what we are used to in most concurrent or multil-threading scenarios)

Ultimately, the question is whether mapConcurrent() should be optimized
primarily for preserving input sequence, or for enabling maximum throughput
and flexibility in concurrent scenarios, including efficient
"first-to-finish" patterns.


>
> >*Surprise for Race Semantics Users:* Following from the above, it could
> be surprising for developers when they realize that the inherent input
> ordering of mapConcurrent() means it's not optimized for these race
> scenarios. The expectation would be that findFirst() returns as soon as
> *any* task completes, but ordering can delay this if an earlier task (in
> input order) is slower.
>
> This should be addressed above.
>
> >*Ordering Assumption in Concurrency:* My experience is that ordering is
> not typically a default assumption when dealing with operations explicitly
> marked as "parallel" or "concurrent." For instance, Stream.forEach() on a
> parallel stream does not guarantee encounter order, presumably for
> performance reasons – a similar trade-off to what's being discussed for
> mapConcurrent(). Developers often consult documentation for ordering
> guarantees in concurrent contexts rather than assuming them.
>
> This should also be addressed above.
>
> >*Expectation of "True" Concurrency:* When I see an API like 
> >mapConcurrent(maxConcurrency,
> mapper), my mental model is that if maxConcurrency permits, new tasks
> should be initiated as soon as a slot is free.
>
> This is interesting, because this is how mapConcurrent used to work. It
> only placed the limit of *concurrent work in progress* and not *work not
> yet possible to propagate downstream*. This was changed, because a
> delayed initial (in encounter order) item may let subsequent (completed)
> work queue up indefinitely.
>
> So in conclusion, there's still room for a different take on
> "mapConcurrentReorder" (name of course left up to the reader), and the good
> news is that such a Gatherer can be implemented, evaluated, hardened, etc
> outside of the JDK—and potentially some day something like it ends up in
> the JDK.
>

Your mention of a potential mapConcurrentReorder() also brings to mind a
thought about API consistency with existing Stream conventions. We see
patterns like Stream.forEach() (which for parallel streams doesn't
guarantee encounter order, prioritizing performance) versus
Stream.forEachOrdered() (which explicitly enforces it). If mapConcurrent()
were to follow a similar naming convention, it might suggest that the base
mapConcurrent() itself would be the version optimized for throughput (and
thus potentially unordered by default), while a hypothetical
mapConcurrentOrdered() would be the variant explicitly providing the strict
ordering guarantee. This is an observation on how naming might align with
established JDK patterns to intuitively guide user expectations about the
default behavior.

This naturally leads to the inherent challenge in designing such an API. It
seems we're trying to balance three desirable, but sometimes conflicting,
goals:

   1. *Strict Encounter Ordering:* Results are emitted in the same order as
   the input elements.
   2. *Bounded Memory Buffering:* Avoid out-of-memory errors by not letting
   completed but un-emitted results queue up indefinitely.
   3. *Optimized (True) Concurrency:* If maxConcurrency is set (e.g., to N),
   the system strives to have N tasks actively running whenever there are
   pending input elements and available concurrency slots, rather than being
   stalled by a slow-to-complete but earlier-in-sequence task.

It appears that achieving all three simultaneously is not possible, and a
compromise must be made.

>From my perspective:

   - Goal #2 (Bounded Memory) is non-negotiable; OOM situations are
   generally unacceptable.
   - Goal #3 (Optimized Concurrency) feels fundamental to an API named
   mapConcurrent(). Users will likely expect it to maximize the concurrent
   execution of tasks up to the specified limit. Deviations from this, often
   necessitated by strict adherence to Goal #1, can lead to surprises.
   - Goal #1 (Strict Ordering), while a "nice-to-have" and sometimes
   beneficial, might not always align with common developer intuition for
   operations explicitly labeled "concurrent," especially if it compromises
   true concurrency. As discussed, ordering can often be reintroduced by the
   caller if specifically needed.

The current implementation understandably prioritizes #1 (Ordering) and #2
(Bounded Memory). However, this prioritization can lead to situations where
#3 (Optimized Concurrency) is not fully realized, which can be
counter-intuitive.

I've already mentioned the race scenario. Consider another example where
this becomes particularly evident. Imagine a scenario with a long-running
task (e.g., for periodic monitoring) combined with a stream of other "real"
tasks:
Java

// Main tasks
List<Callable<Result>> realTasks = ... ;
// A long-running monitoring task
Callable<Void> longRunningMonitoringTask = () -> {
    while (!allRealTasksSeemDone()) { // Simplified condition
        System.out.println("Monitoring...");
        Thread.sleep(5000);
    }
    return null;
};

Stream<Callable<?>> allTasks = Stream.concat(
    Stream.of(longRunningMonitoringTask),
    realTasks.stream().map(task -> (Callable<?>) task)
);

allTasks
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, callable ->
callable.call()))
    .forEach(result -> { /* output results */ });

The longRunningMonitoringTask would usually be the first element, with the
current strictly ordered behavior, it would occupy one of the
MAX_CONCURRENCY slots. and as soon as MAX_CONCURRENCY tasks are
executed, completed or not, all subsequent realTasks (beyond the first one)
would be starved, unable to even start until the longRunningMonitoringTask
completes—which is *never*. This creates a deadlock or significant
processing delay that would likely surprise a developer expecting tasks to
proceed concurrently up to the maxConcurrency limit. An implementation more
aligned with the "base name implies throughput" idea (as per the forEach
analogy) would likely handle this more gracefully by letting tasks run
truly concurrently without an implicit ordering dependency.

This reinforces my belief that an alternative gatherer, or a default
behavior for mapConcurrent() that prioritizes concurrency (as the name
mapConcurrent might suggest to many, akin to forEach), could be very
valuable. I understand this might be something that evolves outside the JDK
initially, and I appreciate you highlighting that path.

Thanks again for the continued discussion and the transparency about the
design choices.

Best regards,


>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
>
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Wednesday, 4 June 2025 16:20
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: Should mapConcurrent() respect time order
> instead of input order?
>
>
> Hi Viktor,
>
> Thank you for sharing that the general feedback on mapConcurrent() has
> been positive and for the insights into the ongoing enhancements,
> especially around interruption handling and work-in-progress tracking.
>
> To clarify my own position, I am also extremely enthusiastic about the
> mapConcurrent() API overall. It offers an elegant and straightforward way
> to manage homogenous, I/O-intensive concurrent tasks within a structured
> concurrency model, which is a significant improvement and a much-needed
> addition. My feedback on ordering is aimed at maximizing its potential.
>
> I'd like to elaborate on a few specific scenarios and expectations that
> inform my perspective on the ordering:
>
>    1.
>
>    *Temptation for Race Semantics:* The beauty of mapConcurrent() integrating
>    with the Stream API means developers will naturally be drawn to use it for
>    race-like scenarios. Operations like findFirst() or limit(N) to get
>    the first few completed results are very intuitive combinations. For
>    example:
>    Java
>
>    // Hypothetical use case: find the fastest responding service
>    Optional<Result> fastestResult = serviceUrls.stream()
>        .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, url -> fetch(url)))
>        .findFirst();
>
>    2.
>
>    *Surprise for Race Semantics Users:* Following from the above, it
>    could be surprising for developers when they realize that the inherent
>    input ordering of mapConcurrent() means it's not optimized for these
>    race scenarios. The expectation would be that findFirst() returns as
>    soon as *any* task completes, but ordering can delay this if an
>    earlier task (in input order) is slower.
>    3.
>
>    *Ordering Assumption in Concurrency:* My experience is that ordering
>    is not typically a default assumption when dealing with operations
>    explicitly marked as "parallel" or "concurrent." For instance,
>    Stream.forEach() on a parallel stream does not guarantee encounter
>    order, presumably for performance reasons – a similar trade-off to what's
>    being discussed for mapConcurrent(). Developers often consult
>    documentation for ordering guarantees in concurrent contexts rather than
>    assuming them.
>    4.
>
>    *Expectation of "True" Concurrency:* When I see an API like 
> mapConcurrent(maxConcurrency,
>    mapper), my mental model is that if maxConcurrency permits, new tasks
>    should be initiated as soon as a slot is free. For example, with
>    maxConcurrency=2:
>    - Task 1 starts.
>       - Task 2 starts.
>       - If Task 2 finishes while Task 1 is still running, I would expect
>       Task 3 to run concurrently alongside task 1, because the max 
> concurrency is
>       2, not 1. The current ordered behavior, where Task 3 might have to wait 
> for
>       Task 1 to complete before its result can be processed (even if Task 3
>       itself could have started and finished), can feel a bit 
> counterintuitive to
>       the notion of maximizing concurrency up to the specified limit. It 
> almost
>       feels like not a "max concurrency", but "max buffer size".
>
> These points are offered to highlight potential areas where the current
> default could lead to subtle surprises or suboptimal performance for useful
> concurrent patterns.
>
> Thanks again for the open discussion and for your work on these valuable
> additions to the JDK.
>
> Best regards,
>
> On Tue, Jun 3, 2025 at 2:13 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> The general feedback received thus far has been primarily positive. There
> have been a few behavior-related enhancements over the previews to better
> handle interruption (there's still room to improve there, as per our
> concurrent conversation) as well as some improvements to work-in-progress
> tracking.
>
> It will be interesting to see which Gatherer-based operations will be
> devised by Java developers in the future.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Monday, 2 June 2025 18:54
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: Should mapConcurrent() respect time order
> instead of input order?
>
>
> Hi Viktor,
>
> Thanks for your reply and for sharing your experience regarding user
> preferences. I appreciate that perspective.
>
> You're right, if an unordered version of mapConcurrent proves to be
> widely beneficial and is implemented and adopted by the community, it could
> certainly make a strong case for future inclusion in the JDK.
>
> I wanted to clarify a nuance regarding user preference that I might not
> have articulated clearly before. If the question is simply "ordered or
> unordered?", in isolation, I can see why many, myself included, might lean
> towards "ordered" as a general preference.
>
> However, the decision becomes more complex when the associated trade-offs
> are considered. If the question were phrased more like, "Do you prefer an
> ordered mapConcurrent by default, even if it entails potential
> performance overhead and limitations for certain use cases like race() 
> operations,
> versus an unordered version that offers higher throughput and broader
> applicability in such scenarios?" my (and perhaps others') answer might
> differ. The perceived cost versus benefit of ordering changes significantly
> when these factors are explicit.
>
> My initial suggestion stemmed from the belief that the performance and
> flexibility gains of an unordered approach for I/O-bound tasks would, in
> many practical situations, outweigh the convenience of default ordering,
> especially since ordering can be reintroduced relatively easily, and
> explicitly, when needed.
>
> Thanks again for the discussion.
>
> Best regards,
>
> On Mon, Jun 2, 2025 at 8:51 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> >My perspective is that strict adherence to input order for
> mapConcurrent() might not be the most common or beneficial default
> behavior for users.
>
> If there is indeed a *majority* who would benefit from an unordered
> version of mapConcurrent (my experience is that the majority prefer
> ordered) then, since it is possible to implement such a Gatherer outside of
> the JDK, this is something which will be constructed, widely used, and
> someone will then propose to add something similar to the JDK.
>
> >While re-implementing the gatherer is a possibility, the existing
> implementation is non-trivial, and creating a custom, robust alternative
> represents a significant undertaking.
>
> The existing version needs to maintain order, which adds to the complexity
> of the implementation. Implementing an unordered version would likely look
> different.
> I'd definitely encourage taking the opportunity to attempt to implement it.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
>
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Monday, 2 June 2025 17:05
> *To:* Viktor Klang <viktor.kl...@oracle.com>
> *Cc:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Re: [External] : Re: Should mapConcurrent() respect time order
> instead of input order?
>
>
> Thank you for your response and for considering my feedback on the
> mapConcurrent() gatherer. I understand and respect that the final
> decision rests with the JDK maintainers.
>
> I would like to offer a couple of further points for consideration. My
> perspective is that strict adherence to input order for mapConcurrent() might
> not be the most common or beneficial default behavior for users. I'd be
> very interested to see any research or data that suggests otherwise, as
> that would certainly inform my understanding.
>
> From my experience, a more common need is for higher throughput in
> I/O-intensive operations. The ability to support use cases like race()—where
> the first successfully completed operation determines the outcome—also
> seems like a valuable capability that is currently infeasible due to the
> ordering constraint.
>
> As I see it, if a developer specifically requires the input order to be
> preserved, this can be achieved with relative ease by applying a subsequent
> sorting operation. For instance:
>
> .gather(mapConcurrent(...))
> .sorted(Comparator.comparing(Result::getInputSequenceId))
>
> The primary challenge in these scenarios is typically the efficient
> fan-out and execution of concurrent tasks, not the subsequent sorting of
> results.
>
> Conversely, as you've noted, there isn't a straightforward way to modify
> the current default ordered behavior to achieve the higher throughput or
> race() semantics that an unordered approach would naturally provide.
>
> While re-implementing the gatherer is a possibility, the existing
> implementation is non-trivial, and creating a custom, robust alternative
> represents a significant undertaking. My hope was that an unordered option
> could be a valuable addition to the standard library, benefiting a wider
> range of developers.
>
> Thank you again for your time and consideration.
>
>
> On Mon, Jun 2, 2025 at 7:48 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> >Even if it by default preserves input order, when I explicitly called
> stream.unordered(), could mapConcurrent() respect that and in return
> achieve higher throughput with support for race?
>
> The Gatherer doesn't know whether the Stream is unordered or ordered. The
> operation should be semantically equivalent anyway.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* Jige Yu <yuj...@gmail.com>
> *Sent:* Monday, 2 June 2025 16:29
> *To:* Viktor Klang <viktor.kl...@oracle.com>; core-libs-dev@openjdk.org <
> core-libs-dev@openjdk.org>
> *Subject:* [External] : Re: Should mapConcurrent() respect time order
> instead of input order?
>
> Sorry. Forgot to copy to the mailing list.
>
> On Mon, Jun 2, 2025 at 7:27 AM Jige Yu <yuj...@gmail.com> wrote:
>
> Thanks Viktor!
>
> I was thinking from my own experience that I wouldn't have automatically
> assumed that a concurrent fanout library would by default preserve input
> order.
>
> And I think wanting high throughput with real-life utilities like race
> would be more commonly useful.
>
> But I could be wrong.
>
> Regardless, mapConcurrent() can do both, no?
>
> Even if it by default preserves input order, when I explicitly called
> stream.unordered(), could mapConcurrent() respect that and in return
> achieve higher throughput with support for race?
>
>
>
> On Mon, Jun 2, 2025 at 2:33 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
> Hi!
>
> In a similar vein to the built-in Collectors,
> the built-in Gatherers provide solutions to common stream-related
> problems, but also, they also serve as "inspiration" for developers for
> what is possible to implement using Gatherers.
>
> If someone, for performance reasons, and with a use-case which does not
> require encounter-order, want to take advantage of that combination of
> circumstances, it is definitely possible to implement your own Gatherer
> which has that behavior.
>
> Cheers,
> √
>
>
> *Viktor Klang*
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> *From:* core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of Jige
> Yu <yuj...@gmail.com>
> *Sent:* Sunday, 1 June 2025 21:08
> *To:* core-libs-dev@openjdk.org <core-libs-dev@openjdk.org>
> *Subject:* Should mapConcurrent() respect time order instead of input
> order?
>
> It seems like for most people, input order isn't that important for
> concurrent work, and concurrent results being in non-deterministic order is
> often expected.
>
> If mapConcurrent() just respect output encounter order:
>
> It'll be able to achieve *higher throughput* if an early task is slow,
> For example, with concurrency=2, and if the first task takes 10 minutes to
> run, mapConcurrent() would only be able to process 2 tasks within the first
> 10 minutes; whereas with encounter order, the first task being slow doesn't
> block the 3rd - 100th elements from being processed and output.
>
> mapConcurrent() can be used to implement useful concurrent semantics, for
> example to *support race* semantics. Imagine if I need to send request to
> 10 candidate backends and take whichever that succeeds first, I'd be able
> to do:
>
> backends.stream()
>     .gather(mapConcurrent(
>         backend -> {
>           try {
>             return backend.fetchOrder();
>            } catch (RpcException e) {
>              return null; // failed to fetch but not fatal
>            }
>         })
>         .filter(Objects::notNull)
>         .findFirst(); // first success then cancel the rest
>
> Cheers,
>
>

Reply via email to