Correction: "but *not* letting any of them through into the forEach loop."

On Sat, Oct 19, 2024, 9:36 AM David Alayachew <davidalayac...@gmail.com>
wrote:

> So to be clear, I added a logger to my BufferedReader. So I know for a
> fact that it is reading data.
>
> And as for the code, it is a very simple parallel forEach.
>
> someStream.parallel().forEach(work);
>
> I only wanted to change the execution from sequential to parallel.
>
> So I have millions and millions of lines being read from the source (1
> million lines is several batches alone) and none of it is entering the
> forEach loop. And I know this because the very first thing my forEach loop
> does is print logs.
>
> As for the iterator, it is literally just a hand rolled iterator (that I
> copied from StackOverflow) where there is an instance field holding a list
> of elements, sized to match my batch size. hasNext() prepares the next
> batch and stores it into that instance field, then returns true/false if
> there is data to send, then next() returns that instance field's prepared
> batch. Preparing is just making a new list, and storing a batch size's
> worth of elements in it.
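
For reference, the wrapper iterator described above might look roughly like the sketch below. It is an illustration only, not the actual code from this thread: the names BatchingIteratorSketch, BatchingIterator and batches(), and the batch size of 2, are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchingIteratorSketch {

    // Hypothetical wrapper: turns an Iterator<T> into an Iterator<List<T>>.
    static final class BatchingIterator<T> implements Iterator<List<T>> {
        private final Iterator<T> source;
        private final int batchSize;
        private List<T> prepared; // filled by hasNext(), handed out by next()

        BatchingIterator(Iterator<T> source, int batchSize) {
            if (batchSize < 1) {
                throw new IllegalArgumentException("batchSize must be >= 1");
            }
            this.source = source;
            this.batchSize = batchSize;
        }

        @Override
        public boolean hasNext() {
            if (prepared != null) {
                return true; // a batch is already waiting
            }
            if (!source.hasNext()) {
                return false;
            }
            // Prepare the next batch: a new list holding up to batchSize elements.
            List<T> batch = new ArrayList<>(batchSize);
            while (batch.size() < batchSize && source.hasNext()) {
                batch.add(source.next());
            }
            prepared = batch;
            return true;
        }

        @Override
        public List<T> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            List<T> batch = prepared;
            prepared = null;
            return batch;
        }
    }

    // Drains the wrapper into a list of batches (only sensible for small inputs).
    static <T> List<List<T>> batches(Iterable<T> items, int batchSize) {
        List<List<T>> out = new ArrayList<>();
        Iterator<List<T>> it = new BatchingIterator<>(items.iterator(), batchSize);
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [[a, b], [c, d], [e]]
        System.out.println(batches(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
```

Note that an iterator like this is inherently sequential: only one thread can pull batches from it at a time, so any parallelism has to come from how the batches are handed out afterwards.
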
>
> So this means that it is just grabbing batches and batches, but letting
> any of them through into the forEach loop. And when I turn off parallelism,
> they suddenly are going through just fine, one after the other.
>
> On Sat, Oct 19, 2024, 7:12 AM Olexandr Rotan <rotanolexandr...@gmail.com>
> wrote:
>
>> Hi David. I am not on the core libs team, but I guess I can offer some clues :).
>>
>> It is hard to tell without the code, but I assume that there are a few
>> layers to it.
>>
>> 1. Stalling. I would assume it is caused mostly by GC pauses taking too
>> long (forever) if the GC does not have any computational power to run on.
>> There is a fairly common GC-pause-related issue where a database connection
>> is interrupted with an exception saying "Broken pipe", which under the hood
>> is caused by the database connection timing out due to a long GC pause when
>> running on low memory. I am not saying this is your case, but if I were to
>> guess, I would assume that the stall is caused by low memory.
>>
>> 2. The root cause of running out of memory may be too much splitting of
>> your data source input. You may try to limit it by modifying the behaviour
>> of the trySplit method of your spliterator.
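
One way to read the trySplit suggestion is sketched below: a spliterator over an iterator whose trySplit() hands off only a small, fixed number of elements per split. This is a sketch under assumptions, not code from this thread; SmallSplitSketch, SmallSplitSpliterator, the helper methods, and the chunk size of 2 are all hypothetical. It limits how much each split carries, but it does not by itself bound how far the splitting thread can read ahead of the workers, so it may not be enough to keep memory under control.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

public class SmallSplitSketch {

    // Hypothetical spliterator: each trySplit() hands off at most `chunk` elements.
    static final class SmallSplitSpliterator<T> implements Spliterator<T> {
        private final Iterator<T> source;
        private final int chunk;

        SmallSplitSpliterator(Iterator<T> source, int chunk) {
            if (chunk < 1) {
                throw new IllegalArgumentException("chunk must be >= 1");
            }
            this.source = source;
            this.chunk = chunk;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (!source.hasNext()) {
                return false;
            }
            action.accept(source.next());
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            if (!source.hasNext()) {
                return null; // nothing left to split off
            }
            // Copy at most `chunk` elements into an array-backed spliterator.
            Object[] buffer = new Object[chunk];
            int n = 0;
            while (n < chunk && source.hasNext()) {
                buffer[n++] = source.next();
            }
            return Spliterators.spliterator(buffer, 0, n, characteristics());
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE; // size unknown
        }

        @Override
        public int characteristics() {
            return ORDERED;
        }
    }

    static List<Integer> numbers(int total) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            list.add(i);
        }
        return list;
    }

    // How many elements does one trySplit() pull from the underlying iterator?
    static int pulledByFirstSplit(int total, int chunk) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> base = numbers(total).iterator();
        Iterator<Integer> counting = new Iterator<Integer>() {
            @Override public boolean hasNext() { return base.hasNext(); }
            @Override public Integer next() {
                Integer value = base.next();
                pulled.incrementAndGet();
                return value;
            }
        };
        new SmallSplitSpliterator<>(counting, chunk).trySplit();
        return pulled.get();
    }

    // Runs a parallel stream on top of the spliterator and sums the elements.
    static long parallelSum(int total, int chunk) {
        Spliterator<Integer> s = new SmallSplitSpliterator<>(numbers(total).iterator(), chunk);
        return StreamSupport.stream(s, true).mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        System.out.println(pulledByFirstSplit(100, 2)); // elements consumed by one split
        System.out.println(parallelSum(100, 2));        // sum of 0..99
    }
}
```
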
>>
>> Alternatively, if you don't mind taking up some disk space, you can try
>> to stream the data into a file, save it, and then use memory-mapped buffers
>> (java.nio.MappedByteBuffer) to process the accepted data. I am not sure this
>> will work, but memory-mapped files are a common tool for dealing with data
>> that can't fit into RAM.
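
A minimal sketch of the memory-mapped approach, assuming the data has already been saved to a local file. The class name, the helper methods, and the 64 MiB window are made up for the example, and counting lines merely stands in for the real processing. A single mapping cannot exceed Integer.MAX_VALUE bytes, so a large file has to be mapped in windows.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class MappedFileSketch {

    // Arbitrary window size for the sketch; must stay below Integer.MAX_VALUE.
    static final long WINDOW = 64L * 1024 * 1024;

    // Counts '\n' bytes by mapping the file one window at a time.
    static long countLines(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = channel.size();
            long lines = 0;
            long position = 0;
            while (position < size) {
                long length = Math.min(WINDOW, size - position);
                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_ONLY, position, length);
                while (buffer.hasRemaining()) {
                    if (buffer.get() == '\n') {
                        lines++;
                    }
                }
                position += length;
            }
            return lines;
        }
    }

    // Writes lineCount lines to a temporary file, then counts them via the mapping.
    static long writeAndCount(int lineCount) {
        try {
            Path file = Files.createTempFile("mapped-sketch", ".txt");
            file.toFile().deleteOnExit();
            List<String> lines = new ArrayList<>();
            for (int i = 0; i < lineCount; i++) {
                lines.add("line " + i);
            }
            Files.write(file, lines, StandardCharsets.UTF_8);
            return countLines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndCount(1000));
    }
}
```
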
>>
>> Regards
>>
>>
>> On Sat, Oct 19, 2024 at 8:54 AM David Alayachew <davidalayac...@gmail.com>
>> wrote:
>>
>>> Hello Core Libs Dev Team,
>>>
>>> I have a file that I am streaming from a service, and I am trying to
>>> split it into multiple parts based on a certain attribute found on each line.
>>> I am sending each part up to a different service.
>>>
>>> I am using BufferedReader.lines(). However, I cannot read the whole file
>>> into memory because it is larger than the amount of RAM that I have on the
>>> machine. So, since I don't have access to Java 22's Preview Gatherers Fixed
>>> Window, I used the iterator() method on my stream, wrapped that in another
>>> iterator that can grab my batch size worth of data, then built a
>>> spliterator from that, which I then used to create a new stream. In short,
>>> this wrapper iterator isn't Iterator<T>, it's Iterator<List<T>>.
>>>
>>> When I ran this sequentially, everything worked well. However, my CPU
>>> usage was low and we definitely have a performance problem -- our team needs
>>> this number as fast as we can get it. Plus, we had plenty of network bandwidth
>>> to spare, so I had (imo) good reason to use parallelism.
>>>
>>> As soon as I turned on parallelism, the stream's behaviour changed
>>> completely. Instead of fetching the batch and processing, it started
>>> grabbing SEVERAL BATCHES and processing NONE OF THEM. Or at the very least,
>>> it grabbed so many batches that it ran out of memory before it could get to
>>> processing them.
>>>
>>> To give some numbers, this is a 4 core machine. And we can safely hold
>>> about 30-40 batches worth of data in memory before crashing. But again,
>>> when running sequentially, this thing only grabs 1 batch, processes that
>>> one batch, sends out the results, and then starts the next one, all as
>>> expected. I thought that adding parallelism would simply make it so that we
>>> have this happening 4 or 8 times at once.
>>>
>>> After a very long period of digging, I managed to find this link.
>>>
>>>
>>> https://stackoverflow.com/questions/30825708/java-8-using-parallel-in-a-stream-causes-oom-error
>>>
>>> Tagir Valeev gives an answer which doesn't go very deep into the "why"
>>> at all. And the answer is more directed to the user's specific question as
>>> opposed to solving this particular problem.
>>>
>>> After digging through a bunch of other solutions (plus my own testing),
>>> it seems that the answer is that the engine that does parallelization for
>>> Streams tries to grab a large enough "buffer" before doing any parallel
>>> processing. I could be wrong, and I have no idea how large that buffer is.
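
The buffering guess above can be checked directly. The sketch below assumes the new stream was built with Spliterators.spliteratorUnknownSize (the thread does not say which factory was actually used) and counts how many elements a single trySplit() pulls from the underlying iterator. The class and method names are made up for the example. As far as I know, in OpenJDK the first split copies up to 1024 elements into an array and later splits grow from there; with an Iterator<List<T>>, every one of those elements is a whole batch.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstSplitSketch {

    // Returns how many elements one trySplit() consumes from an iterator of `total` elements.
    static int pulledByFirstSplit(int total) {
        AtomicInteger pulled = new AtomicInteger();
        Iterator<Integer> counting = new Iterator<Integer>() {
            private int cursor = 0;

            @Override
            public boolean hasNext() {
                return cursor < total;
            }

            @Override
            public Integer next() {
                if (cursor >= total) {
                    throw new NoSuchElementException();
                }
                pulled.incrementAndGet(); // record every element handed out
                return cursor++;
            }
        };
        Spliterator<Integer> spliterator =
                Spliterators.spliteratorUnknownSize(counting, Spliterator.ORDERED);
        spliterator.trySplit(); // the split eagerly copies elements into an array
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("pulled by first split: " + pulledByFirstSplit(5000));
    }
}
```

If the number printed is far larger than the 30-40 batches that fit in memory, that would be consistent with the out-of-memory behaviour described earlier in this message.
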
>>>
>>> Regardless, that's about where I gave up and went sequential, since the
>>> clock was ticking.
>>>
>>> But I still have a performance problem. How would one suggest going
>>> about this in Java 8?
>>>
>>> Thank you for your time and help.
>>> David Alayachew
>>>
>>>
