To make sure this discussion does not go in the wrong direction: there is no issue here with data size or memory management. The memory management for sorting and hashing works, and Flink handles the spilling correctly, etc.
The issue here is different:
- One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off-heap) memory for buffering the TCP connections.
- Another reason could be leaky behavior in Hadoop's HDFS code.

@Arnaud: We need the full log of the TaskManager that initially experiences the failure; then we can look into this. Ideally with memory logging activated, as Ufuk suggested.

Best,
Stephan

On Tue, Feb 2, 2016 at 6:21 PM, Gábor Gévay <gga...@gmail.com> wrote:
> Hello Arnaud,
>
> > Flink does not start the reduce operation until all lines have
> > been created (the memory bottleneck is during the collection
> > of all lines); in theory, though, it would be possible.
>
> The reason that `S.groupBy(...).reduce(...)` needs to fully
> materialize S is that the implementation of reduce is
> currently sort-based. But this PR will partially solve this:
> https://github.com/apache/flink/pull/1517
> It implements a hash-based combiner, which will not materialize the
> input, but instead needs memory proportional only to the number of
> different keys occurring. You might want to try rebasing onto this PR
> to see whether it improves your situation.
>
> (I also plan to extend this implementation to the actual reduce after
> the combine, but I'm not sure when I will get around to that.)
>
> Best,
> Gábor
>
>
>
> 2016-02-02 16:56 GMT+01:00 LINZ, Arnaud <al...@bouyguestelecom.fr>:
> > Thanks,
> >
> > Giving the batch job an outrageous amount of memory with a 0.5 heap
> > ratio makes it succeed.
> >
> > I've figured out which dataset is consuming the most memory: I have a
> > big join that multiplies the size of the input set before a group reduce.
> > I intend to optimize my code by reducing the join output size at join time.
> >
> > The outline of the processing is:
> > DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> > DataSet B = (K1, V2) where there are multiple values V2 for the same K1 (say 5)
> >
> > I do something like: A.join(B).on(K1).groupBy(K1,K2).reduce()
> > As B contains 5 lines for each K1 key of A, A.join(B) is 5 times the size of A.
> >
> > Flink does not start the reduce operation until all lines have been
> > created (the memory bottleneck is during the collection of all lines);
> > in theory, though, it would be possible.
> > I see no "join group" operator that could do something like
> > "A.groupBy(K1,K2).join(B).on(K1).reduce()"
> >
> > Is there a way to do this?
> >
> > The other way I see is to load B in memory on all nodes and use a hash
> > map during the reduction to get all A.join(B) lines. B is not that small,
> > but I think it will still save RAM.
> >
> > Best regards,
> > Arnaud
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Tuesday, 2 February 2016 15:27
> > To: user@flink.apache.org
> > Subject: Re: Left join with unbalanced dataset
> >
> >
> >> On 02 Feb 2016, at 15:15, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
> >>
> >> Hi,
> >>
> >> Running again with more RAM made the processing go further, but YARN
> >> still killed one container for memory consumption. I will experiment
> >> with various memory parameters.
> >
> > OK, the killing of the container probably triggered the
> > RemoteTransportException.
> >
> > Can you tell me how many containers you are using, how much physical
> > memory the machines have, and how much the containers get?
> >
> > You can monitor memory usage by setting
> >
> > taskmanager.debug.memory.startLogThread: true
> >
> > in the config. This will periodically log the memory consumption to the
> > TaskManager logs. Can you try this and check the logs for the memory
> > consumption?
> >
> > You can also have a look at it in the web frontend under the Task
> > Manager tab.
> >
> > – Ufuk
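Gábor's memory argument (a hash-based combiner needs memory proportional to the number of distinct keys, not to the input size) can be illustrated with a small plain-Java sketch. This is not the code from PR 1517 and uses no Flink API; the class and method names (`HashCombinerSketch`, `combine`) are hypothetical. It consumes the input once and keeps one partial aggregate per distinct key in a `HashMap`, whereas a sort-based reduce has to materialize the records before it can group them.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;

// Hypothetical illustration, not Flink code: a hash-based combiner keeps one
// partial aggregate per distinct key instead of materializing every record.
public final class HashCombinerSketch {

    // Consumes the input once; the map holds one entry per distinct key,
    // so memory grows with the key count, not with the record count.
    public static <K, V> Map<K, V> combine(Iterator<Map.Entry<K, V>> input,
                                           BinaryOperator<V> reduceFn) {
        Map<K, V> partials = new HashMap<>();
        while (input.hasNext()) {
            Map.Entry<K, V> record = input.next();
            // merge() stores the value for a new key, or replaces the existing
            // partial result with reduceFn(existing, value).
            partials.merge(record.getKey(), record.getValue(), reduceFn);
        }
        return partials;
    }

    public static void main(String[] args) {
        // One million generated (key, 1) records over three distinct keys;
        // the records are produced lazily and never stored in a collection.
        Iterator<Map.Entry<String, Long>> records = new Iterator<Map.Entry<String, Long>>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < 1_000_000;
            }

            @Override
            public Map.Entry<String, Long> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String key = "k" + (i % 3);
                i++;
                return Map.entry(key, 1L);
            }
        };
        Map<String, Long> counts = combine(records, Long::sum);
        System.out.println(counts.size());    // prints 3
        System.out.println(counts.get("k0")); // prints 333334
    }
}
```

In the `main` example, one million records over three keys end up as only three partial results held in memory.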
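Arnaud's alternative (keep B in memory on every node and look it up while reducing each (K1, K2) group of A) can be sketched in plain Java as follows. Everything here is an assumption for illustration: the record types `RowA`/`RowB`, the class name, and the aggregate (summing V1 * V2 over all matches) are hypothetical, since the thread does not say what the reduce computes. In Flink itself this would typically be wired up with a broadcast variable (`withBroadcastSet` on the operator, read via `getRuntimeContext().getBroadcastVariable(...)` in a rich function), but that wiring is not shown. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch (not Flink code) of "keep B in memory and
// look it up while reducing A": the 5x larger A.join(B) result is never built.
public final class BroadcastLookupSketch {

    // Hypothetical record types: A = (K1, K2, V1), B = (K1, V2).
    record RowA(String k1, String k2, long v1) {}
    record RowB(String k1, long v2) {}

    // Builds the in-memory index of B (K1 -> all V2) that every node would hold.
    static Map<String, List<Long>> indexB(List<RowB> b) {
        Map<String, List<Long>> index = new HashMap<>();
        for (RowB row : b) {
            index.computeIfAbsent(row.k1(), k -> new ArrayList<>()).add(row.v2());
        }
        return index;
    }

    // Reduces one (K1, K2) group of A against the B index. The hypothetical
    // aggregate is the sum of V1 * V2 over every matching pair; each joined
    // pair exists only transiently inside the loop.
    static long reduceGroup(List<RowA> group, Map<String, List<Long>> bIndex) {
        long acc = 0;
        for (RowA a : group) {
            for (long v2 : bIndex.getOrDefault(a.k1(), List.of())) {
                acc += a.v1() * v2;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        List<RowB> b = List.of(new RowB("x", 1), new RowB("x", 2), new RowB("y", 10));
        Map<String, List<Long>> bIndex = indexB(b);
        // One (K1, K2) = ("x", "p") group of A with two rows.
        List<RowA> group = List.of(new RowA("x", "p", 3), new RowA("x", "p", 4));
        System.out.println(reduceGroup(group, bIndex)); // prints 21: (3 + 4) * (1 + 2)
    }
}
```

The trade-off is the one Arnaud mentions: each node must hold all of B in memory, but the joined dataset that is 5 times the size of A is never materialized.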