Poking the thread in case you are able to answer my previous question, Viktor.
Also, another question -- I posted this thread to Reddit, and some good
discussion has already started. Can you or someone else answer some of the
questions that have popped up there?
https://old.reddit.com/r/java/comments/1gukzhb/a_surprising_pain_point_regarding_parallel_java/
Ty vm!

On Thu, Nov 14, 2024 at 5:45 PM David Alayachew <davidalayac...@gmail.com> wrote:

> Oh ok. So it truly is a toss-up depending on each implementation when and
> where this occurs.
>
> Ok, then as my final request, I think even informing the user that this
> CAN occur is worth doing. If nothing else, the user scouring the
> documentation for an explanation of this behaviour will know that it is
> simply something that can occur. They don't need to know all the details.
> Simply give it an official term, describe the behaviour, and note that
> when it happens is implementation-specific, but that it is only possible
> during parallelism. Even just knowing the verbiage to describe the
> problem will enable them to better communicate with each other about what
> they want vs what they get. That helps searchability, if nothing else.
>
> On Thu, Nov 14, 2024, 8:45 AM Viktor Klang <viktor.kl...@oracle.com>
> wrote:
>
>> I see what you're saying; the problem is that it depends on the Stream
>> implementation (given that Stream is an interface).
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* David Alayachew <davidalayac...@gmail.com>
>> *Sent:* Thursday, 14 November 2024 12:36
>> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> *Cc:* Rob Spoor <open...@icemanx.nl>; core-libs-dev <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>> Then let me correct myself again, while simplifying -- I just want that
>> detail, that certain combinations might lead to pre-fetching everything,
>> to be documented on the Stream API. Package level, or on the Stream
>> interface itself, seems like a good spot.
>>
>> On Thu, Nov 14, 2024, 4:22 AM Viktor Klang <viktor.kl...@oracle.com>
>> wrote:
>>
>> The issue here is that the operation cannot advertise this, as it
>> depends on the combination of operations.
>>
>> Cheers,
>> √
>>
>>
>> *Viktor Klang*
>> Software Architect, Java Platform Group
>> Oracle
>> ------------------------------
>> *From:* core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of
>> David Alayachew <davidalayac...@gmail.com>
>> *Sent:* Wednesday, 13 November 2024 14:07
>> *To:* Rob Spoor <open...@icemanx.nl>
>> *Cc:* core-libs-dev <core-libs-dev@openjdk.org>
>> *Subject:* Re: [External] : Re: Question about Streams, Gatherers, and
>> fetching too many elements
>>
>> That is a good point, Rob.
>>
>> Then let me correct myself -- I think the terminal operations don't do a
>> great job of advertising whether or not they pre-fetch everything when
>> parallelism is activated. Collector fetches as needed; findAny()
>> pre-fetches everything. I understand that later releases might change
>> their behaviour, but I still want to document the current behaviour in
>> the official javadocs so that we can limit undocumented tripping hazards.
>>
>> On Wed, Nov 13, 2024, 7:07 AM Rob Spoor <open...@icemanx.nl> wrote:
>>
>> distinct() doesn't require everything to be pulled. It can push elements
>> to the downstream as they come along for the first time. When
>> downstream.push returns false, the gatherer is done.
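>>
>> For illustration, a gatherer of that shape could look something like the
>> following (a minimal sketch only, not the JDK's actual distinct()
>> implementation, and sequential to keep it easy):
>>
>> import java.util.HashSet;
>> import java.util.stream.Gatherer;
>>
>> static <T> Gatherer<T, ?, T> distinctGatherer() {
>>     // Push each element only the first time it is seen. Returning the
>>     // result of downstream.push means integration stops as soon as the
>>     // downstream rejects an element; duplicates just return true.
>>     return Gatherer.<T, HashSet<T>, T>ofSequential(
>>         HashSet::new,
>>         (seen, element, downstream) ->
>>             !seen.add(element) || downstream.push(element));
>> }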
>>
>> As part of some experimentation I've implemented all intermediate
>> operations using gatherers. Most of them are pretty straightforward and
>> will stop integrating once the downstream starts rejecting elements
>> (although some use Gatherer.ofSequential to keep it easy). I only found
>> two exceptions that don't return the result of downstream.push:
>>
>> * mapMulti. downstream::push is passed to the mapper as a Consumer, so
>> its return value is ignored. With some more effort it's probably
>> possible to capture any false return value and return that from the
>> integrator, but I haven't tried that yet (a rough sketch follows below).
>>
>> * sorted. Obviously every element needs to be inspected.
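>>
>> For what it's worth, here is a rough, untested sketch of capturing that
>> false return value for mapMulti (the flag-in-an-array trick is just one
>> way to smuggle the result out of the Consumer):
>>
>> import java.util.function.BiConsumer;
>> import java.util.function.Consumer;
>> import java.util.stream.Gatherer;
>>
>> static <T, R> Gatherer<T, ?, R> mapMultiGatherer(
>>         BiConsumer<? super T, ? super Consumer<R>> mapper) {
>>     return Gatherer.<T, R>ofSequential((unused, element, downstream) -> {
>>         boolean[] keepGoing = { true };
>>         // The mapper only sees a Consumer, so any false result from
>>         // downstream.push is recorded here and reported through the
>>         // integrator's return value instead.
>>         mapper.accept(element, r -> {
>>             if (!downstream.push(r)) {
>>                 keepGoing[0] = false;
>>             }
>>         });
>>         return keepGoing[0];
>>     });
>> }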
>>
>> On 13/11/2024 00:37, David Alayachew wrote:
>> > Oh sure, I expect something like distinct() to pull everything. In
>> > order to know if something is distinct, you have to do some variant of
>> > "check against everyone else". Whether that is holding all instances
>> > in memory or their hashes, it's clear at a glance that you will need
>> > to look at everything, and therefore, pre-fetching makes intuitive
>> > sense to me.
>> >
>> > I 100% did not expect terminal operations like findAny() or reduce()
>> > to pull the whole data set. That gave me complete whiplash. The method
>> > findAny() advertises itself as a short-circuiting operation, so
>> > finding out that it actually pulls the whole data set anyway was
>> > shocking.
>> >
>> > And that was my biggest pain point -- looking at the documentation, it
>> > is not clear to me at all that methods like findAny() would pull in
>> > all data upon becoming parallel().
>> >
>> > Do you think it would make sense to add documentation about this to
>> > the javadocs for Stream/java.util.stream? Or maybe it is already there
>> > and I misunderstood it (even after reading through it thoroughly over
>> > 5 times).
>> >
>> > On Tue, Nov 12, 2024, 10:06 AM Viktor Klang <viktor.kl...@oracle.com>
>> > wrote:
>> >
>> >>> We are told how Streams can process unbounded data sets, but when it
>> >>> tries to do a findAny() with parallel(), it runs into an OOME
>> >>> because it fetched all the data ahead of time. In fact, almost all
>> >>> of the terminal operations will hit an OOME in the exact same way if
>> >>> they are parallel and have a big enough data set. It's definitely
>> >>> not the end of the world, but it seems that I have to fit everything
>> >>> into a Collector and/or a Gatherer if I want to avoid pre-fetching
>> >>> everything.
>> >>
>> >> Yeah, I think it is important to distinguish "can process unbounded
>> >> data sets" from "always able to process unbounded data sets".
>> >>
>> >> Some operations inherently need the end of the stream, so even
>> >> something simple like stream.distinct() or stream.sorted() can end up
>> >> pulling in all data (which, on an unbounded stream, of course won't
>> >> terminate).
>> >>
>> >> Fortunately, I think Gatherers can unlock many more situations where
>> >> unbounded streams can be processed.
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* David Alayachew <davidalayac...@gmail.com>
>> >> *Sent:* Tuesday, 12 November 2024 15:08
>> >> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> >> *Cc:* core-libs-dev <core-libs-dev@openjdk.org>
>> >> *Subject:* Re: [External] : Re: Question about Streams, Gatherers,
>> >> and fetching too many elements
>> >>
>> >> Oh woah. I certainly did not. Or rather, I had dismissed the idea as
>> >> soon as I thought of it.
>> >>
>> >> I hand-waved away the idea because I thought that the method would
>> >> turn the stream pipeline parallel, thus recreating the same problem I
>> >> currently have of parallelism causing all of the elements to be
>> >> fetched ahead of time, causing an OOME.
>> >>
>> >> It did NOT occur to me that the pipeline would stay sequential, and
>> >> just kick these off sequentially, but have them executing in
>> >> parallel. I can't say why I came to that incorrect conclusion. I have
>> >> read the javadocs of this method several times. Though, to be fair, I
>> >> came to the same incorrect conclusion about
>> >> Collectors.groupingByConcurrent(), and it wasn't until someone
>> >> pointed out what the documentation was actually saying that I
>> >> realized its true properties.
>> >>
>> >> Thanks. That definitely solves at least part of my problem.
>> >> Obviously, I would prefer to write to S3 in parallel too, but at the
>> >> very least, the calculation part is being done in parallel. And in
>> >> the worst case, I can be really bad and just do the write to S3
>> >> inside the mapConcurrent, then return the metadata of each write and
>> >> bundle that up with collect -- roughly the sketch below.
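>> >>
>> >> Something like this, I mean -- a sketch only, where processBatch and
>> >> writeToS3 are hypothetical stand-ins for my actual compute and upload
>> >> steps, and MAX_CONCURRENCY is whatever bound fits:
>> >>
>> >> // The pipeline itself stays sequential; mapConcurrent runs up to
>> >> // MAX_CONCURRENCY mapper invocations concurrently (on virtual
>> >> // threads) and emits their results downstream.
>> >> var allMetadata = stream
>> >>     .gather(Gatherers.windowFixed(BATCH_SIZE))
>> >>     .gather(Gatherers.mapConcurrent(MAX_CONCURRENCY, batch -> {
>> >>         var result = processBatch(batch); // pure compute step
>> >>         return writeToS3(result);         // metadata of this write
>> >>     }))
>> >>     .toList();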
>> >>
>> >> And that's ignoring the fact that I can just use the workaround too.
>> >>
>> >> Yeah, the whole "pre-fetch all the data ahead of time" makes sense to
>> >> me from a performance perspective, but is rather unintuitive to me
>> >> from a usability perspective. We are told how Streams can process
>> >> unbounded data sets, but when it tries to do a findAny() with
>> >> parallel(), it runs into an OOME because it fetched all the data
>> >> ahead of time. In fact, almost all of the terminal operations will
>> >> hit an OOME in the exact same way if they are parallel and have a big
>> >> enough data set. It's definitely not the end of the world, but it
>> >> seems that I have to fit everything into a Collector and/or a
>> >> Gatherer if I want to avoid pre-fetching everything.
>> >>
>> >> On Tue, Nov 12, 2024, 6:36 AM Viktor Klang <viktor.kl...@oracle.com>
>> >> wrote:
>> >>
>> >> Have you considered Gatherers.mapConcurrent(…)?
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* David Alayachew <davidalayac...@gmail.com>
>> >> *Sent:* Tuesday, 12 November 2024 01:53
>> >> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> >> *Cc:* core-libs-dev <core-libs-dev@openjdk.org>
>> >> *Subject:* Re: [External] : Re: Question about Streams, Gatherers,
>> >> and fetching too many elements
>> >>
>> >> Good to know, ty vm.
>> >>
>> >> At the very least, I have this workaround. It will meet my needs for
>> >> now.
>> >>
>> >> I guess my final question would be -- is this type of problem better
>> >> suited to something besides parallel streams? Maybe an
>> >> ExecutorService?
>> >>
>> >> Really, all I am doing is taking a jumbo file, splitting it into
>> >> batches, and then doing some work on those batches. My IO speeds are
>> >> pretty fast, and the compute work is non-trivial, so there is
>> >> performance being left on the table if I give up parallelism. And I
>> >> am in a position where completion time is very important to us.
>> >>
>> >> I just naturally assumed parallel streams were the right choice
>> >> because the compute work is simple: a pure function that I can break
>> >> out and then call in a map. Once I do that, I just call forEach to
>> >> write the batches back out to S3. Maybe I should look into a
>> >> different part of the standard library instead, in case I am using
>> >> the wrong tool for the job? My nose says ExecutorService, but I
>> >> figure I should ask before I dive too deep in.
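>> >>
>> >> If it helps, this is the rough shape I have in mind -- a sketch only,
>> >> with readBatches, processBatch, and writeToS3 as hypothetical
>> >> stand-ins, and a semaphore to cap how many batches sit in memory at
>> >> once:
>> >>
>> >> import java.util.List;
>> >> import java.util.concurrent.Executors;
>> >> import java.util.concurrent.Semaphore;
>> >>
>> >> void processFile() throws InterruptedException {
>> >>     var semaphore = new Semaphore(MAX_BATCHES_IN_FLIGHT);
>> >>     // ExecutorService is AutoCloseable since Java 19; close() waits
>> >>     // for submitted tasks to finish.
>> >>     try (var executor = Executors.newFixedThreadPool(N_THREADS)) {
>> >>         for (List<String> batch : readBatches(BATCH_SIZE)) {
>> >>             semaphore.acquire(); // block instead of buffering unboundedly
>> >>             executor.submit(() -> {
>> >>                 try {
>> >>                     writeToS3(processBatch(batch));
>> >>                 } finally {
>> >>                     semaphore.release();
>> >>                 }
>> >>             });
>> >>         }
>> >>     }
>> >> }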
>> >>
>> >> On Mon, Nov 11, 2024, 2:34 PM Viktor Klang <viktor.kl...@oracle.com>
>> >> wrote:
>> >>
>> >> You're most welcome!
>> >>
>> >> In a potential future where all intermediate operations are
>> >> Gatherer-based, and all terminal operations are Collector-based, it
>> >> would just work as expected. But with that said, I'm not sure it is
>> >> practically achievable, because some operations might not have the
>> >> same performance characteristics as before.
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* David Alayachew <davidalayac...@gmail.com>
>> >> *Sent:* Monday, 11 November 2024 18:32
>> >> *To:* Viktor Klang <viktor.kl...@oracle.com>
>> >> *Cc:* core-libs-dev <core-libs-dev@openjdk.org>
>> >> *Subject:* [External] : Re: Question about Streams, Gatherers, and
>> >> fetching too many elements
>> >>
>> >> Thanks for the workaround. It's running beautifully.
>> >>
>> >> Is there a future where this island concept is extended to the rest
>> >> of streams? Tbh, I don't fully understand it.
>> >>
>> >> On Mon, Nov 11, 2024, 9:59 AM Viktor Klang <viktor.kl...@oracle.com>
>> >> wrote:
>> >>
>> >> Hi David,
>> >>
>> >> This is the effect of how parallel streams are implemented: stages
>> >> which are not representable as a join-less Spliterator are executed
>> >> as a series of "islands", where the next isn't started until the
>> >> former has completed.
>> >>
>> >> If you think about it, parallelization of a Stream works best when
>> >> the entire data set can be split amongst a set of worker threads, and
>> >> that sort of implies that you want eager pre-fetch of data. So if
>> >> your data set does not fit in memory, that is likely to lead to less
>> >> desirable outcomes.
>> >>
>> >> What I was able to do for Gatherers is to implement "gather(…) +
>> >> collect(…)"-fusion, so any number of consecutive gather(…) operations
>> >> immediately followed by a collect(…) is run in the same "island".
>> >>
>> >> So with that said, you could try something like the following:
>> >>
>> >> static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
>> >>     return Collector.of(() -> null, (v, e) -> each.accept(e),
>> >>         (l, r) -> l, (v) -> null,
>> >>         Collector.Characteristics.IDENTITY_FINISH);
>> >> }
>> >>
>> >> stream
>> >>     .parallel()
>> >>     .unordered()
>> >>     .gather(Gatherers.windowFixed(BATCH_SIZE))
>> >>     .collect(forEach(eachList -> println(eachList.getFirst())));
>> >>
>> >> Cheers,
>> >> √
>> >>
>> >>
>> >> *Viktor Klang*
>> >> Software Architect, Java Platform Group
>> >> Oracle
>> >> ------------------------------
>> >> *From:* core-libs-dev <core-libs-dev-r...@openjdk.org> on behalf of
>> >> David Alayachew <davidalayac...@gmail.com>
>> >> *Sent:* Monday, 11 November 2024 14:52
>> >> *To:* core-libs-dev <core-libs-dev@openjdk.org>
>> >> *Subject:* Re: Question about Streams, Gatherers, and fetching too
>> >> many elements
>> >>
>> >> And just to avoid the obvious question, I can hold about 30 batches
>> >> in memory before the OutOfMemoryError occurs. So this is not an issue
>> >> of my batch size being too high.
>> >>
>> >> But just to confirm, I set the batch size to 1, and it still ran into
>> >> an out-of-memory error. So I feel fairly confident saying that the
>> >> Gatherer is trying to grab all available data before sending any of
>> >> it downstream.
>> >>
>> >> On Mon, Nov 11, 2024, 8:46 AM David Alayachew
>> >> <davidalayac...@gmail.com> wrote:
>> >>
>> >> Hello Core Libs Dev Team,
>> >>
>> >> I was trying out Gatherers for a project at work, and ran into a
>> >> rather sad scenario.
>> >>
>> >> I need to process a large file in batches. Each batch is small enough
>> >> that I can hold it in memory, but I cannot hold the entire file (and
>> >> thus, all of the batches) in memory at once.
>> >>
>> >> Looking at the Gatherers API, I saw windowFixed and thought that it
>> >> would be a great match for my use case.
>> >>
>> >> However, when trying it out, I was disappointed to see that it ran
>> >> out of memory very quickly. Here is my attempt at using it:
>> >>
>> >> stream
>> >>     .parallel()
>> >>     .unordered()
>> >>     .gather(Gatherers.windowFixed(BATCH_SIZE))
>> >>     .forEach(eachList -> println(eachList.getFirst()));
>> >>
>> >> As you can see, I am just splitting the file into batches and
>> >> printing out the first element of each batch. This is purely for
>> >> example's sake, of course. I had planned on building even more
>> >> functionality on top of this, but I couldn't even get past this
>> >> example.
>> >>
>> >> But anyway, not even a single one of them printed out. Which leads me
>> >> to believe that it's pulling all of them in the Gatherer.
>> >>
>> >> I can get it to run successfully if I go sequentially, but not in
>> >> parallel. Parallel gives me that out-of-memory error.
>> >>
>> >> Is there any way for me to have the Gatherer NOT pull in everything
>> >> while still remaining parallel and unordered?
>> >>
>> >> Thank you for your time and help.
>> >> David Alayachew