Hi Arnaud!

Which version of Flink are you using? In 0.10.1, the Netty library version
that we use has changed behavior, and allocates a lot of off-heap memory.
Would be my guess that this is the cause. In 1.0-SNAPSHOT, that should be
fixed, also on 0.10-SNAPSHOT.

If that turns out to be the cause, the good news is that we started
discussing a 0.10.2 maintenance release that should also have a fix for
that.

Greetings,
Stephan


On Tue, Feb 2, 2016 at 11:12 AM, LINZ, Arnaud <al...@bouyguestelecom.fr>
wrote:

> Hi,
>
>
>
> Changing for a outer join did not change the error ; nor balancing the
> join with another dataset ; nor dividing parallelism level by 2 ; nor
> increasing memory by 2.
>
> Heap size & thread number is OK under JvisualVM.  So the problem is
> elsewhere.
>
>
>
> Do Flink uses off-heap memory ? How can I monitor it ?
>
>
>
> Thanks,
>
> Arnaud
>
>
>
> 10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager                       
>    - Status of job 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) 
> changed to FAILING.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>
>           at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>           at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>           at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>
>           at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>           at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>           at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>           at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>           ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>
>           at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>
>           at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>           at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>           at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>           at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>           at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>           at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>           at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>           at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>           at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>
>           at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>           at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>           at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>           at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>           at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>           at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>           at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>           at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>           at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>
>
> (…)
>
> 10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor                     
>    - Association with remote system [akka.tcp://flink@172.21.125.13:40286] 
> has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager                       
>    - Container container_e11_1453202008841_2794_01_000025 is completed with 
> diagnostics: Container 
> [pid=14331,containerID=container_e11_1453202008841_2794_01_000025] is running 
> beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory 
> used; 9.1 GB of 16.8 GB virtual memory used. Killing container.
>
> Dump of the process-tree for container_e11_1453202008841_2794_01_000025 :
>
>           |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
>           |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c 
> /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m  
> -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log
>  -Dlogback.configurationFile=file:logback.xml 
> -Dlog4j.configuration=file:log4j.properties 
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> 
> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out
>  2> 
> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err
>  --streamingMode batch
>
>           |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473 
> /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m 
> -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log
>  -Dlogback.configurationFile=file:logback.xml 
> -Dlog4j.configuration=file:log4j.properties 
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode 
> batch
>
>
>
> Container killed on request. Exit code is 143
>
> Container exited with a non-zero exit code 143
>
>
>
> 10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager
>
>
>
>
>
>
>
> *De :* LINZ, Arnaud
> *Envoyé :* lundi 1 février 2016 09:40
> *À :* user@flink.apache.org
> *Objet :* RE: Left join with unbalanced dataset
>
>
>
> Hi,
>
> Thanks, I can’t believe I missed the outer join operators… Will try them
> and will keep you informed.
>
> I use the “official” 0.10 release from the maven repo. The off-heap memory
> I use is the one HDFS I/O uses (codec, DFSOutputstream threads…), but I
> don’t have many open files at once, and doubling the amount of memory did
> not solve the problem.
>
> Arnaud
>
>
>
>
>
> *De :* ewenstep...@gmail.com [mailto:ewenstep...@gmail.com
> <ewenstep...@gmail.com>] *De la part de* Stephan Ewen
> *Envoyé :* dimanche 31 janvier 2016 20:57
> *À :* user@flink.apache.org
> *Objet :* Re: Left join with unbalanced dataset
>
>
>
> Hi!
>
>
>
> YARN killing the application seems strange. The memory use that YARN sees
> should not change even when one node gets a lot or data.
>
>
>
> Can you share what version of Flink (plus commit hash) you are using and
> whether you use off-heap memory or not?
>
>
>
> Thanks,
>
> Stephan
>
>
>
>
>
> On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
> Hi Arnaud,
>
>
>
> the unmatched elements of A will only end up on the same worker node if
> they all share the same key. Otherwise, they will be evenly spread out
> across your cluster. However, I would also recommend you to use Flink's
> leftOuterJoin.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
>
> Hi Arnaud,
>
> To join two datasets, the community recommends using join operation rather
> than cogroup operation. For left join, you can use leftOuterJoin method.
> Flink’s optimizer decides distributed join execution strategy using some
> statistics of the datasets such as size of the dataset. Additionally, you
> can set join hint to help optimizer decide the strategy.
>
> In transformations section [1] of Flink documentation, you can find about
> outer join operation in detail.
>
> I hope this helps.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations
>
> Regards,
> Chiwan Park
>
> > On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <al...@bouyguestelecom.fr>
> wrote:
> >
> > Hello,
> >
> > I have a very big dataset A to left join with a dataset B that is half
> its size. That is to say, half of A records will be matched with one record
> of B, and the other half with null values.
> >
> > I used a CoGroup for that, but my batch fails because yarn kills the
> container due to memory problems.
> >
> > I guess that’s because one worker will get half of A dataset (the
> unmatched ones), and that’s too much for a single JVM
> >
> > Am I right in my diagnostic ? Is there a better way to left join
> unbalanced datasets ?
> >
> > Best regards,
> >
> > Arnaud
> >
> >
> >
>
> > L'intégrité de ce message n'étant pas assurée sur internet, la société
> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
> vous n'êtes pas destinataire de ce message, merci de le détruire et
> d'avertir l'expéditeur.
> >
> > The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
>
>
>
>
>

Reply via email to