I think the problem is that it depends on the order, and combination, of 
operations to know what executes in the same "island".

My personal preference would be to end up in a place where an entire pipeline 
is executed as a single island, which would mean that short-circuit signals 
would always propagate right back to the source.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: David Alayachew <davidalayac...@gmail.com>
Sent: Wednesday, 13 November 2024 00:37
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements

Oh sure, I expect something like distinct() to pull everything. In order to 
know if something is distinct, you have to do some variant of "check against 
everyone else". Whether that is holding all instances in memory or their 
hashes, it's clear from a glance that you will need to look at everything, and 
therefore, pre-fetching makes intuitive sense to me.

I 100% did not expect terminal operations like findAny() or reduce() to pull 
the whole data set. That was a complete whiplash for me. The method findAny() 
advertises itself as a short-circuiting operation, so to find out that it 
actually pulls the whole data set anyways was shocking.

And that was my biggest pain point -- looking at the documentation, it is not 
clear to me at all that methods like findAny() would pull in all data upon 
becoming parallel().

Do you think it would make sense to add documentation about this to the 
javadocs for Stream/java.util.stream? Or maybe it is already there and I 
misunderstood it (even after reading through it thoroughly over 5 times).


On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
>We are told how Streams can process unbounded data sets, but when it tries to 
>do a findAny() with parallel(), it runs into an OOME because it fetched all 
>the data ahead of time. In fact, almost all of the terminal operations will hit an 
>OOME in the exact same way if they are parallel and have a big enough data 
>set. It's definitely not the end of the world, but it seems that I have to fit 
>everything into a Collector and/or a Gatherer if I want to avoid pre-fetching 
>everything.

Yeah, I think it is important to distinguish "can process unbounded data sets" 
from "always able to process unbounded data sets".

Some operations inherently need the end of the stream, so even something simple 
like stream.distinct() or stream.sorted() can end up pulling in all data 
(which of course won't terminate).
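
To make the distinction concrete, here is a minimal sketch of the case that does work: a sequential findAny() over an unbounded source stops pulling soon after it finds a match. The class name LazyPullDemo and the pulled counter are illustrative only and not part of any JDK API. It assumes a sequential stream; as discussed elsewhere in this thread, the parallel case can behave very differently.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyPullDemo {
    /** Number of elements the source handed out during the last run. */
    static final AtomicInteger pulled = new AtomicInteger();

    /** Runs a sequential findAny() over an unbounded source and returns the match. */
    static int firstMultipleOfFive() {
        pulled.set(0);
        Optional<Integer> hit = Stream.iterate(1, i -> i + 1) // unbounded source
                .peek(i -> pulled.incrementAndGet())          // count every element pulled
                .filter(i -> i % 5 == 0)
                .findAny();                                   // short-circuits when sequential
        return hit.orElseThrow();
    }

    public static void main(String[] args) {
        int hit = firstMultipleOfFive();
        System.out.println("found " + hit + " after pulling " + pulled.get() + " elements");
        // Inserting .sorted() before findAny() would never return here:
        // sorted() has to see the end of the stream before it can emit anything.
    }
}
```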

Fortunately, I think Gatherers can unlock many more situations where unbounded 
streams can be processed.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: David Alayachew <davidalayac...@gmail.com>
Sent: Tuesday, 12 November 2024 15:08
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements


Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon as I 
thought of it.


I hand-waved away the idea because I thought that the method would turn the 
stream pipeline parallel, thus, recreating the same problem I currently have of 
parallelism causing all of the elements to be fetched ahead of time, causing an 
OOME.


It did NOT occur to me that the pipeline would stay sequential, and just kick 
these off sequentially, but have them executing in parallel. I can't see why I 
came to that incorrect conclusion. I have read the javadocs of this method 
several times. Though, to be fair, I came to the same, incorrect conclusion 
about Collectors.groupingByConcurrent(), and it wasn't until someone pointed 
out what the documentation was actually saying that I realized its true 
properties.

Thanks. That definitely solves at least part of my problem. Obviously, I would 
prefer to write to S3 in parallel too, but at the very least, the calculation 
part is being done in parallel. And worst case scenario, I can be really bad 
and just do the write to S3 in the mapConcurrent, and then just return the 
metadata of each write, and just bundle that up with collect.
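
For anyone following along, this is roughly the shape I have in mind: a minimal sketch, assuming JDK 24 or later (where Gatherers are final). The names BATCH_SIZE, MAX_CONCURRENCY, process, and the IntStream source are placeholders I made up for illustration; the real source would be the file, and process would be the actual compute (and possibly the S3 write). The stream itself stays sequential, and only the mapping step runs concurrently, with at most MAX_CONCURRENCY batches in flight.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class MapConcurrentSketch {
    static final int BATCH_SIZE = 4;       // hypothetical batch size
    static final int MAX_CONCURRENCY = 3;  // hypothetical cap on in-flight batches

    /** Stand-in for the pure compute step; here it just sums the batch. */
    static int process(List<Integer> batch) {
        return batch.stream().mapToInt(Integer::intValue).sum();
    }

    /** Sequential pipeline whose mapping step runs concurrently on virtual threads. */
    static List<Integer> run(int elementCount) {
        return IntStream.rangeClosed(1, elementCount).boxed()  // stand-in for the file's records
                .gather(Gatherers.windowFixed(BATCH_SIZE))     // group records into batches
                .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, MapConcurrentSketch::process))
                .toList();                                     // bundle up the per-batch results
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

Note there is no parallel() call anywhere, so the eager pre-fetch discussed in this thread should not come into play.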


And that's ignoring the fact that I can just use the workaround too.


Yeah, the whole "pre-fetch all the data ahead of time" makes sense to me from a 
performance perspective, but is rather unintuitive to me from a usability 
perspective. We are told how Streams can process unbounded data sets, but when 
it tries to do a findAny() with parallel(), it runs into an OOME because it 
fetched all the data ahead of time. In fact, almost all of the terminal operations 
will hit an OOME in the exact same way if they are parallel and have a big 
enough data set. It's definitely not the end of the world, but it seems that I 
have to fit everything into a Collector and/or a Gatherer if I want to avoid 
pre-fetching everything.

On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
Have you considered Gatherers.mapConcurrent(…)?


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: David Alayachew <davidalayac...@gmail.com>
Sent: Tuesday, 12 November 2024 01:53
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev <core-libs-dev@openjdk.org>
Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching 
too many elements

Good to know, ty vm.

At the very least, I have this workaround. This will meet my needs for now.

I guess my final question would be -- is this type of problem better suited to 
something besides parallel streams? Maybe an ExecutorService?

Really, all I am doing is taking a jumbo file, splitting it into batches, and 
then doing some work on those batches. My IO speeds are pretty fast, and the 
compute work is non-trivial, so there is performance being left on the table if 
I give up parallelism. And I am in a position where completion time is very 
important to us.

I just naturally assumed parallel streams were the right choice because the 
compute work is simple. A pure function that I can break out, and then call in 
a map. Once I do that, I just call forEach to write the batches back out to S3. 
Maybe I should look into a different part of the std lib instead because I am 
using the wrong tool for the job? My nose says ExecutorService, but I figure I 
should ask before I dive too deep in.
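
To show what I mean by the ExecutorService route, here is a rough sketch, not something I have run against the real workload. The class BoundedBatchRunner, its parameters, and the summing "work" are all made up for illustration. The idea is that a Semaphore stops the reader from pulling the next batch until a worker slot is free, so at most maxInFlight batches are held in memory at once.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class BoundedBatchRunner {
    /** Reads batches lazily and keeps at most maxInFlight of them in memory at once. */
    static <T> void run(Iterator<T> source, int batchSize, int maxInFlight,
                        Consumer<List<T>> work) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(maxInFlight);
        Semaphore permits = new Semaphore(maxInFlight);
        try {
            while (source.hasNext()) {
                permits.acquire();                  // wait for a free slot before reading more
                List<T> batch = new ArrayList<>(batchSize);
                while (batch.size() < batchSize && source.hasNext()) {
                    batch.add(source.next());
                }
                pool.execute(() -> {
                    try {
                        work.accept(batch);         // compute + write for one batch
                    } finally {
                        permits.release();          // free the slot even if the work fails
                    }
                });
            }
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.HOURS);
        }
    }

    /** Toy usage: sums 1..elementCount in batches; stands in for compute + upload. */
    static long sumInBatches(int elementCount, int batchSize) {
        AtomicLong total = new AtomicLong();
        try {
            run(IntStream.rangeClosed(1, elementCount).iterator(), batchSize, 4,
                batch -> total.addAndGet(batch.stream().mapToLong(Integer::longValue).sum()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumInBatches(100, 7));
    }
}
```

The obvious downside compared to a stream is that I have to manage the pool, the back-pressure, and the error handling myself.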


On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.kl...@oracle.com> wrote:
You're most welcome!

In a potential future where all intermediate operations are Gatherer-based, and 
all terminal operations are Collector-based, it would just work as expected. 
But with that said, I'm not sure it is practically achievable because some 
operations might not have the same performance-characteristics as before.

Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: David Alayachew <davidalayac...@gmail.com>
Sent: Monday, 11 November 2024 18:32
To: Viktor Klang <viktor.kl...@oracle.com>
Cc: core-libs-dev <core-libs-dev@openjdk.org>
Subject: [External] : Re: Question about Streams, Gatherers, and fetching too 
many elements


Thanks for the workaround. It's running beautifully.

Is there a future where this island concept is extended to the rest of streams? 
Tbh, I don't fully understand it.

On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
Hi David,

This is an effect of how parallel streams are implemented: stages that cannot 
be represented as a join-less Spliterator are executed as a series of 
"islands", where the next island isn't started until the previous one has 
completed.

If you think about it, parallelization of a Stream works best when the entire 
data set can be split amongst a set of worker threads, and that sort of implies 
that you want eager pre-fetch of data, so if your dataset does not fit in 
memory, that is likely to lead to less desirable outcomes.

What I was able to do for Gatherers is to implement "gather(…) + 
collect(…)"-fusion so any number of consecutive gather(…)-operations 
immediately followed by a collect(…) is run in the same "island".

So with that said, you could try something like the following:

static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
    return Collector.of(
        () -> null,
        (v, e) -> each.accept(e),
        (l, r) -> l,
        (v) -> null,
        Collector.Characteristics.IDENTITY_FINISH);
}


stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.collect(forEach(eachList -> println(eachList.getFirst())));


Cheers,
√


Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of David 
Alayachew <davidalayac...@gmail.com>
Sent: Monday, 11 November 2024 14:52
To: core-libs-dev <core-libs-dev@openjdk.org>
Subject: Re: Question about Streams, Gatherers, and fetching too many elements

And just to avoid the obvious question, I can hold about 30 batches in memory 
before the Out of Memory error occurs. So this is not an issue of my batch size 
being too high.

But just to confirm, I set the batch size to 1, and it still ran into an out of 
memory error. So I feel fairly confident saying that the Gatherer is trying to 
grab all available data before sending any of it downstream.

On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayac...@gmail.com> wrote:
Hello Core Libs Dev Team,

I was trying out Gatherers for a project at work, and ran into a rather sad 
scenario.

I need to process a large file in batches. Each batch is small enough that I 
can hold it in memory, but I cannot hold the entire file (and thus, all of the 
batches) in memory at once.

Looking at the Gatherers API, I saw windowFixed and thought that it would be a 
great match for my use case.

However, when trying it out, I was disappointed to see that it ran out of 
memory very quickly. Here is my attempt at using it.

stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.forEach(eachList -> println(eachList.getFirst()))
;

As you can see, I am just splitting the file into batches, and printing out the 
first of each batch. This is purely for example's sake, of course. I had 
planned on building even more functionality on top of this, but I couldn't even 
get past this example.

But anyways, not even a single one of them printed out. Which leads me to 
believe that it's pulling all of them in the Gatherer.

I can get it to run successfully if I go sequentially, but not parallel. 
Parallel gives me that out of memory error.

Is there any way for me to be able to have the Gatherer NOT pull in everything 
while still remaining parallel and unordered?

Thank you for your time and help.
David Alayachew
