Hello Arnaud,

> Flink does not start the reduce operation until all lines have
> been created (the memory bottleneck is during the collection
> of all lines); but theoretically it is possible.
The problem that `S.groupBy(...).reduce(...)` needs to fully materialize S
comes from the fact that the implementation of reduce is currently
sort-based. This PR will partially solve it, though:
https://github.com/apache/flink/pull/1517

It implements a hash-based combiner, which does not materialize its input,
but instead needs memory proportional only to the number of distinct keys
occurring. You might want to try rebasing onto this PR to see whether it
improves your situation. (I also plan to extend this implementation to the
actual reduce after the combine, but I'm not sure when I will get around
to that.)

Best,
Gábor

2016-02-02 16:56 GMT+01:00 LINZ, Arnaud <al...@bouyguestelecom.fr>:
> Thanks,
>
> Giving the batch an outrageous amount of memory with a 0.5 heap ratio
> made it succeed.
>
> I have figured out which dataset is consuming the most memory: I have a
> big join that multiplies the size of the input set before a group reduce.
> I am willing to optimize my code by reducing the join output size at
> join time.
>
> The outline of the treatment is:
>
> DataSet A = (K1, K2, V1) where (K1, K2) is the key. A is huge.
> DataSet B = (K1, V2) where there are multiple values V2 for the same K1
> (say 5).
>
> I do something like: A.join(B).on(K1).groupBy(K1,K2).reduce()
> As B contains 5 lines for each key of A, A.join(B) is 5 times the size
> of A.
>
> Flink does not start the reduce operation until all lines have been
> created (the memory bottleneck is during the collection of all lines);
> but theoretically it is possible.
> I see no "join group" operator that could do something like
> "A.groupBy(K1,K2).join(B).on(K1).reduce()".
>
> Is there a way to do this?
>
> The other way I see is to load B in memory on all nodes and use a hash
> map upon reduction to get all the A.join(B) lines. B is not that small,
> but I think it will still save RAM.
>
> Best regards,
> Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Tuesday, 2 February 2016 15:27
> To: user@flink.apache.org
> Subject: Re: Left join with unbalanced dataset
>
>> On 02 Feb 2016, at 15:15, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
>>
>> Hi,
>>
>> Running again with more RAM made the treatment go further, but Yarn
>> still killed one container for memory consumption. I will experiment
>> with various memory parameters.
>
> OK, the killing of the container probably triggered the
> RemoteTransportException.
>
> Can you tell me how many containers you are using, how much physical
> memory the machines have, and how much the containers get?
>
> You can monitor memory usage by setting
>
> taskmanager.debug.memory.startLogThread: true
>
> in the config. This will periodically log the memory consumption to the
> task manager logs. Can you try this and check the logs for the memory
> consumption?
>
> You can also have a look at it in the web frontend under the Task
> Manager tab.
>
> – Ufuk
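For readers who want to see the idea behind the hash-based combiner Gábor
describes: below is a toy sketch in plain Java. It illustrates the strategy
only, not the code in the PR; the real implementation works on Flink's
managed memory and is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Toy illustration of a hash-based combine strategy: keep one running
// aggregate per distinct key instead of buffering and sorting all
// records. Memory grows with the number of distinct keys, not with the
// number of input records.
public class HashCombinerToy<K, V> {

    private final Map<K, V> aggregates = new HashMap<>();
    private final BinaryOperator<V> reduceFn;

    public HashCombinerToy(BinaryOperator<V> reduceFn) {
        this.reduceFn = reduceFn;
    }

    public void add(K key, V value) {
        // fold the incoming value into the running aggregate for its key
        aggregates.merge(key, value, reduceFn);
    }

    public Map<K, V> result() {
        return aggregates;
    }
}
```

For example, `new HashCombinerToy<String, Integer>(Integer::sum)` keeps one
running count per word while counting words, regardless of how many records
flow through it.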
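A minimal, self-contained sketch of the pipeline Arnaud outlines, using
Flink's Java DataSet API. The tuple types, the sample values, and the
aggregation inside the reduce are hypothetical placeholders:

```java
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;

public class JoinThenReduce {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A = (K1, K2, V1), keyed by (K1, K2); B = (K1, V2), several rows per K1
        DataSet<Tuple3<Long, Long, Double>> a = env.fromElements(
                Tuple3.of(1L, 10L, 1.5), Tuple3.of(1L, 11L, 2.0));
        DataSet<Tuple2<Long, Double>> b = env.fromElements(
                Tuple2.of(1L, 0.5), Tuple2.of(1L, 0.7));

        // join on K1: the joined set is several times the size of A, and a
        // sort-based reduce materializes all of it before reducing
        DataSet<Tuple4<Long, Long, Double, Double>> result = a
                .join(b).where(0).equalTo(0)
                .with(new JoinFunction<Tuple3<Long, Long, Double>,
                                       Tuple2<Long, Double>,
                                       Tuple4<Long, Long, Double, Double>>() {
                    @Override
                    public Tuple4<Long, Long, Double, Double> join(
                            Tuple3<Long, Long, Double> l, Tuple2<Long, Double> r) {
                        return Tuple4.of(l.f0, l.f1, l.f2, r.f1);
                    }
                })
                .groupBy(0, 1)  // (K1, K2)
                .reduce(new ReduceFunction<Tuple4<Long, Long, Double, Double>>() {
                    @Override
                    public Tuple4<Long, Long, Double, Double> reduce(
                            Tuple4<Long, Long, Double, Double> x,
                            Tuple4<Long, Long, Double, Double> y) {
                        // hypothetical aggregation: sum the V2 column per group
                        return Tuple4.of(x.f0, x.f1, x.f2, x.f3 + y.f3);
                    }
                });

        result.print();
    }
}
```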
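And a sketch of the alternative Arnaud proposes at the end of his mail:
broadcast B to every node and pair the A records with their matching B
values inside the group reduce, so the blown-up join result is never
materialized. The broadcast name "B", the types, and the aggregation are
again hypothetical, and B has to fit in each task manager's heap:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class JoinInReduce
        extends RichGroupReduceFunction<Tuple3<Long, Long, Double>,
                                        Tuple3<Long, Long, Double>> {

    private final Map<Long, List<Double>> bByKey = new HashMap<>();

    @Override
    public void open(Configuration parameters) {
        // materialize the broadcast side once per task, indexed by K1
        List<Tuple2<Long, Double>> bcast =
                getRuntimeContext().getBroadcastVariable("B");
        for (Tuple2<Long, Double> t : bcast) {
            bByKey.computeIfAbsent(t.f0, k -> new ArrayList<>()).add(t.f1);
        }
    }

    @Override
    public void reduce(Iterable<Tuple3<Long, Long, Double>> group,
                       Collector<Tuple3<Long, Long, Double>> out) {
        Long k1 = null, k2 = null;
        double acc = 0.0;
        for (Tuple3<Long, Long, Double> rec : group) {
            k1 = rec.f0;
            k2 = rec.f1;
            // pair each A record with its matching B values on the fly,
            // instead of materializing the 5x larger join result
            for (double v2 : bByKey.getOrDefault(rec.f0,
                    Collections.<Double>emptyList())) {
                acc += rec.f2 * v2;   // hypothetical aggregation
            }
        }
        out.collect(Tuple3.of(k1, k2, acc));
    }
}

// usage (a and b as in the previous sketch):
// DataSet<Tuple3<Long, Long, Double>> result =
//     a.groupBy(0, 1)
//      .reduceGroup(new JoinInReduce())
//      .withBroadcastSet(b, "B");
```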
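Finally, Ufuk's debugging setting goes into flink-conf.yaml. The interval
key below is an assumption based on the documentation of that era; verify
it against your Flink version:

```yaml
# enable periodic memory-usage logging in the task manager logs
taskmanager.debug.memory.startLogThread: true
# assumed key, verify for your version: logging interval in milliseconds
taskmanager.debug.memory.logIntervalMs: 5000
```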