Hi Robert,

This is a regression on the current master, caused by changes in the way
Flink calculates memory sizes and sets the maximum direct memory size.
We introduced these changes when we merged support for off-heap
memory. The problem is not in how Flink handles managed memory;
-XX:MaxDirectMemorySize is simply set too low. By default, direct
memory is used only by the network stack, and the network library we
use (Netty) allocates more direct memory than we expected.
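To check which direct-memory limit a JVM actually runs with, and how much direct memory is in use, a small diagnostic along these lines may help when run inside the JVM in question. It is a sketch that relies on the standard `BufferPoolMXBean` and the HotSpot-specific `com.sun.management.HotSpotDiagnosticMXBean`; the class and method names are made up for illustration. On HotSpot, a value of `0` for `MaxDirectMemorySize` means the flag was not set explicitly, in which case Java 8 uses the maximum heap size (`Runtime.maxMemory()`) as the limit.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Hypothetical diagnostic: print the MaxDirectMemorySize setting and current direct-buffer usage.
public class DirectMemoryReport {

    /** Returns the "direct" buffer pool bean, or null if the JVM does not expose one. */
    static BufferPoolMXBean directPool() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool;
            }
        }
        return null;
    }

    /** Raw value of -XX:MaxDirectMemorySize in bytes as HotSpot sees it ("0" = not set). */
    static String maxDirectMemoryOption() {
        com.sun.management.HotSpotDiagnosticMXBean hotspot =
            ManagementFactory.getPlatformMXBean(com.sun.management.HotSpotDiagnosticMXBean.class);
        return hotspot.getVMOption("MaxDirectMemorySize").getValue();
    }

    public static void main(String[] args) {
        // Allocate 1 MiB of direct memory so the pool statistics are non-empty.
        ByteBuffer buf = ByteBuffer.allocateDirect(1 << 20);
        BufferPoolMXBean pool = directPool();
        System.out.println("MaxDirectMemorySize option: " + maxDirectMemoryOption());
        System.out.println("direct buffers: count=" + pool.getCount()
            + ", used=" + pool.getMemoryUsed() + " bytes");
        System.out.println("held buffer capacity: " + buf.capacity());
    }
}
```

Started with `-XX:MaxDirectMemorySize=65m`, the option should read `68157440` (65 × 1024 × 1024 bytes).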

We'll push a fix to the master as soon as possible. Thank you for
reporting and thanks for your patience.

Best regards,
Max

On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
<ro.schmid...@gmail.com> wrote:
> Hi everyone,
>
> I'm constantly running into OutOfMemoryErrors and for the life of me I
> cannot figure out what's wrong. Let me describe my setup. I'm running the
> current master branch of Flink on YARN (Hadoop 2.7.0). My job is an
> unfinished implementation of TPC-H Q2
> (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java),
> I run on 8 machines (1 for JM, the other 7 for TMs) with 64G of memory per
> machine. This is what I believe to be the relevant section of my
> yarn-site.xml:
>
>
>   <property>
>     <name>yarn.nodemanager.resource.memory-mb</name>
>     <value>57344</value>
>   </property>
> <!--
>   <property>
>     <name>yarn.scheduler.minimum-allocation-mb</name>
>     <value>8192</value>
>   </property>
> -->
>   <property>
>     <name>yarn.scheduler.maximum-allocation-mb</name>
>     <value>55296</value>
>   </property>
>
>   <property>
>     <name>yarn.nodemanager.vmem-check-enabled</name>
>     <value>false</value>
>   </property>
>
>
> And this is how I submit the job:
>
>
> $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .....
>
>
> The TMs happily report:
>
> .....
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner -  JVM Options:
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner -     -Xms24511m
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner -     -Xmx24511m
> 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner -     -XX:MaxDirectMemorySize=65m
> .....
>
>
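The logged numbers can be reconciled with `-ytm 32768` under one assumption, which is not stated anywhere in this thread: that 25% of the container is held back as a cutoff and the 65 MB direct-memory limit is then taken out of the remainder. Under that assumption the arithmetic below reproduces `-Xmx24511m` exactly; the class name and the 0.25 cutoff ratio are hypothetical, not a documented Flink formula.

```java
// Hypothetical decomposition of the TaskManager container size from the log above.
// ASSUMPTION: a 25% cutoff is reserved from the container and the direct-memory
// limit is subtracted from the rest to obtain the heap size.
public class HypotheticalTmMemory {

    static long cutoffMb(long containerMb, double cutoffRatio) {
        return (long) (containerMb * cutoffRatio);
    }

    static long heapMb(long containerMb, double cutoffRatio, long directMb) {
        return containerMb - cutoffMb(containerMb, cutoffRatio) - directMb;
    }

    public static void main(String[] args) {
        long container = 32768; // -ytm 32768
        long direct = 65;       // -XX:MaxDirectMemorySize=65m
        System.out.println("cutoff = " + cutoffMb(container, 0.25) + " MB"); // prints "cutoff = 8192 MB"
        System.out.println("heap   = " + heapMb(container, 0.25, direct) + " MB"); // prints "heap   = 24511 MB"
    }
}
```

Under this reading, direct memory gets 65 MB of a 32768 MB container (about 0.2%), which fits the diagnosis that the direct-memory limit, not the container size, is what runs out.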
> I've tried various combinations of YARN and Flink options, to no avail. I
> always end up with the following stacktrace:
>
>
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
>     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>     at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>     at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>     at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>     ... 9 more
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>     at java.nio.Bits.reserveMemory(Bits.java:658)
>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>     at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
>     at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
>     at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
>     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
>     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
>     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
>     at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
>     ... 10 more
>
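The innermost cause in the trace can be reproduced outside Flink. The sketch below (class and method names are hypothetical) keeps allocating 1 MiB direct buffers with `ByteBuffer.allocateDirect`, the same JDK call at the bottom of the trace, until the direct-memory limit is reached. Running it with `-XX:MaxDirectMemorySize=65m`, the value from the TaskManager log, should make it fail after roughly 65 allocations; without the flag, Java 8 uses the maximum heap size as the limit and a 128 MiB run will normally succeed.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical repro: allocate direct buffers until the direct-memory limit is hit.
// Try: java -XX:MaxDirectMemorySize=65m DirectOomRepro
public class DirectOomRepro {

    /**
     * Allocates up to maxChunks direct buffers of chunkBytes each and keeps them reachable,
     * so the garbage collector cannot free them. Returns how many allocations succeeded
     * before an OutOfMemoryError (or maxChunks if none failed).
     */
    static int allocateChunks(int chunkBytes, int maxChunks) {
        List<ByteBuffer> held = new ArrayList<>();
        try {
            for (int i = 0; i < maxChunks; i++) {
                held.add(ByteBuffer.allocateDirect(chunkBytes));
            }
        } catch (OutOfMemoryError e) {
            // Caught only to report how far we got; real code should not swallow this.
            System.out.println("failed after " + held.size() + " chunks: " + e.getMessage());
        }
        return held.size();
    }

    public static void main(String[] args) {
        int n = allocateChunks(1 << 20, 128); // up to 128 MiB in 1 MiB chunks
        System.out.println("allocated " + n + " MiB of direct memory");
    }
}
```

Holding the buffers in a list is deliberate: if they became unreachable, the JDK could reclaim them during the allocation retry and the limit might never be hit.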
>
> I always figured that running into OOMEs with Flink would be quite hard to
> achieve, so I'm wondering what's going wrong now. It seems to be related
> to direct memory? Why are you limiting it in the JVM options at all? Is
> there a place where I can safely increase the limit, or remove the
> option altogether so that direct memory is unbounded?
>
> A note on the data sizes: I used a scale factor of 1000 for TPC-H's dbgen
> command, which means the following. Each table is split into 7
> chunks (one local to each TM); each chunk of part.tbl is 734M, each
> chunk of supplier.tbl is 43M, and each chunk of partsupp.tbl is 3.6G. These
> are not excessive amounts of data, but the query (at least my
> implementation) involves joins (the one at line 249 causes the OOME), so
> maybe there are some network issues?
>
> Maybe you can point me in the right direction. Thanks a bunch, cheers.
>
> Robert
