Thanks,

Giving the batch an outrageous amount of memory with a 0.5 heap ratio lets the batch succeed.
I've figured out which dataset consumes the most memory: I have a big join that multiplies the size of the input set before a group reduce. I would like to optimize my code by reducing the join output as it is produced.

The outline of the job is:

DataSet A = (K1, K2, V1), where (K1, K2) is the key. A is huge.
DataSet B = (K1, V2), where there are multiple values V2 for the same K1 (say 5).

I do something like:

A.join(B).on(K1).groupBy(K1,K2).reduce()

As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.
Flink does not start the reduce operation until all lines have been created (the memory bottleneck occurs while all these lines are being collected), although in theory it could.
I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()".
Is there a way to do this?

The other way I see is to load B in memory on all nodes and use a hash map during the reduction to get all A.join(B) lines. B is not that small, but I think it will still save RAM.

Best regards,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Tuesday, 2 February 2016 15:27
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

> On 02 Feb 2016, at 15:15, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
>
> Hi,
>
> Running again with more RAM made the processing go further, but Yarn still
> killed one container for memory consumption. I will experiment with various
> memory parameters.

OK, the killing of the container probably triggered the RemoteTransportException.

Can you tell me how many containers you are using, how much physical memory the machines have and how much the containers get?

You can monitor memory usage by setting

taskmanager.debug.memory.startLogThread: true

in the config. This will periodically log the memory consumption to the task manager logs.

Can you try this and check the logs for the memory consumption?
You can also have a look at it in the web frontend under the Task Manager tab.

– Ufuk
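
For reference, the hash-map alternative mentioned in the message (load B in memory, then fold each joined line into a per-(K1,K2) accumulator instead of materializing A.join(B)) could look roughly like the plain-Java sketch below. It does not use Flink. The class and method names are made up for illustration, and combine() with a sum is only a placeholder, since the message does not say what the real reduce computes. It also assumes the reduce is associative and that unmatched A lines are dropped (inner join). In a Flink job, B would probably be shipped with a broadcast set (withBroadcastSet / getBroadcastVariable) and A would be streamed rather than held in a list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastJoinReduce {

    /** One line of A = (K1, K2, V1). Field types are assumptions. */
    static final class A {
        final String k1;
        final String k2;
        final long v1;

        A(String k1, String k2, long v1) {
            this.k1 = k1;
            this.k2 = k2;
            this.v1 = v1;
        }
    }

    /** One line of B = (K1, V2). */
    static final class B {
        final String k1;
        final long v2;

        B(String k1, long v2) {
            this.k1 = k1;
            this.v2 = v2;
        }
    }

    /** Index B in memory: K1 -> all V2 values for that key. */
    static Map<String, List<Long>> indexB(List<B> b) {
        Map<String, List<Long>> index = new HashMap<>();
        for (B row : b) {
            index.computeIfAbsent(row.k1, k -> new ArrayList<>()).add(row.v2);
        }
        return index;
    }

    /** Placeholder for the real per-line computation (assumption). */
    static long combine(long v1, long v2) {
        return v1 * v2;
    }

    /**
     * Expands each A line against the B index and folds every joined line
     * straight into the accumulator of its (K1, K2) group, so only one
     * value per group is kept instead of the full join output.
     */
    static Map<List<String>, Long> joinAndReduce(List<A> a, Map<String, List<Long>> bIndex) {
        Map<List<String>, Long> acc = new HashMap<>();
        for (A row : a) {
            List<Long> matches = bIndex.get(row.k1);
            if (matches == null) {
                continue; // no B line for this K1: dropped, as in an inner join
            }
            for (long v2 : matches) {
                acc.merge(List.of(row.k1, row.k2), combine(row.v1, v2), Long::sum);
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        List<A> a = List.of(new A("k", "x", 2), new A("k", "x", 3), new A("m", "y", 1));
        List<B> b = List.of(new B("k", 1), new B("k", 2), new B("m", 5));
        System.out.println(joinAndReduce(a, indexB(b)));
    }
}
```

The memory cost is then the size of B per node plus one accumulator per (K1,K2) group, rather than 5 times the size of A.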