Hi Alex,

I suspect it's a GC issue with the code generated by ScalaBuff. Can you maybe try a standalone test where you use a while(true) loop to see how fast you can deserialize elements of your Foo type? Maybe you'll find that the JVM's heap usage is growing all the time; in that case there's probably a memory leak somewhere.
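Something along the lines of this rough sketch is what I have in mind (untested, and it assumes the ScalaBuff-generated Foo companion has a parseFrom(Array[Byte]) method like the protobuf Java API, and that you can dump one serialized message from the topic into a file; adjust to whatever ScalaBuff actually generates):

import java.nio.file.{Files, Paths}

object FooDeserBench {
  def main(args: Array[String]): Unit = {
    // One serialized Foo message, e.g. dumped from the Kafka topic (hypothetical path)
    val sampleBytes: Array[Byte] = Files.readAllBytes(Paths.get("/tmp/foo-sample.bin"))

    var count = 0L
    var blackhole = 0L // keeps the JIT from eliminating the parse call
    var lastReport = System.currentTimeMillis()

    while (true) {
      val foo = Foo.parseFrom(sampleBytes) // assumed ScalaBuff-generated parse method
      blackhole += foo.hashCode()
      count += 1

      val now = System.currentTimeMillis()
      if (now - lastReport >= 1000) {
        val usedMb = (Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory()) / (1024 * 1024)
        println(s"$count msgs/s, used heap: $usedMb MB (blackhole=$blackhole)")
        count = 0
        lastReport = now
      }
    }
  }
}

If the msgs/s figure drops over time, or the used-heap number keeps climbing even across full GCs, the problem is in the (de)serialization code itself rather than in Flink or the Kafka consumer.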
On Tue, Apr 19, 2016 at 11:42 AM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Alex,
>
> (1) Which Flink version are you using for this?
>
> (2) Can you also get a heap dump after the job slows down? Slow downs
> like this are often caused by some component leaking memory, maybe in
> Flink, maybe the Scalabuff deserializer. Can you also share the Foo
> code?
>
> – Ufuk
>
>
> On Mon, Apr 18, 2016 at 4:36 PM, Alexander Gryzlov
> <alex.gryz...@gmail.com> wrote:
> > Hello,
> >
> > Has anyone tried using ScalaBuff
> > (https://github.com/SandroGrzicic/ScalaBuff) with Flink? We're trying to
> > consume Protobuf messages from Kafka 0.8 and have hit a performance issue.
> > We run this code:
> >
> > https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8
> > (where Foo is pre-generated by the ScalaBuff compiler)
> >
> > and get these numbers (whereas the topic produces 20K msg/sec on the average):
> >
> > During the last 153 ms, we received 997 elements. That's 6516.339869281046 elements/second/core.
> > During the last 214 ms, we received 998 elements. That's 4663.551401869159 elements/second/core.
> > During the last 223 ms, we received 1000 elements. That's 4484.304932735426 elements/second/core.
> > During the last 282 ms, we received 1000 elements. That's 3546.0992907801415 elements/second/core.
> > During the last 378 ms, we received 1001 elements. That's 2648.1481481481483 elements/second/core.
> > During the last 544 ms, we received 999 elements. That's 1836.3970588235293 elements/second/core.
> > During the last 434 ms, we received 999 elements. That's 2301.84331797235 elements/second/core.
> > During the last 432 ms, we received 1000 elements. That's 2314.814814814815 elements/second/core.
> > During the last 400 ms, we received 991 elements. That's 2477.5 elements/second/core.
> > During the last 296 ms, we received 998 elements. That's 3371.6216216216217 elements/second/core.
> > During the last 561 ms, we received 1000 elements. That's 1782.5311942959001 elements/second/core.
> >
> > ...
> >
> > The number/sec/core keeps falling until it stabilizes at ~5-10 elem/sec
> > after a few hours.
> >
> > Looking at the app with JMX gets us (first number is RUNNING, second is MONITOR):
> >
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-1   10.8%  89.1%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-2   14.0%  85.9%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-3   13.6%  86.3%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-4   12.6%  87.3%
> >
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-10  12.2%  87.7%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-11  15.6%  84.3%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-12  11.6%  88.3%
> > SimpleConsumer - Source: Custom Source -> Flat Map - broker-13   9.7%  90.2%
> >
> > If the schema is modified to simply return an Array[Byte], we get a proper
> > speed of ~20K/sec and RUNNING is 100% on all broker threads.
> >
> > From a thread dump, it's clear that only a single consumer thread works at a
> > time, while the rest are blocked on sourceContext.getCheckpointLock() at
> > https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663
> >
> > Alex