Hi everyone, I'm constantly running into OutOfMemoryErrors and for the life of me I cannot figure out what's wrong. Let me describe my setup. I'm running the current master branch of Flink on YARN (Hadoop 2.7.0). My job is an unfinished implementation of TPC-H Q2 (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java). I run it on 8 machines (1 for the JM, the other 7 for TMs) with 64G of memory per machine. This is what I believe to be the relevant section of my yarn-site.xml:
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>57344</value>
</property>
<!--
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>8192</value>
</property>
-->
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>55296</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

And this is how I submit the job:

$FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .....

The TMs happily report:

.....
11:50:15,577 INFO org.apache.flink.yarn.appMaster.YarnTaskManagerRunner - JVM Options:
11:50:15,577 INFO org.apache.flink.yarn.appMaster.YarnTaskManagerRunner - -Xms24511m
11:50:15,577 INFO org.apache.flink.yarn.appMaster.YarnTaskManagerRunner - -Xmx24511m
11:50:15,577 INFO org.apache.flink.yarn.appMaster.YarnTaskManagerRunner - -XX:MaxDirectMemorySize=65m
.....

I've tried various combinations of YARN and Flink options, to no avail. I always end up with the following stacktrace:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
    at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
    at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
    at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
    at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
    ... 10 more

I always figured it would be quite hard to run into OOMEs with Flink, so I'm wondering what is going wrong now. It seems to be related to the direct memory? Why do you limit it in the JVM options at all? Is there a place where I can safely increase the size, or remove the option altogether so it is unbounded?

A note on the data sizes: I used scaling factor 1000 for the dbgen command of TPC-H, which effectively means the following. Each table is split into 7 chunks (one local to each TM); each chunk of part.tbl is 734M, each chunk of supplier.tbl is 43M, and each chunk of partsupp.tbl is 3.6G. These are not excessive amounts of data, but the query (at least my implementation of it) involves joins (the one in line 249 is causing the OOME), so maybe there are some network issues?

Maybe you can point me in the right direction. Thanks a bunch.

Cheers,
Robert
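P.S. To save you a click, the join in line 249 is roughly of the following shape; this is a heavily simplified sketch with placeholder tuple layouts, not the actual code (the real field positions are in the repository linked above):

    // part:     (partkey, mfgr) read from part.tbl        -- placeholder layout
    // partsupp: (partkey, suppkey, supplycost) from partsupp.tbl -- placeholder layout
    DataSet<Tuple2<Tuple2<Long, String>, Tuple3<Long, Long, Double>>> joined =
        part.join(partsupp)
            .where(0)     // part.partkey
            .equalTo(0);  // partsupp.partkey

P.P.S. Regarding increasing the direct memory limit myself: my best guess so far was to append it via env.java.opts in flink-conf.yaml, e.g.

    env.java.opts: -XX:MaxDirectMemorySize=2g

but the 2g value is just a guess, and I don't know whether this would actually take precedence over the -XX:MaxDirectMemorySize=65m that the YarnTaskManagerRunner sets, so please correct me if that is the wrong knob.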