Thanks for the workaround. It's running beautifully. Is there a future where this island concept is extended to the rest of the Streams API? Tbh, I don't fully understand it.
On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.kl...@oracle.com> wrote:

> Hi David,
>
> This is the effect of how parallel streams are implemented, where
> different stages, which are not representable as a join-less Spliterator,
> are executed as a series of "islands" where the next isn't started until
> the former has completed.
>
> If you think about it, parallelization of a Stream works best when the
> entire data set can be split amongst a set of worker threads, and that sort
> of implies that you want eager pre-fetch of data, so if your dataset does
> not fit in memory, that is likely to lead to less desirable outcomes.
>
> What I was able to do for Gatherers is to implement "gather(…) +
> collect(…)"-fusion so any number of consecutive gather(…)-operations
> immediately followed by a collect(…) is run in the same "island".
>
> So with that said, you could try something like the following:
>
> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
>     return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l,
>         (v) -> null, Collector.Characteristics.IDENTITY_FINISH);
> }
>
> stream
>     .parallel()
>     .unordered()
>     .gather(Gatherers.windowFixed(BATCH_SIZE))
>     .collect(forEach(eachList -> println(eachList.getFirst())));
>
> Cheers,
> √
>
> Viktor Klang
> Software Architect, Java Platform Group
> Oracle
> ------------------------------
> From: core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of David Alayachew <davidalayac...@gmail.com>
> Sent: Monday, 11 November 2024 14:52
> To: core-libs-dev <core-libs-dev@openjdk.org>
> Subject: Re: Question about Streams, Gatherers, and fetching too many elements
>
> And just to avoid the obvious question, I can hold about 30 batches in
> memory before the Out of Memory error occurs. So this is not an issue of my
> batch size being too high.
>
> But just to confirm, I set the batch size to 1, and it still ran into an
> out of memory error. So I feel fairly confident saying that the Gatherer is
> trying to grab all available data before sending any of it downstream.
>
> On Mon, Nov 11, 2024, 8:46 AM David Alayachew <davidalayac...@gmail.com>
> wrote:
>
> Hello Core Libs Dev Team,
>
> I was trying out Gatherers for a project at work, and ran into a rather
> sad scenario.
>
> I need to process a large file in batches. Each batch is small enough that
> I can hold it in memory, but I cannot hold the entire file (and thus, all
> of the batches) in memory at once.
>
> Looking at the Gatherers API, I saw windowFixed and thought that it would
> be a great match for my use case.
>
> However, when trying it out, I was disappointed to see that it ran out of
> memory very quickly. Here is my attempt at using it.
>
> stream
>     .parallel()
>     .unordered()
>     .gather(Gatherers.windowFixed(BATCH_SIZE))
>     .forEach(eachList -> println(eachList.getFirst()))
>     ;
>
> As you can see, I am just splitting the file into batches, and printing
> out the first of each batch. This is purely for example's sake, of course.
> I had planned on building even more functionality on top of this, but I
> couldn't even get past this example.
>
> But anyways, not even a single one of them printed out. Which leads me to
> believe that it's pulling all of them in the Gatherer.
>
> I can get it to run successfully if I go sequentially, but not parallel.
> Parallel gives me that out of memory error.
>
> Is there any way for me to be able to have the Gatherer NOT pull in
> everything while still remaining parallel and unordered?
>
> Thank you for your time and help.
> David Alayachew
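The workaround suggested in the thread (end the pipeline with `collect(...)` instead of `forEach(...)`, so that the `gather(...)` and the `collect(...)` are fused into one "island") can be sketched as a complete program. This is a minimal sketch assuming JDK 24 or later, where `Stream::gather` and `java.util.stream.Gatherers` are final (on JDK 22/23 they are preview APIs). The class name `WindowedForEach`, the method `processInBatches`, and the synthetic `LongStream` source standing in for the large file are hypothetical; only the `forEach` collector mirrors the helper from the thread.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Gatherers;
import java.util.stream.LongStream;

public class WindowedForEach {

    // Side-effect-only Collector, modelled on the forEach(...) helper from the thread.
    // It keeps no state: the accumulator hands each element to `each`, so in a
    // parallel stream `each` may be called from several threads and must be thread-safe.
    static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
        return Collector.of(
                () -> null,               // no mutable result container
                (v, e) -> each.accept(e), // perform the side effect
                (l, r) -> l,              // nothing to merge between threads
                v -> null,                // the "result" is always null
                Collector.Characteristics.IDENTITY_FINISH);
    }

    // Hypothetical stand-in for the file: `count` numbers processed in parallel as
    // fixed-size batches. Returns {batches seen, elements seen}.
    static long[] processInBatches(long count, int batchSize) {
        AtomicLong batches = new AtomicLong();
        AtomicLong elements = new AtomicLong();
        LongStream.range(0, count)
                .boxed()
                .parallel()
                .unordered()
                // Per the thread, gather(...) immediately followed by collect(...)
                // runs in the same "island" instead of two separate ones.
                .gather(Gatherers.windowFixed(batchSize))
                .collect(forEach(batch -> {
                    batches.incrementAndGet();
                    elements.addAndGet(batch.size());
                }));
        return new long[] { batches.get(), elements.get() };
    }

    public static void main(String[] args) {
        long[] result = processInBatches(10_000, 100);
        System.out.println("batches=" + result[0] + ", elements=" + result[1]);
    }
}
```

Running `main` should print `batches=100, elements=10000`. The counters are `AtomicLong` because the consumer may be invoked concurrently, for the same reason the consumer in the original example has to tolerate parallel calls. Since `windowFixed` emits a final, shorter window for any leftover elements, `processInBatches(1_050, 100)` would see 11 batches.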