Hi,

We are using standalone Flink at the moment.
Our snapshot build is built from source. I removed everything from the old
build before I started the server, so there shouldn't be any traces of the
old installation left.

I bumped the process to 20GB, and it has been running stably since, also
with G1.
There might still be some kind of memory leak that I don't see at the moment
because of the large heap.
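
One way to watch for such a leak, independent of the heap size, is to read
the JVM's direct buffer pool through the standard java.lang.management API.
The following is only a minimal sketch and not part of Flink: the class name
DirectMemoryProbe and the method directBytesUsed are made up for
illustration, and it reports on the JVM it runs in, so for the taskmanager
the same MXBean would have to be read from inside that process or over JMX.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper, not part of Flink: reads the JVM's "direct" buffer pool.
class DirectMemoryProbe {

    /** Returns the bytes used by the "direct" buffer pool, or -1 if no such pool is reported. */
    public static long directBytesUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();
        // Allocate 1 MiB of direct memory so that the counter visibly changes.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);
        long after = directBytesUsed();
        System.out.println("direct bytes before: " + before);
        System.out.println("direct bytes after:  " + after);
        System.out.println("buffer capacity:     " + buffer.capacity());
    }
}
```

Logging this value periodically and checking whether it keeps growing would
show a direct memory leak long before the 20GB limit is reached.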

This is how the taskmanager process looks right now.

java -XX:+UseG1GC -Xms20480M -Xmx20480M -XX:MaxDirectMemorySize=20480M
-Dlog.file=/appsdir/flink-0.10-SNAPSHOT/log/flink-splat-taskmanager-0-fbweb608.log
-Dlog4j.configuration=file:/appsdir/flink-0.10-SNAPSHOT/conf/log4j.properties
-Dlogback.configurationFile=file:/appsdir/flink-0.10-SNAPSHOT/conf/logback.xml
-classpath
/appsdir/flink-0.10-SNAPSHOT/lib/flink-dist-0.10-SNAPSHOT.jar:/appsdir/flink-0.10-SNAPSHOT/lib/flink-python-0.10-SNAPSHOT.jar:/appsdir/flink-0.10-SNAPSHOT/lib/log4j-1.2.17.jar:/appsdir/flink-0.10-SNAPSHOT/lib/slf4j-log4j12-1.7.7.jar:::
org.apache.flink.runtime.taskmanager.TaskManager --configDir
/appsdir/flink-0.10-SNAPSHOT/conf --streamingMode streaming
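
To double-check that the startup script passes the intended limits, the
flags can also be read back from a running JVM. This is a minimal sketch,
assuming the flags are written as in the command line above (for example
-Xmx20480M). JvmFlagCheck, parseSize and findSize are hypothetical names,
and the program inspects the JVM it runs in rather than the taskmanager
itself.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper, not part of Flink: reads memory limits from JVM arguments.
class JvmFlagCheck {

    /** Parses a JVM size such as "20480M", "2g" or "1048576" into bytes. */
    public static long parseSize(String text) {
        String value = text.trim().toLowerCase();
        if (value.isEmpty()) {
            throw new IllegalArgumentException("empty size");
        }
        long multiplier = 1L;
        switch (value.charAt(value.length() - 1)) {
            case 'k': multiplier = 1L << 10; break;
            case 'm': multiplier = 1L << 20; break;
            case 'g': multiplier = 1L << 30; break;
            default: break; // plain number of bytes
        }
        if (multiplier != 1L) {
            value = value.substring(0, value.length() - 1);
        }
        return Long.parseLong(value) * multiplier;
    }

    /** Returns the size in bytes of the first argument starting with prefix, or -1 if absent. */
    public static long findSize(List<String> jvmArgs, String prefix) {
        for (String arg : jvmArgs) {
            if (arg.startsWith(prefix)) {
                return parseSize(arg.substring(prefix.length()));
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Arguments the current JVM was started with, e.g. -Xmx20480M.
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        System.out.println("-Xmx (bytes):                    " + findSize(jvmArgs, "-Xmx"));
        System.out.println("-XX:MaxDirectMemorySize (bytes): "
                + findSize(jvmArgs, "-XX:MaxDirectMemorySize="));
    }
}
```

A result of -1 means the flag was not passed at all, in which case the JVM
falls back to its own default for that limit.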



/Jakob

On Tue, Oct 20, 2015 at 3:38 PM, Stephan Ewen <se...@apache.org> wrote:

> @Jakob: If you use Flink standalone (not through YARN), one thing to be
> aware of is that the relevant change is in the bash scripts that start the
> cluster, not the code. If you upgraded Flink by copying a newer JAR file,
> you missed the update of the bash scripts and missed the fix for that issue.
>
> On Tue, Oct 20, 2015 at 10:39 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Jakob,
>>
>> Your revision number is fairly new and your direct memory
>> configuration seems to be correct for your setup. If you have the
>> time, you could verify that the memory flags for the JVM are set
>> correctly by the startup script. You can see that in the first lines
>> of the task manager log. If the direct memory was set to 2GB with the
>> default number of network buffers, the JVM should have had enough
>> direct memory. Still, we'd like to find out what caused your problem.
>>
>> Are you running on YARN or standalone?
>>
>> Yes, the usual setup is one task manager per host/VM. The task manager
>> will allocate all memory upfront. However, a large part of this memory
>> will be self-managed by Flink and not touched much by the GC. By
>> default, this is 0.7 of the configured heap memory. You can control
>> this ratio with the taskmanager.memory.fraction variable. You can also
>> set a fixed managed memory size using taskmanager.memory.size (MB). In
>> large memory setups, we have seen slightly better performance using
>> off-heap memory allocation. This can be configured using
>> taskmanager.memory.off-heap: true.
>>
>> Please let us know if you experience any further issues.
>>
>> Best,
>> Max
>>
>> On Mon, Oct 19, 2015 at 10:14 PM, Jakob Ericsson
>> <jakob.erics...@gmail.com> wrote:
>> > The revision is "Starting JobManager (Version: 0.10-SNAPSHOT,
>> > Rev:c82ebbf, Date:15.10.2015 @ 11:34:01 CEST)"
>> >
>> > We have a lot of memory left on the machine. I have increased it quite a
>> > lot.
>> >
>> > What are your thoughts on memory configuration?
>> > If I understand Flink correctly, you should only have one taskmanager
>> > running on each host?
>> >
>> > Take a fairly standard machine with 16 cores and 32-64 GB of memory.
>> > That means you will have one Java process running with Xmx30G or even
>> > higher to use all the memory of the machine. This is, at least for the
>> > CMS GC, not the optimal configuration.
>> > It might be viable for G1, but we got some really serious Java core
>> > dumps when running G1.
>> >
>> > I looked a bit at the flags that were set on the process, and it seems
>> > that Xmx and MaxDirectMemorySize are set to the same value by the shell
>> > script.
>> > When I got the "java.lang.OutOfMemoryError: Direct buffer memory", I was
>> > running with taskmanager.heap.mb: 2048, so the direct memory limit was
>> > set to 2GB.
>> >
>> > I have restarted the process with G1 again and 20GB as
>> > taskmanager.heap.mb. Let's see if it stays stable during the night.
>> >
>> >
>> > On Mon, Oct 19, 2015 at 6:31 PM, Maximilian Michels <m...@apache.org>
>> > wrote:
>> >>
>> >> You can see the revision number and the build date in the JobManager
>> >> log file, e.g. "Starting JobManager (Version: 0.10-SNAPSHOT,
>> >> Rev:1b79bc1, Date:18.10.2015 @ 20:15:08 CEST)"
>> >>
>> >> On Mon, Oct 19, 2015 at 5:53 PM, Maximilian Michels <m...@apache.org>
>> >> wrote:
>> >> > When was the last time you updated your 0.10-SNAPSHOT Flink cluster?
>> >> > If it has been more than a couple of weeks, then I'd advise you to
>> >> > update to the latest snapshot version. There has been an issue with
>> >> > the calculation of the off-heap memory limit in the past.
>> >> >
>> >> > Thanks,
>> >> > Max
>> >> >
>> >> > On Mon, Oct 19, 2015 at 5:26 PM, Gyula Fóra <gyula.f...@gmail.com>
>> >> > wrote:
>> >> >> It's 0.10-SNAPSHOT
>> >> >>
>> >> >> Gyula
>> >> >>
>> >> >> On Mon, Oct 19, 2015 at 5:13 PM, Maximilian Michels <m...@apache.org>
>> >> >> wrote:
>> >> >>>
>> >> >>> I forgot to ask you: Which version of Flink are you using? 0.9.1 or
>> >> >>> 0.10-SNAPSHOT?
>> >> >>>
>> >> >>> On Mon, Oct 19, 2015 at 5:05 PM, Maximilian Michels <m...@apache.org>
>> >> >>> wrote:
>> >> >>> > Hi Jakob,
>> >> >>> >
>> >> >>> > Thanks. Flink allocates its network memory as direct memory
>> >> >>> > outside the normal Java heap. By default, that is 64MB but can
>> >> >>> > grow up to 128MB on heavy network transfer. How much memory does
>> >> >>> > your machine have? Could it be that your upper memory bound is
>> >> >>> > lower than 2048 + 128 MB?
>> >> >>> >
>> >> >>> > Best,
>> >> >>> > Max
>> >> >>> >
>> >> >>> > On Mon, Oct 19, 2015 at 4:32 PM, Jakob Ericsson
>> >> >>> > <jakob.erics...@gmail.com> wrote:
>> >> >>> >> Hi,
>> >> >>> >>
>> >> >>> >> See answers below.
>> >> >>> >>
>> >> >>> >> /Jakob
>> >> >>> >>
>> >> >>> >> On Mon, Oct 19, 2015 at 4:03 PM, Maximilian Michels
>> >> >>> >> <m...@apache.org> wrote:
>> >> >>> >>>
>> >> >>> >>> Hi Jakob,
>> >> >>> >>>
>> >> >>> >>> Thank you for reporting the bug. Could you please post your
>> >> >>> >>> configuration here? In particular, could you please tell us the
>> >> >>> >>> value of the following configuration variables:
>> >> >>> >>>
>> >> >>> >>> taskmanager.heap.mb
>> >> >>> >>
>> >> >>> >> taskmanager.heap.mb: 2048
>> >> >>> >>>
>> >> >>> >>> taskmanager.network.numberOfBuffers
>> >> >>> >>
>> >> >>> >>
>> >> >>> >> Default value. Not changed.
>> >> >>> >>
>> >> >>> >>>
>> >> >>> >>> taskmanager.memory.off-heap
>> >> >>> >>>
>> >> >>> >> Default value. Not changed.
>> >> >>> >>
>> >> >>> >>>
>> >> >>> >>> Are you running the Flink cluster in batch or streaming mode?
>> >> >>> >>>
>> >> >>> >> Started in streaming mode. Running with two nodes in the cluster.
>> >> >>> >> Also, I have set "env.java.opts: -XX:+UseConcMarkSweepGC" due to
>> >> >>> >> some strange Java core dumps in the G1 GC.
>> >> >>> >>
>> >> >>> >>>
>> >> >>> >>> Direct memory is used by Flink's network layer. My guess is that
>> >> >>> >>> you have set taskmanager.heap.mb too low (it constrains the
>> >> >>> >>> amount of direct memory at the moment).
>> >> >>> >>>
>> >> >>> >>> Thank you,
>> >> >>> >>> Max
>> >> >>> >>>
>> >> >>> >>>
>> >> >>> >>> On Mon, Oct 19, 2015 at 3:24 PM, Jakob Ericsson
>> >> >>> >>> <jakob.erics...@gmail.com> wrote:
>> >> >>> >>> > Hello,
>> >> >>> >>> >
>> >> >>> >>> > We are running into a strange problem with direct memory
>> >> >>> >>> > buffers. From what I know, we are not using any direct memory
>> >> >>> >>> > buffers inside our code.
>> >> >>> >>> > This is a pretty trivial streaming application that just does
>> >> >>> >>> > some deduplication and unions some Kafka streams.
>> >> >>> >>> >
>> >> >>> >>> > /Jakob
>> >> >>> >>> >
>> >> >>> >>> >
>> >> >>> >>> >
>> >> >>> >>> > 2015-10-19 13:27:59,064 INFO org.apache.flink.runtime.taskmanager.Task - FilterAndTransform -> (Filter, Filter) (3/4) switched to FAILED with exception.
>> >> >>> >>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >>> >>> >         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >>> >>> >         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
>> >> >>> >>> >         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> >> >>> >>> >         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> >> >>> >>> >         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> >> >>> >>> >         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> >> >>> >>> >         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>> >> >>> >>> >         at java.lang.Thread.run(Thread.java:745)
>> >> >>> >>> > Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >>> >>> >         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
>> >> >>> >>> >         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> >> >>> >>> >         ... 9 more
>> >> >>> >>> > Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>> >> >>> >>> >         at java.nio.Bits.reserveMemory(Bits.java:658)
>> >> >>> >>> >         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>> >> >>> >>> >         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
>> >> >>> >>> >         at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
>> >> >>> >>> >         at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
>> >> >>> >>> >         at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
>> >> >>> >>> >         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
>> >> >>> >>> >         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
>> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
>> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
>> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
>> >> >>> >>> >         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
>> >> >>> >>> >         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
>> >> >>> >>> >         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)
>> >> >>> >>> >         ... 10 more
>> >> >>> >>> >
>> >> >>> >>
>> >> >>> >>
>> >
>> >
>>
>
>