Hi Arnaud!

Which version of Flink are you using? In 0.10.1, the Netty library version that we use changed its behavior and allocates a lot of off-heap memory; my guess would be that this is the cause. In 1.0-SNAPSHOT that should be fixed, and on 0.10-SNAPSHOT as well.
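If you want to watch off-heap usage directly, one option that requires nothing Flink-specific is the JVM's buffer-pool MXBeans. A minimal sketch (the class name is illustrative, and note that Netty's pooled allocator may bypass this accounting depending on how it is configured):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;
    import java.util.List;

    /** Minimal probe: prints the JVM's "direct" and "mapped" buffer pool usage. */
    public class DirectMemoryProbe {
        public static void main(String[] args) {
            List<BufferPoolMXBean> pools =
                    ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
            for (BufferPoolMXBean pool : pools) {
                // "direct" counts ByteBuffer.allocateDirect() allocations,
                // "mapped" counts memory-mapped files.
                System.out.printf("%s: used=%d bytes, capacity=%d bytes, buffers=%d%n",
                        pool.getName(), pool.getMemoryUsed(),
                        pool.getTotalCapacity(), pool.getCount());
            }
        }
    }

The same values are exposed over JMX under java.nio:type=BufferPool,name=direct, so they can also be charted in JConsole or VisualVM (MBeans tab) next to the container limit.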
If that turns out to be the cause, the good news is that we started discussing a 0.10.2 maintenance release that should also have a fix for that.

Greetings,
Stephan

On Tue, Feb 2, 2016 at 11:12 AM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hi,
>
> Changing to an outer join did not change the error; neither did balancing
> the join with another dataset, dividing the parallelism level by 2, nor
> doubling the memory.
>
> Heap size & thread count look OK under JVisualVM, so the problem is
> elsewhere.
>
> Does Flink use off-heap memory? How can I monitor it?
>
> Thanks,
> Arnaud
>
> 10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager - Status of job
> 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>     at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>     at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>     at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>     at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>     at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>     at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>     at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>     at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>     at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
> (…)
>
> 10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor - Association with
> remote system [akka.tcp://flink@172.21.125.13:40286] has failed, address is
> now gated for [5000] ms. Reason is: [Disassociated].
> 10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager - Container
> container_e11_1453202008841_2794_01_000025 is completed with diagnostics:
> Container [pid=14331,containerID=container_e11_1453202008841_2794_01_000025]
> is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB
> physical memory used; 9.1 GB of 16.8 GB virtual memory used. Killing container.
> Dump of the process-tree for container_e11_1453202008841_2794_01_000025 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS)
>    VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c
>    /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m
>    -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log
>    -Dlogback.configurationFile=file:logback.xml
>    -Dlog4j.configuration=file:log4j.properties
>    org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
>    1> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out
>    2> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err
>    --streamingMode batch
> |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473
>    /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m
>    -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log
>    -Dlogback.configurationFile=file:logback.xml
>    -Dlog4j.configuration=file:log4j.properties
>    org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch
>
> Container killed on request. Exit code is 143
> Container exited with a non-zero exit code 143
>
> 10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager
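Reading the kill message together with the command line above supports the off-heap theory. A rough worked example (assuming the usual YARN accounting, where the physical-memory limit covers everything the process maps, not just the heap):

    container limit (YARN):                       8192 MiB
    JVM heap (-Xms/-Xmx):                         5376 MiB
    direct memory cap (-XX:MaxDirectMemorySize):  5376 MiB
    worst case, heap + direct:                   10752 MiB  >  8192 MiB

The heap alone fits easily, but once direct allocations grow past roughly 8192 - 5376 = 2816 MiB (plus thread stacks, permgen/metaspace, and other native memory), the container crosses its 8 GB physical limit and YARN kills it. That would also explain why the heap and thread counts looked healthy in JVisualVM.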
From: LINZ, Arnaud
Sent: Monday, February 1, 2016 09:40
To: user@flink.apache.org
Subject: RE: Left join with unbalanced dataset

> Hi,
>
> Thanks, I can't believe I missed the outer join operators… I will try them
> and keep you informed.
>
> I use the "official" 0.10 release from the Maven repository. The only
> off-heap memory I use is what HDFS I/O uses (codecs, DFSOutputStream
> threads…), but I don't have many files open at once, and doubling the
> amount of memory did not solve the problem.
>
> Arnaud

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On behalf of Stephan Ewen
Sent: Sunday, January 31, 2016 20:57
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

> Hi!
>
> YARN killing the application seems strange. The memory use that YARN sees
> should not change even when one node gets a lot of data.
>
> Can you share which version of Flink (plus commit hash) you are using, and
> whether you use off-heap memory or not?
>
> Thanks,
> Stephan

On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Arnaud,
>
> The unmatched elements of A will only end up on the same worker node if
> they all share the same key. Otherwise, they will be spread evenly across
> your cluster. However, I would also recommend using Flink's leftOuterJoin.
>
> Cheers,
> Till

On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Arnaud,
>
> To join two datasets, the community recommends using a join operation
> rather than a cogroup operation. For a left join, you can use the
> leftOuterJoin method. Flink's optimizer decides the distributed join
> execution strategy using statistics of the datasets, such as their size.
> Additionally, you can set a join hint to help the optimizer choose the
> strategy.
>
> The transformations section [1] of the Flink documentation describes the
> outer join operations in detail.
>
> I hope this helps.
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations
>
> Regards,
> Chiwan Park
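To make the suggestion above concrete, here is a minimal sketch of a leftOuterJoin with the 0.10 DataSet API. The tuple schemas and values are invented for the example; the essential point is that the right-hand argument of the JoinFunction is null for unmatched left elements:

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class LeftJoinSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical inputs: A = (key, valueA), B = (key, valueB).
            DataSet<Tuple2<Long, String>> a = env.fromElements(
                    new Tuple2<>(1L, "a1"), new Tuple2<>(2L, "a2"));
            DataSet<Tuple2<Long, String>> b = env.fromElements(
                    new Tuple2<>(1L, "b1"));

            DataSet<Tuple2<String, String>> joined = a
                    .leftOuterJoin(b)
                    .where(0)    // key field of A
                    .equalTo(0)  // key field of B
                    .with(new JoinFunction<Tuple2<Long, String>,
                                           Tuple2<Long, String>,
                                           Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> join(Tuple2<Long, String> left,
                                                           Tuple2<Long, String> right) {
                            // right is null when the A record has no match in B.
                            return new Tuple2<>(left.f1,
                                    right == null ? "<none>" : right.f1);
                        }
                    });

            joined.print();
        }
    }

If the optimizer picks a poor strategy, a join hint can be passed as a second argument, e.g. leftOuterJoin(b, JoinHint.REPARTITION_HASH_SECOND); not every strategy is valid for every outer-join side, so it is worth checking against the 0.10 documentation linked above.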
On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hello,
>
> I have a very big dataset A to left join with a dataset B that is half its
> size. That is to say, half of A's records will be matched with one record
> of B, and the other half with null values.
>
> I used a CoGroup for that, but my batch fails because YARN kills the
> container due to memory problems.
>
> I guess that is because one worker gets half of the A dataset (the
> unmatched records), and that is too much for a single JVM.
>
> Am I right in my diagnosis? Is there a better way to left join unbalanced
> datasets?
>
> Best regards,
>
> Arnaud
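For contrast, a sketch of the CoGroup formulation described in this message, reusing the hypothetical a and b datasets from the leftOuterJoin example above (the right side is buffered into a list because the co-group iterables are generally single-pass):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    DataSet<Tuple2<String, String>> joined = a
            .coGroup(b)
            .where(0)
            .equalTo(0)
            .with(new CoGroupFunction<Tuple2<Long, String>,
                                      Tuple2<Long, String>,
                                      Tuple2<String, String>>() {
                @Override
                public void coGroup(Iterable<Tuple2<Long, String>> left,
                                    Iterable<Tuple2<Long, String>> right,
                                    Collector<Tuple2<String, String>> out) {
                    // Buffer the (expected small) right-side group for this key.
                    List<Tuple2<Long, String>> rights = new ArrayList<>();
                    for (Tuple2<Long, String> r : right) {
                        rights.add(r);
                    }
                    // Emit each left element once per match, or with a
                    // placeholder when there is no match (left join semantics).
                    for (Tuple2<Long, String> l : left) {
                        if (rights.isEmpty()) {
                            out.collect(new Tuple2<>(l.f1, "<none>"));
                        } else {
                            for (Tuple2<Long, String> r : rights) {
                                out.collect(new Tuple2<>(l.f1, r.f1));
                            }
                        }
                    }
                }
            });

As Till notes above, the left-side records only pile up on one worker if they all share the same join key; with well-distributed keys, both formulations spread the unmatched records across the cluster.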