Hello,


Has anyone tried using ScalaBuff (https://github.com/SandroGrzicic/ScalaBuff)
with Flink? We’re trying to consume Protobuf messages from Kafka 0.8 and
have hit a performance issue. We run this code:



https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8 (where Foo
is generated ahead of time by the ScalaBuff compiler)



and get the following numbers, even though the topic receives about 20K
msg/sec on average:

During the last 153 ms, we received 997 elements. That's 6516.339869281046 elements/second/core.
During the last 214 ms, we received 998 elements. That's 4663.551401869159 elements/second/core.
During the last 223 ms, we received 1000 elements. That's 4484.304932735426 elements/second/core.
During the last 282 ms, we received 1000 elements. That's 3546.0992907801415 elements/second/core.
During the last 378 ms, we received 1001 elements. That's 2648.1481481481483 elements/second/core.
During the last 544 ms, we received 999 elements. That's 1836.3970588235293 elements/second/core.
During the last 434 ms, we received 999 elements. That's 2301.84331797235 elements/second/core.
During the last 432 ms, we received 1000 elements. That's 2314.814814814815 elements/second/core.
During the last 400 ms, we received 991 elements. That's 2477.5 elements/second/core.
During the last 296 ms, we received 998 elements. That's 3371.6216216216217 elements/second/core.
During the last 561 ms, we received 1000 elements. That's 1782.5311942959001 elements/second/core.

...



The elements/second/core rate keeps falling until, after a few hours, it
stabilizes at ~5-10 elements/sec.
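Each throughput line is simply the element count divided by the window length in seconds; for example, 997 elements in 153 ms gives 997 / 0.153 ≈ 6516 elements/second. Below is a small stdlib-only Java sketch (the class name `ThroughputCheck` and its methods are made up for illustration, not taken from the gist) that recomputes the rates from the logged pairs and also pools the excerpt, weighting each window by its duration:

```java
public class ThroughputCheck {
    // Rate for one logged window: elements divided by elapsed seconds.
    // This reproduces each "elements/second/core" figure in the log excerpt.
    static double perSecond(long elements, long millis) {
        return elements * 1000.0 / millis;
    }

    // Pooled rate over several {elements, millis} windows: total elements / total seconds.
    // Unlike a plain mean of the per-window rates, this weights each window by its duration.
    static double pooled(long[][] windows) {
        long elements = 0;
        long millis = 0;
        for (long[] w : windows) {
            elements += w[0];
            millis += w[1];
        }
        return perSecond(elements, millis);
    }

    public static void main(String[] args) {
        // {elements, milliseconds} pairs copied from the log excerpt.
        long[][] windows = {
            {997, 153}, {998, 214}, {1000, 223}, {1000, 282}, {1001, 378}, {999, 544},
            {999, 434}, {1000, 432}, {991, 400}, {998, 296}, {1000, 561}
        };
        for (long[] w : windows) {
            System.out.printf("%4d elements in %3d ms -> %8.1f elements/s%n",
                    w[0], w[1], perSecond(w[0], w[1]));
        }
        System.out.printf("pooled over all windows: %.1f elements/s%n", pooled(windows));
    }
}
```

For the eleven windows quoted, the pooled rate comes out at roughly 2,800 elements/second, lower than a plain average of the per-window rates because the slow windows also last longer.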



Inspecting the application's threads over JMX shows the following (the first
percentage is time spent RUNNING, the second is time spent in MONITOR):

SimpleConsumer - Source: Custom Source -> Flat Map - broker-1  10.8% 89.1%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-2  14.0% 85.9%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-3  13.6% 86.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-4  12.6% 87.3%

SimpleConsumer - Source: Custom Source -> Flat Map - broker-10 12.2% 87.7%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-11 15.6% 84.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-12 11.6% 88.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-13  9.7% 90.2%
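Percentages like these come from repeatedly sampling each thread's state. A minimal stdlib-only sketch of that idea using `java.lang.management.ThreadMXBean` (the class `ThreadStateSampler` and its parameters are illustrative assumptions; this is not how any particular JMX tool computes its numbers) counts how often a thread is `BLOCKED` waiting to enter a monitor, which thread viewers such as VisualVM typically label "Monitor":

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadStateSampler {
    // Samples one thread's state `samples` times, `intervalMs` apart, and returns the
    // fraction of samples in which it was BLOCKED on a monitor.
    static double blockedFraction(long threadId, int samples, long intervalMs) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        int blocked = 0;
        int seen = 0;
        for (int i = 0; i < samples; i++) {
            ThreadInfo info = mx.getThreadInfo(threadId);
            if (info == null) {
                break; // the thread has terminated
            }
            seen++;
            if (info.getThreadState() == Thread.State.BLOCKED) {
                blocked++;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag, stop sampling
                break;
            }
        }
        return seen == 0 ? 0.0 : (double) blocked / seen;
    }

    public static void main(String[] args) {
        Object lock = new Object();
        Thread waiter = new Thread(() -> {
            synchronized (lock) { /* enter and leave once the lock is free */ }
        });
        synchronized (lock) {
            waiter.start();
            // Wait until the other thread is actually stuck on the monitor we hold.
            while (waiter.getState() != Thread.State.BLOCKED) {
                Thread.onSpinWait();
            }
            System.out.println("blocked fraction: " + blockedFraction(waiter.getId(), 10, 5));
        }
    }
}
```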

If the deserialization schema is changed to simply return the raw
Array[Byte], we get the expected throughput of ~20K msg/sec, and all broker
threads are 100% RUNNING.


From a thread dump, it’s clear that only one consumer thread works at a
time, while the rest are blocked waiting for sourceContext.getCheckpointLock() at
https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663
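To illustrate why that would produce the RUNNING/MONITOR split above, here is a Flink-free Java sketch of the general pattern: several threads share one lock, and either do their expensive per-record work while holding it or do the work first and hold the lock only for the cheap "emit". `LOCK` merely stands in for the checkpoint lock and `parse` is a hypothetical CPU-bound stand-in for Protobuf decoding; this does not reproduce the LegacyFetcher code, and whether our decoding actually runs under the lock is exactly what we haven't confirmed.

```java
import java.util.concurrent.CountDownLatch;

public class LockContention {
    // Stands in for sourceContext.getCheckpointLock(): one monitor shared by all workers.
    private static final Object LOCK = new Object();

    // Hypothetical CPU-bound stand-in for decoding one message (a loop of LCG steps).
    static long parse(long seed) {
        long x = seed;
        for (int i = 0; i < 100_000; i++) {
            x = x * 6364136223846793005L + 1442695040888963407L;
        }
        return x;
    }

    // Runs `threads` workers that each process `records` records and returns a checksum.
    // workInsideLock = true: parsing happens while holding LOCK, so workers take turns.
    // workInsideLock = false: parsing runs in parallel; only the shared sum update is locked.
    static long process(int threads, int records, boolean workInsideLock) {
        long[] sum = {0};
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            final long base = (long) t * records;
            new Thread(() -> {
                for (int r = 0; r < records; r++) {
                    if (workInsideLock) {
                        synchronized (LOCK) {
                            sum[0] += parse(base + r);
                        }
                    } else {
                        long value = parse(base + r);
                        synchronized (LOCK) {
                            sum[0] += value;
                        }
                    }
                }
                done.countDown();
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        synchronized (LOCK) {
            return sum[0];
        }
    }

    public static void main(String[] args) {
        for (boolean inside : new boolean[] {true, false}) {
            long start = System.nanoTime();
            long checksum = process(8, 200, inside);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("work inside lock=%b: %d ms (checksum %d)%n", inside, ms, checksum);
        }
    }
}
```

Both variants compute the same checksum; on a multi-core machine the first one should take noticeably longer, since only one worker can parse at a time while the others sit BLOCKED on the monitor.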

Alex
