Hi Alex,
I suspect it's a GC issue with the code generated by ScalaBuff. Could you
try a standalone test that uses a while(true) loop to see how fast you can
deserialize elements of your Foo type?
If the JVM heap keeps growing during that test, there is probably a memory
leak somewhere.
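
A minimal sketch of such a standalone test is below. It is only an
illustration: the names DeserializeBench, measure and usedHeapMb are made up
here, and the `parse` function is a placeholder, not ScalaBuff's API. You
would swap in one real serialized message from the topic and the parser that
ScalaBuff generated for Foo.

```scala
// Standalone deserialization benchmark (sketch, hypothetical names).
// `parse` stands in for the ScalaBuff-generated parser of Foo.
object DeserializeBench {

  /** Used heap in megabytes, as reported by the JVM at this instant. */
  def usedHeapMb(): Long = {
    val rt = Runtime.getRuntime
    (rt.totalMemory() - rt.freeMemory()) / (1024L * 1024L)
  }

  /** Calls `parse(payload)` `iterations` times; returns elements/second. */
  def measure[A](payload: Array[Byte], iterations: Int)(parse: Array[Byte] => A): Double = {
    val start = System.nanoTime()
    var i = 0
    var last: Any = null // hold the result so the call is not trivially unused
    while (i < iterations) {
      last = parse(payload)
      i += 1
    }
    // Guard against a zero-length interval on very short runs.
    val elapsedNanos = math.max(System.nanoTime() - start, 1L)
    iterations.toDouble * 1e9 / elapsedNanos
  }

  def main(args: Array[String]): Unit = {
    // Placeholder payload and parser: replace with one real serialized
    // message and the generated Foo parser.
    val payload = "example".getBytes("UTF-8")
    val parse: Array[Byte] => String = bytes => new String(bytes, "UTF-8")

    // Runs until killed; watch whether the rate drops and the heap grows.
    while (true) {
      val rate = measure(payload, 1000000)(parse)
      println(f"$rate%.0f elements/second, used heap: ${usedHeapMb()} MB")
    }
  }
}
```

If the rate stays flat here but still degrades inside the job, the
deserializer itself is probably not the culprit. The heap figure is only a
rough indicator, since it also counts garbage that has not been collected yet.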

On Tue, Apr 19, 2016 at 11:42 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Alex,
>
> (1) Which Flink version are you using for this?
>
> (2) Can you also get a heap dump after the job slows down? Slowdowns
> like this are often caused by some component leaking memory, maybe in
> Flink, maybe the ScalaBuff deserializer. Can you also share the Foo
> code?
>
> – Ufuk
>
>
> On Mon, Apr 18, 2016 at 4:36 PM, Alexander Gryzlov
> <alex.gryz...@gmail.com> wrote:
> > Hello,
> >
> >
> >
> > Has anyone tried using ScalaBuff
> > (https://github.com/SandroGrzicic/ScalaBuff) with Flink? We’re trying to
> > consume Protobuf messages from Kafka 0.8 and have hit a performance issue.
> > We run this code:
> >
> >
> >
> > https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8 (where Foo
> > is pre-generated by the ScalaBuff compiler)
> >
> >
> >
> > and get these numbers (whereas the topic produces 20K msg/sec on
> > average):
> >
> > During the last 153 ms, we received 997 elements. That's 6516.339869281046
> > elements/second/core.
> > During the last 214 ms, we received 998 elements. That's 4663.551401869159
> > elements/second/core.
> > During the last 223 ms, we received 1000 elements. That's 4484.304932735426
> > elements/second/core.
> > During the last 282 ms, we received 1000 elements. That's 3546.0992907801415
> > elements/second/core.
> > During the last 378 ms, we received 1001 elements. That's 2648.1481481481483
> > elements/second/core.
> > During the last 544 ms, we received 999 elements. That's 1836.3970588235293
> > elements/second/core.
> > During the last 434 ms, we received 999 elements. That's 2301.84331797235
> > elements/second/core.
> > During the last 432 ms, we received 1000 elements. That's 2314.814814814815
> > elements/second/core.
> > During the last 400 ms, we received 991 elements. That's 2477.5
> > elements/second/core.
> > During the last 296 ms, we received 998 elements. That's 3371.6216216216217
> > elements/second/core.
> > During the last 561 ms, we received 1000 elements. That's 1782.5311942959001
> > elements/second/core.
> >
> > ...
> >
> >
> >
> > The number/sec/core keeps falling until it stabilizes at ~5-10 elem/sec
> > after a few hours.
> >
> >
> >
> > Inspecting the app over JMX gives (first number is RUNNING, second is
> > MONITOR):
> >
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-1   10.8%  89.1%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-2   14.0%  85.9%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-3   13.6%  86.3%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-4   12.6%  87.3%
> >
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-10  12.2%  87.7%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-11  15.6%  84.3%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-12  11.6%  88.3%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-13   9.7%  90.2%
> >
> > If the schema is modified to simply return an Array[Byte], we get a proper
> > speed of ~20K/sec and RUNNING is 100% on all broker threads.
> >
> >
> >
> > From a thread dump, it’s clear that only a single consumer thread works at a
> > time, while the rest are blocked on sourceContext.getCheckpointLock() at
> > https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663
> >
> > Alex
>
