Then let me correct myself again, while simplifying -- I just want that detail, that certain combinations of operations might lead to pre-fetching everything, to be documented on the Stream API. The package level, or the Stream interface itself, seems like a good spot.
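To make the request concrete, here is the kind of combination I mean -- only a sketch, with Stream.iterate standing in for my real file source; the parallel variant is the one that, per this thread, pre-fetches its input:

    import java.util.stream.Gatherers;
    import java.util.stream.Stream;

    public class PrefetchExample {
        public static void main(String[] args) {
            // Sequential: windowFixed emits a batch as soon as it fills, so
            // this short-circuits after roughly one batch worth of elements.
            Stream.iterate(0L, i -> i + 1)
                  .gather(Gatherers.windowFixed(1_000))
                  .findFirst()
                  .ifPresent(batch -> System.out.println(batch.getFirst()));

            // Parallel: the gather stage becomes its own "island" that
            // pre-fetches upstream elements, so on an unbounded (or merely
            // large) source this runs out of memory instead of finishing.
            Stream.iterate(0L, i -> i + 1)
                  .parallel()
                  .unordered()
                  .gather(Gatherers.windowFixed(1_000))
                  .findAny()
                  .ifPresent(batch -> System.out.println(batch.getFirst()));
        }
    }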
On Thu, Nov 14, 2024, 4:22 AM Viktor Klang <viktor.kl...@oracle.com> wrote:

> The issue here is that the operation cannot advertise this, as it depends on the combination of operations.
>
> Cheers,
> √
>
> Viktor Klang
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> From: core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of David Alayachew <davidalayac...@gmail.com>
> Sent: Wednesday, 13 November 2024 14:07
> To: Rob Spoor <open...@icemanx.nl>
> Cc: core-libs-dev <core-libs-dev@openjdk.org>
> Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements
>
> That is a good point Rob.
>
> Then let me correct myself -- I think the terminal operations don't do a great job of advertising whether or not they pre-fetch everything when parallelism is activated. Collector fetches as needed. FindAny pre-fetches everything. I understand that later releases might change their behaviour, but I still want to document the current behaviour in the official javadocs so that we can limit undocumented tripping hazards.
>
> On Wed, Nov 13, 2024, 7:07 AM Rob Spoor <open...@icemanx.nl> wrote:
>
> distinct() doesn't require everything to be pulled. It can push elements to the downstream as they come along for the first time. When downstream.push returns false the gatherer is done.
>
> As part of some experimentation I've implemented all intermediate operations using gatherers. Most of them are pretty straightforward and will stop integrating once the downstream starts rejecting elements (although some use Gatherer.ofSequential to keep it easy). I only found two exceptions that don't return the result of downstream.push:
>
> * mapMulti. The downstream.push is passed as the mapper, which is a Consumer - the return value is ignored. With some more effort it's probably possible to capture any false return value and return that from the integrator, but I haven't tried that yet.
>
> * sorted. Obviously every element needs to be inspected.
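As an aside, here is my reading of the pattern Rob describes -- a sketch only; the names and shapes are mine, not his actual code:

    import java.util.function.BiConsumer;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.stream.Gatherer;

    public class GathererSketches {
        // map as a gatherer: the integrator returns the result of
        // downstream.push, so integration stops as soon as the downstream
        // starts rejecting elements.
        static <T, R> Gatherer<T, ?, R> mapping(Function<? super T, ? extends R> mapper) {
            return Gatherer.of(
                (state, element, downstream) -> downstream.push(mapper.apply(element)));
        }

        // mapMulti with the false-capture Rob speculates about: remember any
        // rejected push and return that from the integrator.
        static <T, R> Gatherer<T, ?, R> mapMulti(BiConsumer<? super T, ? super Consumer<R>> mapper) {
            return Gatherer.of((state, element, downstream) -> {
                boolean[] rejected = { false };
                Consumer<R> push = r -> {
                    if (!downstream.push(r)) rejected[0] = true;
                };
                mapper.accept(element, push);
                return !rejected[0];
            });
        }
    }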
> On 13/11/2024 00:37, David Alayachew wrote:
> > Oh sure, I expect something like distinct() to pull everything. In order to know if something is distinct, you have to do some variant of "check against everyone else". Whether that is holding all instances in memory or their hashes, it's clear from a glance that you will need to look at everything, and therefore, pre-fetching makes intuitive sense to me.
> >
> > I 100% did not expect terminal operations like findAny() or reduce() to pull the whole data set. That was a complete whiplash for me. The method findAny() advertises itself as a short-circuiting operation, so to find out that it actually pulls the whole data set anyways was shocking.
> >
> > And that was my biggest pain point -- looking at the documentation, it is not clear to me at all that methods like findAny() would pull in all data upon becoming parallel().
> >
> > Do you think it would make sense to add documentation about this to the javadocs for Stream/java.util.stream? Or maybe it is already there and I misunderstood it (even after reading through it thoroughly over 5 times).
> >
> > On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
> >
> >>> We are told how Streams can process unbounded data sets, but when it tries to do a findAny() with parallel(), it runs into an OOME because it fetched all the data ahead of time. In fact, almost all of the terminal operations will hit an OOME in the exact same way if they are parallel and have a big enough data set. It's definitely not the end of the world, but it seems that I have to fit everything into a Collector and/or a Gatherer if I want to avoid pre-fetching everything.
> >>
> >> Yeah, I think it is important to distinguish "can process unbounded data sets" from "always able to process unbounded data sets".
> >>
> >> Some operations inherently need the end of the stream, so even something simple like stream.distinct() or stream.sorted() can end up pulling in all data (which of course won't terminate).
> >>
> >> Fortunately, I think Gatherers can unlock many more situations where unbounded streams can be processed.
> >>
> >> Cheers,
> >> √
> >>
> >> Viktor Klang
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> From: David Alayachew <davidalayac...@gmail.com>
> >> Sent: Tuesday, 12 November 2024 15:08
> >> To: Viktor Klang <viktor.kl...@oracle.com>
> >> Cc: core-libs-dev <core-libs-dev@openjdk.org>
> >> Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements
> >>
> >> Oh woah. I certainly did not. Or rather, I had dismissed the idea as soon as I thought of it.
> >>
> >> I hand-waved away the idea because I thought that the method would turn the stream pipeline parallel, thus recreating the same problem I currently have of parallelism causing all of the elements to be fetched ahead of time, causing an OOME.
> >>
> >> It did NOT occur to me that the pipeline would stay sequential, and just kick these off sequentially, but have them executing in parallel. I can't see why I came to that incorrect conclusion. I have read the javadocs of this method several times. Though, to be fair, I came to the same, incorrect conclusion about Collectors.groupingByConcurrent(), and it wasn't until someone pointed out what the documentation was actually saying that I realized its true properties.
> >>
> >> Thanks. That definitely solves at least part of my problem. Obviously, I would prefer to write to S3 in parallel too, but at the very least, the calculation part is being done in parallel. And worst case scenario, I can be really bad and just do the write to S3 in the mapConcurrent, and then just return the metadata of each write, and just bundle that up with collect.
> >>
> >> And that's ignoring the fact that I can just use the workaround too.
> >>
> >> Yeah, the whole "pre-fetch all the data ahead of time" behaviour makes sense to me from a performance perspective, but is rather unintuitive to me from a usability perspective. We are told how Streams can process unbounded data sets, but when it tries to do a findAny() with parallel(), it runs into an OOME because it fetched all the data ahead of time. In fact, almost all of the terminal operations will hit an OOME in the exact same way if they are parallel and have a big enough data set. It's definitely not the end of the world, but it seems that I have to fit everything into a Collector and/or a Gatherer if I want to avoid pre-fetching everything.
> >>
> >> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
> >>
> >> Have you considered Gatherers.mapConcurrent(…)?
> >>
> >> Cheers,
> >> √
> >>
> >> Viktor Klang
> >> Software Architect, Java Platform Group
> >> Oracle
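For anyone else reading along, this is how I now understand the mapConcurrent suggestion -- a sketch, where the iterate/limit source, the batch size, the concurrency bound, and the per-batch work all stand in for my real code:

    import java.util.stream.Gatherers;
    import java.util.stream.Stream;

    public class MapConcurrentSketch {
        public static void main(String[] args) {
            final int batchSize = 1_000;   // stand-in for BATCH_SIZE
            final int maxConcurrency = 8;  // a guess at a sensible bound

            // The pipeline itself stays sequential, so windowFixed streams
            // batches through as they fill; mapConcurrent runs the per-batch
            // work on up to maxConcurrency virtual threads at a time.
            Stream.iterate(0L, i -> i + 1)
                  .limit(100_000)          // stand-in for the jumbo file
                  .gather(Gatherers.windowFixed(batchSize))
                  .gather(Gatherers.mapConcurrent(maxConcurrency,
                          batch -> batch.getFirst()))  // stand-in for the work
                  .forEach(System.out::println);
        }
    }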
> >> ------------------------------
> >> From: David Alayachew <davidalayac...@gmail.com>
> >> Sent: Tuesday, 12 November 2024 01:53
> >> To: Viktor Klang <viktor.kl...@oracle.com>
> >> Cc: core-libs-dev <core-libs-dev@openjdk.org>
> >> Subject: Re: [External] : Re: Question about Streams, Gatherers, and fetching too many elements
> >>
> >> Good to know, ty vm.
> >>
> >> At the very least, I have this workaround. This will meet my needs for now.
> >>
> >> I guess my final question would be -- is this type of problem better suited to something besides parallel streams? Maybe an ExecutorService?
> >>
> >> Really, all I am doing is taking a jumbo file, splitting it into batches, and then doing some work on those batches. My IO speeds are pretty fast, and the compute work is non-trivial, so there is performance being left on the table if I give up parallelism. And I am in a position where completion time is very important to us.
> >>
> >> I just naturally assumed parallel streams were the right choice because the compute work is simple. A pure function that I can break out, and then call in a map. Once I do that, I just call forEach to write the batches back out to S3. Maybe I should look into a different part of the std lib instead because I am using the wrong tool for the job? My nose says ExecutorService, but I figure I should ask before I dive too deep in.
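To make that ExecutorService idea concrete, this is roughly the shape I had in mind -- only a sketch, with readNextBatch, process, and writeToS3 standing in for my real reader, compute, and S3 writer:

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ExecutorSketch {
        public static void main(String[] args) {
            // try-with-resources (Java 19+): close() waits for submitted work.
            try (ExecutorService pool = Executors.newFixedThreadPool(8)) {
                List<String> batch;
                while ((batch = readNextBatch()) != null) {
                    final List<String> b = batch;
                    // Caveat: an unbounded task queue can itself end up
                    // holding every batch in memory, so real code would need
                    // to bound it (e.g. with a Semaphore) to get actual
                    // backpressure.
                    pool.submit(() -> writeToS3(process(b)));
                }
            }
        }

        // Stand-ins for the real file reader, pure compute, and S3 writer.
        static List<String> readNextBatch() { return null; }
        static String process(List<String> batch) { return String.join(",", batch); }
        static void writeToS3(String result) { System.out.println(result); }
    }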
> >> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.kl...@oracle.com> wrote:
> >>
> >> You're most welcome!
> >>
> >> In a potential future where all intermediate operations are Gatherer-based, and all terminal operations are Collector-based, it would just work as expected. But with that said, I'm not sure it is practically achievable because some operations might not have the same performance-characteristics as before.
> >>
> >> Cheers,
> >> √
> >>
> >> Viktor Klang
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> From: David Alayachew <davidalayac...@gmail.com>
> >> Sent: Monday, 11 November 2024 18:32
> >> To: Viktor Klang <viktor.kl...@oracle.com>
> >> Cc: core-libs-dev <core-libs-dev@openjdk.org>
> >> Subject: [External] : Re: Question about Streams, Gatherers, and fetching too many elements
> >>
> >> Thanks for the workaround. It's running beautifully.
> >>
> >> Is there a future where this island concept is extended to the rest of streams? Tbh, I don't fully understand it.
> >>
> >> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.kl...@oracle.com> wrote:
> >>
> >> Hi David,
> >>
> >> This is the effect of how parallel streams are implemented, where different stages, which are not representable as a join-less Spliterator, are executed as a series of "islands" where the next isn't started until the former has completed.
> >>
> >> If you think about it, parallelization of a Stream works best when the entire data set can be split amongst a set of worker threads, and that sort of implies that you want eager pre-fetch of data, so if your dataset does not fit in memory, that is likely to lead to less desirable outcomes.
> >>
> >> What I was able to do for Gatherers is to implement "gather(…) + collect(…)"-fusion, so any number of consecutive gather(…)-operations immediately followed by a collect(…) is run in the same "island".
> >>
> >> So with that said, you could try something like the following:
> >>
> >> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
> >>     return Collector.of(() -> null,
> >>                         (v, e) -> each.accept(e),
> >>                         (l, r) -> l,
> >>                         (v) -> null,
> >>                         Collector.Characteristics.IDENTITY_FINISH);
> >> }
> >>
> >> stream
> >>     .parallel()
> >>     .unordered()
> >>     .gather(Gatherers.windowFixed(BATCH_SIZE))
> >>     .collect(forEach(eachList -> println(eachList.getFirst())));
> >>
> >> Cheers,
> >> √
> >>
> >> Viktor Klang
> >> Software Architect, Java Platform Group
> >> Oracle
> >> ------------------------------
> >> From: core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of David Alayachew <davidalayac...@gmail.com>
> >> Sent: Monday, 11 November 2024 14:52
> >> To: core-libs-dev <core-libs-dev@openjdk.org>
> >> Subject: Re: Question about Streams, Gatherers, and fetching too many elements
> >>
> >> And just to avoid the obvious question, I can hold about 30 batches in memory before the Out of Memory error occurs. So this is not an issue of my batch size being too high.
> >>
> >> But just to confirm, I set the batch size to 1, and it still ran into an out of memory error. So I feel fairly confident saying that the Gatherer is trying to grab all available data before sending any of it downstream.
> >>
> >> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayac...@gmail.com> wrote:
> >>
> >> Hello Core Libs Dev Team,
> >>
> >> I was trying out Gatherers for a project at work, and ran into a rather sad scenario.
> >>
> >> I need to process a large file in batches. Each batch is small enough that I can hold it in memory, but I cannot hold the entire file (and thus, all of the batches) in memory at once.
> >>
> >> Looking at the Gatherers API, I saw windowFixed and thought that it would be a great match for my use case.
> >>
> >> However, when trying it out, I was disappointed to see that it ran out of memory very quickly. Here is my attempt at using it.
> >>
> >> stream
> >>     .parallel()
> >>     .unordered()
> >>     .gather(Gatherers.windowFixed(BATCH_SIZE))
> >>     .forEach(eachList -> println(eachList.getFirst()));
> >>
> >> As you can see, I am just splitting the file into batches, and printing out the first of each batch. This is purely for example's sake, of course. I had planned on building even more functionality on top of this, but I couldn't even get past this example.
> >>
> >> But anyways, not even a single one of them printed out. Which leads me to believe that it's pulling all of them in the Gatherer.
> >>
> >> I can get it to run successfully if I go sequentially, but not parallel. Parallel gives me that out of memory error.
> >>
> >> Is there any way for me to be able to have the Gatherer NOT pull in everything while still remaining parallel and unordered?
> >>
> >> Thank you for your time and help.
> >> David Alayachew
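For completeness, here is the workaround applied end-to-end as I understand it -- a sketch, with iterate/limit standing in for my file source and println standing in for the transform-and-write-to-S3 step:

    import java.util.function.Consumer;
    import java.util.stream.Collector;
    import java.util.stream.Gatherers;
    import java.util.stream.Stream;

    public class WorkaroundSketch {
        // Viktor's forEach collector, unchanged from above.
        static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
            return Collector.of(() -> null,
                                (v, e) -> each.accept(e),
                                (l, r) -> l,
                                v -> null,
                                Collector.Characteristics.IDENTITY_FINISH);
        }

        public static void main(String[] args) {
            // gather(...) immediately followed by collect(...) fuses into a
            // single "island", so batches flow through instead of the whole
            // input being pre-fetched first.
            Stream.iterate(0L, i -> i + 1)
                  .limit(100_000)               // stand-in for the jumbo file
                  .parallel()
                  .unordered()
                  .gather(Gatherers.windowFixed(1_000))
                  .collect(forEach(batch -> System.out.println(batch.getFirst())));
        }
    }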