I pulled the current master branch and rebuilt Flink completely anyway. Works like a charm.
On Thu, Oct 1, 2015 at 11:11 AM, Maximilian Michels <m...@apache.org> wrote:
> By the way, you might have to use the "-U" flag to force Maven to
> update its dependencies: mvn -U clean install -DskipTests
>
> On Thu, Oct 1, 2015 at 10:19 AM, Robert Schmidtke
> <ro.schmid...@gmail.com> wrote:
> > Sweet! I'll pull it straight away. Thanks!
> >
> > On Thu, Oct 1, 2015 at 10:18 AM, Maximilian Michels <m...@apache.org> wrote:
> >>
> >> Hi Robert,
> >>
> >> Just a quick update: the issue has been resolved in the latest
> >> 0.10-SNAPSHOT Maven dependency.
> >>
> >> Cheers,
> >> Max
> >>
> >> On Wed, Sep 30, 2015 at 3:19 PM, Robert Schmidtke
> >> <ro.schmid...@gmail.com> wrote:
> >> > Hi Max,
> >> >
> >> > thanks for your quick reply. I found the relevant code and commented
> >> > it out for testing; that seems to work. Happily waiting for the fix.
> >> > Thanks again.
> >> >
> >> > Robert
> >> >
> >> > On Wed, Sep 30, 2015 at 1:42 PM, Maximilian Michels <m...@apache.org>
> >> > wrote:
> >> >>
> >> >> Hi Robert,
> >> >>
> >> >> This is a regression on the current master due to changes in the way
> >> >> Flink calculates the memory and sets the maximum direct memory size.
> >> >> We introduced these changes when we merged support for off-heap
> >> >> memory. This is not a problem with the way Flink deals with managed
> >> >> memory; -XX:MaxDirectMemorySize is simply set too low. By default,
> >> >> the maximum direct memory is only used by the network stack, and the
> >> >> network library we use allocates more direct memory than we expected.
> >> >>
> >> >> We'll push a fix to the master as soon as possible. Thank you for
> >> >> reporting, and thanks for your patience.
> >> >>
> >> >> Best regards,
> >> >> Max
> >> >>
> >> >> On Wed, Sep 30, 2015 at 1:31 PM, Robert Schmidtke
> >> >> <ro.schmid...@gmail.com> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I'm constantly running into OutOfMemoryErrors and for the life of
> >> >> > me I cannot figure out what's wrong. Let me describe my setup. I'm
> >> >> > running the current master branch of Flink on YARN (Hadoop 2.7.0).
> >> >> > My job is an unfinished implementation of TPC-H Q2
> >> >> > (https://github.com/robert-schmidtke/flink-benchmarks/blob/master/xtreemfs-flink-benchmark/src/main/java/org/xtreemfs/flink/benchmark/TPCH2Benchmark.java).
> >> >> > I run it on 8 machines (1 for the JM, the other 7 for TMs) with 64G
> >> >> > of memory per machine. This is what I believe to be the relevant
> >> >> > section of my yarn-site.xml:
> >> >> >
> >> >> > <property>
> >> >> >   <name>yarn.nodemanager.resource.memory-mb</name>
> >> >> >   <value>57344</value>
> >> >> > </property>
> >> >> > <!--
> >> >> > <property>
> >> >> >   <name>yarn.scheduler.minimum-allocation-mb</name>
> >> >> >   <value>8192</value>
> >> >> > </property>
> >> >> > -->
> >> >> > <property>
> >> >> >   <name>yarn.scheduler.maximum-allocation-mb</name>
> >> >> >   <value>55296</value>
> >> >> > </property>
> >> >> >
> >> >> > <property>
> >> >> >   <name>yarn.nodemanager.vmem-check-enabled</name>
> >> >> >   <value>false</value>
> >> >> > </property>
> >> >> >
> >> >> > And this is how I submit the job:
> >> >> >
> >> >> > $FLINK_HOME/bin/flink run -m yarn-cluster -yjm 16384 -ytm 32768 -yn 7 .....
> >> >> >
> >> >> > The TMs happily report:
> >> >> >
> >> >> > .....
> >> >> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -  JVM Options:
> >> >> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -     -Xms24511m
> >> >> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -     -Xmx24511m
> >> >> > 11:50:15,577 INFO  org.apache.flink.yarn.appMaster.YarnTaskManagerRunner  -     -XX:MaxDirectMemorySize=65m
> >> >> > .....
> >> >> >
> >> >> > I've tried various combinations of YARN and Flink options, to no
> >> >> > avail. I always end up with the following stacktrace:
> >> >> >
> >> >> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
> >> >> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> >> >     at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
> >> >> >     at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> >> >> >     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> >> >> >     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> >> >> >     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> >> >> >     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> >> >> >     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> >> >> >     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> >> >> >     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
> >> >> >     at java.lang.Thread.run(Thread.java:745)
> >> >> > Caused by: io.netty.handler.codec.DecoderException:
> >> >> > java.lang.OutOfMemoryError: Direct buffer memory
> >> >> >     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
> >> >> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> >> >> >     ... 9 more
> >> >> > Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> >> >> >     at java.nio.Bits.reserveMemory(Bits.java:658)
> >> >> >     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> >> >> >     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> >> >> >     at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108)
> >> >> >     at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:157)
> >> >> >     at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
> >> >> >     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
> >> >> >     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
> >> >> >     at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
> >> >> >     at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
> >> >> >     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
> >> >> >     ... 10 more
> >> >> >
> >> >> > I always figured that running into OOMEs with Flink would be quite
> >> >> > hard to achieve, so I'm wondering what's going wrong now. It seems
> >> >> > to be related to the direct memory? Why are you limiting it in the
> >> >> > JVM options at all? Is there a place where I can safely increase
> >> >> > the size, or remove the option altogether so it is unbounded?
> >> >> >
> >> >> > A note on the data sizes: I used a scaling factor of 1000 for the
> >> >> > dbgen command of TPC-H, which effectively means the following. Each
> >> >> > table is split into 7 chunks (one local to each TM); each chunk of
> >> >> > part.tbl is 734M, each chunk of supplier.tbl is 43M, and each chunk
> >> >> > of partsupp.tbl is 3.6G. These are not excessive amounts of data,
> >> >> > but the query (at least my implementation) involves joins (the one
> >> >> > in line 249 is causing the OOME), so maybe there are some network
> >> >> > issues?
> >> >> >
> >> >> > Maybe you can point me in the right direction. Thanks a bunch.
> >> >> > Cheers.
> >> >> >
> >> >> > Robert
> >> >
> >> >
> >> > --
> >> > My GPG Key ID: 336E2680
> >
> >
> > --
> > My GPG Key ID: 336E2680
>

--
My GPG Key ID: 336E2680
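
For readers who land on this thread with the same stack trace: the "Direct
buffer memory" error is thrown by the JVM's own accounting in
java.nio.Bits.reserveMemory (the innermost cause above), which caps the total
size of all direct ByteBuffers at -XX:MaxDirectMemorySize. Flink's network
stack (Netty) allocates its buffers from exactly that pool, which is why a
limit of 65m is exhausted so quickly. The following minimal, self-contained
sketch is not part of the original thread; the class name and the 16 MB
buffer size are made up for illustration. Started with the same limit the
TaskManager JVM was given, e.g.
java -XX:MaxDirectMemorySize=65m DirectMemoryLimitDemo, it fails with the
identical error:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Allocates 16 MB direct buffers until the JVM's direct-memory accounting
// (java.nio.Bits.reserveMemory) refuses the next allocation and throws
// java.lang.OutOfMemoryError: Direct buffer memory -- the same error Netty
// runs into above when its network buffers exceed the configured limit.
public class DirectMemoryLimitDemo {

    public static void main(String[] args) {
        List<ByteBuffer> buffers = new ArrayList<>(); // keep references so nothing is freed
        long allocated = 0;
        try {
            while (true) {
                buffers.add(ByteBuffer.allocateDirect(16 * 1024 * 1024));
                allocated += 16 * 1024 * 1024;
                System.out.println("Allocated " + (allocated >> 20) + " MB of direct memory");
            }
        } catch (OutOfMemoryError e) {
            // With -XX:MaxDirectMemorySize=65m this triggers after the fourth 16 MB buffer.
            System.out.println("Hit the direct memory limit: " + e);
        }
    }
}

The sketch only illustrates the JVM mechanism Max describes; the actual fix
is the one that landed in the 0.10-SNAPSHOT dependency mentioned above.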