But *not letting any of them through into the forEach loop.

On Sat, Oct 19, 2024, 9:36 AM David Alayachew <davidalayac...@gmail.com> wrote:
> So to be clear, I added a logger to my BufferedReader. So I know for a fact that it is reading data.
>
> And as for the code, it is a very simple parallel forEach.
>
> someStream.parallel().forEach(**work**);
>
> I only wanted to change the execution from sequential to parallel.
>
> So I have millions and millions of lines being read from the source (1 million lines is several batches alone) and none of it is entering the forEach loop. And I know this because the very first thing my forEach loop does is print logs.
>
> As for the iterator, it is literally just a hand-rolled iterator (that I copied from StackOverflow) where there is an instance field holding a list of elements, sized to match my batch size. hasNext() prepares the next batch and stores it into that instance field, then returns true/false depending on whether there is data to send, and then next() returns that instance field's prepared batch. Preparing is just making a new list and storing a batch size's worth of elements in it.
>
> So this means that it is just grabbing batches and batches, but letting any of them through into the forEach loop. And when I turn off parallelism, they suddenly go through just fine, one after the other.
>
> On Sat, Oct 19, 2024, 7:12 AM Olexandr Rotan <rotanolexandr...@gmail.com> wrote:
>
>> Hi David. I am not on the core libs team, but I think I can offer some clues :).
>>
>> It is hard to tell without the code, but I assume that there are a few layers to it.
>>
>> 1. Stalling. I would assume it is caused mostly by GC pauses taking too long (or forever) if the GC does not have any computational power left to run on. There is a fairly common GC-pause-related issue where a database connection is interrupted with an exception saying "Broken pipe", which under the hood is caused by the connection timing out during a long GC pause when running on low memory. I am not saying this is your case, but if I were to guess, I would assume the stall is caused by low memory.
>>
>> 2. The out-of-memory root cause may be too much splitting of your data source input. You may try to limit it by modifying the behaviour of the trySplit method of your spliterator.
>>
>> Alternatively, if you don't mind taking up some disk space, you can try to stream the data into a file, save it, and then use memory-mapped buffers (java.nio.MappedByteBuffer) to process the accepted data. I am not sure this will work, but memory-mapped files are a common tool for dealing with operations that can't fit into RAM.
>>
>> Regards
>>
>> On Sat, Oct 19, 2024 at 8:54 AM David Alayachew <davidalayac...@gmail.com> wrote:
>>
>>> Hello Core Libs Dev Team,
>>>
>>> I have a file that I am streaming from a service, and I am trying to split it into multiple parts based on a certain attribute found on each line. I am sending each part up to a different service.
>>>
>>> I am using BufferedReader.lines(). However, I cannot read the whole file into memory because it is larger than the amount of RAM that I have on the machine. So, since I don't have access to Java 22's Preview Gatherers Fixed Window, I used the iterator() method on my stream, wrapped that in another iterator that grabs a batch size's worth of data, then built a spliterator from that, which I then used to create a new stream. In short, this wrapper iterator isn't Iterator<T>, it's Iterator<List<T>>.
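A minimal sketch of the batching wrapper described above, assuming String lines. The class name, the 10_000 batch size, and the Spliterators/StreamSupport plumbing are illustrative guesses, not the original code:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    /** Wraps an Iterator<String> and hands out fixed-size batches of lines. */
    class BatchingIterator implements Iterator<List<String>> {
        private final Iterator<String> source;
        private final int batchSize;
        private List<String> prepared;   // filled by hasNext(), handed out by next()

        BatchingIterator(Iterator<String> source, int batchSize) {
            this.source = source;
            this.batchSize = batchSize;
        }

        @Override
        public boolean hasNext() {
            if (prepared == null && source.hasNext()) {
                // Prepare the next batch and park it in the instance field.
                List<String> batch = new ArrayList<>(batchSize);
                while (source.hasNext() && batch.size() < batchSize) {
                    batch.add(source.next());
                }
                prepared = batch;
            }
            return prepared != null;
        }

        @Override
        public List<String> next() {
            if (!hasNext()) throw new NoSuchElementException();
            List<String> batch = prepared;
            prepared = null;
            return batch;
        }
    }

    // Usage sketch (batch size is illustrative):
    // Iterator<String> lines = reader.lines().iterator();
    // Stream<List<String>> batches = java.util.stream.StreamSupport.stream(
    //         java.util.Spliterators.spliteratorUnknownSize(
    //                 new BatchingIterator(lines, 10_000),
    //                 java.util.Spliterator.ORDERED),
    //         false);   // flipping this to true is where the trouble starts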
>>> When I ran this sequentially, everything worked well. However, my CPU usage was low and we definitely have a performance problem -- our team needs this number as fast as we can get it. Plus, we had plenty of network bandwidth to spare, so I had (imo) good reason to use parallelism.
>>>
>>> As soon as I turned on parallelism, the stream's behaviour changed completely. Instead of fetching a batch and processing it, it started grabbing SEVERAL BATCHES and processing NONE OF THEM. Or at the very least, it grabbed so many batches that it ran out of memory before it could get to processing them.
>>>
>>> To give some numbers, this is a 4-core machine, and we can safely hold about 30-40 batches' worth of data in memory before crashing. But again, when running sequentially, this thing only grabs 1 batch, processes that one batch, sends out the results, and then starts the next one, all as expected. I thought that adding parallelism would simply make it so that we have this happening 4 or 8 times at once.
>>>
>>> After a very long period of digging, I managed to find this link.
>>>
>>> https://stackoverflow.com/questions/30825708/java-8-using-parallel-in-a-stream-causes-oom-error
>>>
>>> Tagir Valeev gives an answer which doesn't go very deep into the "why" at all, and the answer is directed more at that user's specific question than at solving this particular problem.
>>>
>>> After digging through a bunch of other solutions (plus my own testing), it seems that the answer is that the engine that does parallelization for Streams tries to grab a large enough "buffer" before doing any parallel processing. I could be wrong, and I have no idea how large that buffer is.
>>>
>>> Regardless, that's about where I gave up and went sequential, since the clock was ticking.
>>>
>>> But I still have a performance problem. How would one suggest going about this in Java 8?
>>>
>>> Thank you for your time and help.
>>> David Alayachew
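For reference, one shape the trySplit suggestion from earlier in the thread could take: a hand-rolled Spliterator over batches whose trySplit() hands off exactly one batch per split, so each split carries only a single batch. Same assumptions as the earlier sketch (String lines, illustrative names and batch size); this is a sketch only, not tested against the workload described here:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.function.Consumer;

    /** Streams batches of lines; trySplit() gives away one batch per split. */
    class OneBatchAtATimeSpliterator implements Spliterator<List<String>> {
        private final Iterator<String> source;
        private final int batchSize;

        OneBatchAtATimeSpliterator(Iterator<String> source, int batchSize) {
            this.source = source;
            this.batchSize = batchSize;
        }

        private List<String> nextBatch() {
            if (!source.hasNext()) return null;
            List<String> batch = new ArrayList<>(batchSize);
            while (source.hasNext() && batch.size() < batchSize) {
                batch.add(source.next());
            }
            return batch;
        }

        @Override
        public boolean tryAdvance(Consumer<? super List<String>> action) {
            List<String> batch = nextBatch();
            if (batch == null) return false;
            action.accept(batch);
            return true;
        }

        @Override
        public Spliterator<List<String>> trySplit() {
            // Hand the caller exactly one batch per split rather than
            // letting it accumulate many batches before processing any.
            List<String> batch = nextBatch();
            return (batch == null) ? null
                    : Collections.singletonList(batch).spliterator();
        }

        @Override
        public long estimateSize() { return Long.MAX_VALUE; }  // size unknown

        @Override
        public int characteristics() { return ORDERED | NONNULL; }
    }

    // Usage sketch:
    // Stream<List<String>> batches = java.util.stream.StreamSupport.stream(
    //         new OneBatchAtATimeSpliterator(reader.lines().iterator(), 10_000),
    //         true);
    // batches.forEach(work);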