Hi,

We are using standalone Flink at the moment. Our snapshot build is built from source, and I removed everything from the old build before I started the server, so there shouldn't be any traces of the old setup left.
I bumped the process to 20 GB and it has been running stably, now with G1. There might still be some kind of memory leak that I don't see at the moment due to the large heap. This is how the taskmanager process looks right now:

java -XX:+UseG1GC -Xms20480M -Xmx20480M -XX:MaxDirectMemorySize=20480M -Dlog.file=/appsdir/flink-0.10-SNAPSHOT/log/flink-splat-taskmanager-0-fbweb608.log -Dlog4j.configuration=file:/appsdir/flink-0.10-SNAPSHOT/conf/log4j.properties -Dlogback.configurationFile=file:/appsdir/flink-0.10-SNAPSHOT/conf/logback.xml -classpath /appsdir/flink-0.10-SNAPSHOT/lib/flink-dist-0.10-SNAPSHOT.jar:/appsdir/flink-0.10-SNAPSHOT/lib/flink-python-0.10-SNAPSHOT.jar:/appsdir/flink-0.10-SNAPSHOT/lib/log4j-1.2.17.jar:/appsdir/flink-0.10-SNAPSHOT/lib/slf4j-log4j12-1.7.7.jar::: org.apache.flink.runtime.taskmanager.TaskManager --configDir /appsdir/flink-0.10-SNAPSHOT/conf --streamingMode streaming

/Jakob

On Tue, Oct 20, 2015 at 3:38 PM, Stephan Ewen <se...@apache.org> wrote:
> @Jakob: If you use Flink standalone (not through YARN), one thing to be
> aware of is that the relevant change is in the bash scripts that start
> the cluster, not the code. If you upgraded Flink by copying a newer JAR
> file, you missed the update of the bash scripts and thereby missed the
> fix for that issue.
>
> On Tue, Oct 20, 2015 at 10:39 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Jakob,
>>
>> Your revision number is fairly new, and your direct memory
>> configuration seems to be correct for your setup. If you have the
>> time, you could verify that the memory flags for the JVM are set
>> correctly by the startup script. You can see that in the first lines
>> of the task manager log. If the direct memory was set to 2 GB with the
>> default number of network buffers, the JVM should have had enough
>> direct memory. Still, we'd like to find out what caused your problem.
>>
>> Are you running on YARN or standalone?
>>
>> Yes, the usual setup is one task manager per host/VM. The task manager
>> will allocate all memory upfront. However, a large part of this memory
>> will be self-managed by Flink and not touched much by the GC. By
>> default, this is 0.7 of the configured heap memory. You can control
>> this ratio with the taskmanager.memory.fraction variable. You can also
>> set a fixed managed memory size using taskmanager.memory.size (MB). In
>> large memory setups, we have seen slightly better performance using
>> off-heap memory allocation. This can be configured using
>> taskmanager.memory.off-heap: true.
>>
>> Please let us know if you experience any further issues.
>>
>> Best,
>> Max
>>
>> On Mon, Oct 19, 2015 at 10:14 PM, Jakob Ericsson
>> <jakob.erics...@gmail.com> wrote:
>> > The revision is "Starting JobManager (Version: 0.10-SNAPSHOT,
>> > Rev:c82ebbf, Date:15.10.2015 @ 11:34:01 CEST)".
>> >
>> > We have a lot of memory left on the machine. I have increased it
>> > quite a lot.
>> >
>> > What are your thoughts on memory configuration? If I understand
>> > Flink correctly, you should only have one taskmanager running on
>> > each host?
>> >
>> > For a pretty standard machine with 16 cores and 32-64 GB of memory,
>> > this means you will have one java process running with -Xmx30G or
>> > even higher to exhaust all the memory of the machine. This is, at
>> > least for the CMS GC, not the most optimal configuration. It might
>> > be viable for G1, but we got some really serious java core dumps
>> > when running G1.
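>> >
>> > (To put rough numbers on that: with the default
>> > taskmanager.memory.fraction of 0.7 you mention, a 30 GB heap would
>> > mean roughly 0.7 * 30 GB = 21 GB of Flink-managed memory and only
>> > about 9 GB of ordinary heap for user code, which is why the choice
>> > of GC matters so much at these sizes.)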
>> >
>> > I looked a bit at the flags that were set on the process, and it
>> > seems that Xmx and MaxDirectMemorySize are set to the same value by
>> > the shell script. When I got the "java.lang.OutOfMemoryError: Direct
>> > buffer memory", I was running with taskmanager.heap.mb: 2048. So the
>> > direct memory limit was set to 2 GB.
>> >
>> > I have restarted the process with G1 again and 20 GB as
>> > taskmanager.heap.mb. Let's see if it will be stable during the night.
>> >
>> > On Mon, Oct 19, 2015 at 6:31 PM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> You can see the revision number and the build date in the JobManager
>> >> log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
>> >> Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)".
>> >>
>> >> On Mon, Oct 19, 2015 at 5:53 PM, Maximilian Michels <m...@apache.org>
>> >> wrote:
>> >> > When was the last time you updated your 0.10-SNAPSHOT Flink
>> >> > cluster? If it has been more than a couple of weeks, then I'd
>> >> > advise you to update to the latest snapshot version. There has
>> >> > been an issue with the calculation of the off-heap memory limit
>> >> > in the past.
>> >> >
>> >> > Thanks,
>> >> > Max
>> >> >
>> >> > On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.f...@gmail.com>
>> >> > wrote:
>> >> >> It's 0.10-SNAPSHOT.
>> >> >>
>> >> >> Gyula
>> >> >>
>> >> >> Maximilian Michels <m...@apache.org> wrote (on Mon, 19 Oct 2015,
>> >> >> 17:13):
>> >> >>>
>> >> >>> I forgot to ask you: Which version of Flink are you using?
>> >> >>> 0.9.1 or 0.10-SNAPSHOT?
>> >> >>>
>> >> >>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels
>> >> >>> <m...@apache.org> wrote:
>> >> >>> > Hi Jakob,
>> >> >>> >
>> >> >>> > Thanks. Flink allocates its network memory as direct memory
>> >> >>> > outside the normal Java heap. By default, that is 64 MB but
>> >> >>> > can grow up to 128 MB on heavy network transfer. How much
>> >> >>> > memory does your machine have? Could it be that your upper
>> >> >>> > memory bound is lower than 2048 + 128 MB?
>> >> >>> >
>> >> >>> > Best,
>> >> >>> > Max
>> >> >>> >
>> >> >>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
>> >> >>> > <jakob.erics...@gmail.com> wrote:
>> >> >>> >> Hi,
>> >> >>> >>
>> >> >>> >> See answers below.
>> >> >>> >>
>> >> >>> >> /Jakob
>> >> >>> >>
>> >> >>> >> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels
>> >> >>> >> <m...@apache.org> wrote:
>> >> >>> >>>
>> >> >>> >>> Hi Jakob,
>> >> >>> >>>
>> >> >>> >>> Thank you for reporting the bug. Could you please post your
>> >> >>> >>> configuration here? In particular, could you please tell us
>> >> >>> >>> the value of the following configuration variables:
>> >> >>> >>>
>> >> >>> >>> taskmanager.heap.mb
>> >> >>> >>
>> >> >>> >> taskmanager.heap.mb: 2048
>> >> >>> >>
>> >> >>> >>> taskmanager.network.numberOfBuffers
>> >> >>> >>
>> >> >>> >> Default value. Not changed.
>> >> >>> >>
>> >> >>> >>> taskmanager.memory.off-heap
>> >> >>> >>
>> >> >>> >> Default value. Not changed.
>> >> >>> >>
>> >> >>> >>> Are you running the Flink cluster in batch or streaming mode?
>> >> >>> >>
>> >> >>> >> Started in streaming mode, running with two nodes in the
>> >> >>> >> cluster. Also, I have set "env.java.opts:
>> >> >>> >> -XX:+UseConcMarkSweepGC" due to some strange java core dumps
>> >> >>> >> in the G1 GC.
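>> >> >>> >>
>> >> >>> >> To sum up, the only non-default entries in our flink-conf.yaml
>> >> >>> >> should be more or less these two:
>> >> >>> >>
>> >> >>> >> taskmanager.heap.mb: 2048
>> >> >>> >> env.java.opts: -XX:+UseConcMarkSweepGC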
>> >> >>> >>>
>> >> >>> >>> Direct memory is used by Flink's network layer. My guess is
>> >> >>> >>> that you have set taskmanager.heap.mb too low (it constrains
>> >> >>> >>> the amount of direct memory at the moment).
>> >> >>> >>>
>> >> >>> >>> Thank you,
>> >> >>> >>> Max
>> >> >>> >>>
>> >> >>> >>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>> >> >>> >>> <jakob.erics...@gmail.com> wrote:
>> >> >>> >>> > Hello,
>> >> >>> >>> >
>> >> >>> >>> > We are running into a strange problem with Direct Memory
>> >> >>> >>> > buffers. As far as I know, we are not using any direct
>> >> >>> >>> > memory buffers inside our code. This is a pretty trivial
>> >> >>> >>> > streaming application, just doing some deduplication and
>> >> >>> >>> > unioning some Kafka streams.
>> >> >>> >>> >
>> >> >>> >>> > /Jakob
>> >> >>> >>> >
>> >> >>> >>> > 2015-10-19 13:27:59,064 INFO org.apache.flink.runtime.taskmanager.Task
>> >> >>> >>> >   - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with exception.
>> >> >>> >>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >>> >>> >   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >>> >>> >   at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >>> >>> >   at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> >> >>> >>> >   at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> >> >>> >>> >   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> >> >>> >>> >   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> >> >>> >>> >   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> >> >>> >>> >   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> >> >>> >>> >   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> >> >>> >>> >   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>> >> >>> >>> >   at java.lang.Thread.run(Thread.java:745)
>> >> >>> >>> > Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >>> >>> >   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
>> >> >>> >>> >   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> >> >>> >>> >   ... 9 more
>> >> >>> >>> > Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >>> >>> >   at java.nio.Bits.reserveMemory(Bits.java:658)
>> >> >>> >>> >   at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> >> >>> >>> >   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>> >> >>> >>> >   at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
>> >> >>> >>> >   at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
>> >> >>> >>> >   at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
>> >> >>> >>> >   at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
>> >> >>> >>> >   at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
>> >> >>> >>> >   at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
>> >> >>> >>> >   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
>> >> >>> >>> >   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
>> >> >>> >>> >   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
>> >> >>> >>> >   at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
>> >> >>> >>> >   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
>> >> >>> >>> >   ... 10 more
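>> >> >>> >>> >
>> >> >>> >>> > For what it's worth, this kind of error is easy to reproduce
>> >> >>> >>> > outside Flink. A minimal sketch, unrelated to our job code and
>> >> >>> >>> > assuming the JVM is started with a small direct memory limit
>> >> >>> >>> > such as -XX:MaxDirectMemorySize=64M:
>> >> >>> >>> >
>> >> >>> >>> > import java.nio.ByteBuffer;
>> >> >>> >>> > import java.util.ArrayList;
>> >> >>> >>> > import java.util.List;
>> >> >>> >>> >
>> >> >>> >>> > // Run with: java -XX:MaxDirectMemorySize=64M DirectBufferOOM
>> >> >>> >>> > public class DirectBufferOOM {
>> >> >>> >>> >     public static void main(String[] args) {
>> >> >>> >>> >         List<ByteBuffer> buffers = new ArrayList<>();
>> >> >>> >>> >         while (true) {
>> >> >>> >>> >             // Direct buffers are accounted against
>> >> >>> >>> >             // -XX:MaxDirectMemorySize, not -Xmx; once the limit
>> >> >>> >>> >             // is exhausted, allocateDirect throws
>> >> >>> >>> >             // java.lang.OutOfMemoryError: Direct buffer memory.
>> >> >>> >>> >             buffers.add(ByteBuffer.allocateDirect(16 * 1024 * 1024));
>> >> >>> >>> >         }
>> >> >>> >>> >     }
>> >> >>> >>> > }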