Hey Alex,

(1) Which Flink version are you using for this?

(2) Can you also get a heap dump after the job slows down? Slowdowns
like this are often caused by some component leaking memory, maybe in
Flink, maybe in the ScalaBuff deserializer. Can you also share the Foo
code?
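
If it helps, something along these lines against the TaskManager JVM
should produce one (a sketch; jmap ships with the JDK, and the file name
is just an example):

  jmap -dump:live,format=b,file=flink-taskmanager.hprof <taskmanager-pid>

You can then open the dump in a tool like Eclipse MAT or VisualVM and
look at what keeps growing over time.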

– Ufuk


On Mon, Apr 18, 2016 at 4:36 PM, Alexander Gryzlov
<alex.gryz...@gmail.com> wrote:
> Hello,
>
>
>
> Has anyone tried using ScalaBuff
> (https://github.com/SandroGrzicic/ScalaBuff) with Flink? We’re trying to
> consume Protobuf messages from Kafka 0.8 and have hit a performance issue.
> We run this code:
>
>
>
> https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8 (where Foo
> is pre-generated by the ScalaBuff compiler)
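>
> In outline, the consuming side is this minimal sketch (not the exact
> gist contents; the Flink wiring and the ScalaBuff-generated
> Foo.parseFrom signature are assumptions):
>
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.typeutils.TypeExtractor
> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>
> class FooSchema extends DeserializationSchema[Foo] {
>   // Parse each Kafka record's bytes into a ScalaBuff-generated message
>   override def deserialize(message: Array[Byte]): Foo =
>     Foo.parseFrom(message) // assumed generated companion parser
>   override def isEndOfStream(nextElement: Foo): Boolean = false
>   override def getProducedType: TypeInformation[Foo] =
>     TypeExtractor.getForClass(classOf[Foo])
> }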
>
>
>
> and get these numbers (whereas the topic produces 20K msg/sec on average):
>
> During the last 153 ms, we received 997 elements. That's 6516.339869281046 elements/second/core.
> During the last 214 ms, we received 998 elements. That's 4663.551401869159 elements/second/core.
> During the last 223 ms, we received 1000 elements. That's 4484.304932735426 elements/second/core.
> During the last 282 ms, we received 1000 elements. That's 3546.0992907801415 elements/second/core.
> During the last 378 ms, we received 1001 elements. That's 2648.1481481481483 elements/second/core.
> During the last 544 ms, we received 999 elements. That's 1836.3970588235293 elements/second/core.
> During the last 434 ms, we received 999 elements. That's 2301.84331797235 elements/second/core.
> During the last 432 ms, we received 1000 elements. That's 2314.814814814815 elements/second/core.
> During the last 400 ms, we received 991 elements. That's 2477.5 elements/second/core.
> During the last 296 ms, we received 998 elements. That's 3371.6216216216217 elements/second/core.
> During the last 561 ms, we received 1000 elements. That's 1782.5311942959001 elements/second/core.
>
> ...
>
>
>
> The elements/second/core rate keeps falling until it stabilizes at ~5-10
> elements/sec after a few hours.
>
>
>
> Looking at the app with JMX gives us these thread states (first number is
> RUNNING, second is MONITOR):
>
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-1  10.8% 89.1%
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-2  14.0% 85.9%
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-3  13.6% 86.3%
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-4  12.6% 87.3%
>
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-10 12.2% 87.7%
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-11 15.6% 84.3%
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-12 11.6% 88.3%
> SimpleConsumer - Source: Custom Source -> Flat Map - broker-13  9.7% 90.2%
>
> If the deserialization schema is modified to simply return the raw
> Array[Byte], we get the expected ~20K msg/sec and RUNNING is at 100% on all
> broker threads.
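>
> The raw-bytes variant used for that comparison is essentially this
> sketch (no Protobuf parsing at all):
>
> import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
> import org.apache.flink.streaming.util.serialization.DeserializationSchema
>
> class RawBytesSchema extends DeserializationSchema[Array[Byte]] {
>   // Pass each Kafka record through untouched
>   override def deserialize(message: Array[Byte]): Array[Byte] = message
>   override def isEndOfStream(nextElement: Array[Byte]): Boolean = false
>   override def getProducedType: TypeInformation[Array[Byte]] =
>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
> }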
>
>
>
> From a thread dump, it's clear that only a single consumer thread works at a
> time, while the rest are blocked on sourceContext.getCheckpointLock() at
> https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663
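>
> The emit path there synchronizes every element on the shared checkpoint
> lock, roughly this pattern (a Scala paraphrase of the linked Java code,
> not a verbatim copy):
>
> import org.apache.flink.streaming.api.functions.source.SourceFunction
>
> // Each per-broker consumer thread emits records like this. collect()
> // runs the chained operators (our flat map) while holding the lock,
> // so all broker threads end up serializing behind whichever one
> // currently holds it.
> def emit[T](ctx: SourceFunction.SourceContext[T], value: T): Unit =
>   ctx.getCheckpointLock.synchronized {
>     ctx.collect(value)
>   }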
>
> Alex
