Hello,
Has anyone tried using ScalaBuff (https://github.com/SandroGrzicic/ScalaBuff) with Flink? We’re trying to consume Protobuf messages from Kafka 0.8 and have hit a performance issue.

We run this code: https://gist.github.com/clayrat/05ac17523fcaa52fcc5165d9edb406b8 (where Foo is pre-generated by the ScalaBuff compiler) and get the numbers below, while the topic produces 20K msg/sec on average:

During the last 153 ms, we received 997 elements. That's 6516.339869281046 elements/second/core.
During the last 214 ms, we received 998 elements. That's 4663.551401869159 elements/second/core.
During the last 223 ms, we received 1000 elements. That's 4484.304932735426 elements/second/core.
During the last 282 ms, we received 1000 elements. That's 3546.0992907801415 elements/second/core.
During the last 378 ms, we received 1001 elements. That's 2648.1481481481483 elements/second/core.
During the last 544 ms, we received 999 elements. That's 1836.3970588235293 elements/second/core.
During the last 434 ms, we received 999 elements. That's 2301.84331797235 elements/second/core.
During the last 432 ms, we received 1000 elements. That's 2314.814814814815 elements/second/core.
During the last 400 ms, we received 991 elements. That's 2477.5 elements/second/core.
During the last 296 ms, we received 998 elements. That's 3371.6216216216217 elements/second/core.
During the last 561 ms, we received 1000 elements. That's 1782.5311942959001 elements/second/core.
...

The rate per core keeps falling until it stabilizes at ~5-10 elem/sec after a few hours.
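
For reference, the per-core rate in each log line above is just the element count divided by the window length. A minimal Java sketch of that arithmetic (the class and method names are made up for illustration and are not taken from Flink):

```java
final class RateCheck {
    /** Elements per second for a window that lasted `millis` milliseconds. */
    static double perSecond(long elements, long millis) {
        return elements * 1000.0 / millis;
    }

    public static void main(String[] args) {
        // First log line: 997 elements in 153 ms, roughly 6516 elements/second.
        System.out.println(perSecond(997, 153));
        // Ninth log line: 991 elements in 400 ms.
        System.out.println(perSecond(991, 400));
    }
}
```

So each window of ~1000 elements taking longer directly means a lower rate; nothing else is hidden in the number.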
Inspecting the app's threads over JMX gives us the following (first number is time in RUNNING, second is time in MONITOR):

SimpleConsumer - Source: Custom Source -> Flat Map - broker-1     10.8%  89.1%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-2     14.0%  85.9%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-3     13.6%  86.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-4     12.6%  87.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-10    12.2%  87.7%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-11    15.6%  84.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-12    11.6%  88.3%
SimpleConsumer - Source: Custom Source -> Flat Map - broker-13     9.7%  90.2%

If the schema is modified to simply return an Array[Byte], we get the expected rate of ~20K/sec and RUNNING is 100% on all broker threads.

From a thread dump, it’s clear that only a single consumer thread works at a time, while the rest are blocked waiting on sourceContext.getCheckpointLock() at https://github.com/apache/flink/blob/release-1.0/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java#L663

Alex
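
P.S. The locking pattern from the thread dump can be reproduced outside Flink. Below is a minimal, self-contained Java sketch, not Flink code: CheckpointLockSketch, perRecordWork and run are hypothetical names, and it assumes that the expensive per-record work runs while the shared lock is held, which the thread dump suggests but does not prove. It records how many threads are doing that work at the same moment.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CheckpointLockSketch {
    // Hypothetical stand-in for whatever expensive per-record work happens.
    static int perRecordWork(byte[] raw) {
        int h = 0;
        for (int round = 0; round < 200; round++) {
            for (byte b : raw) h = 31 * h + b;
        }
        return h;
    }

    // Does the work while tracking how many threads are inside it at once.
    static void trackedWork(AtomicInteger active, AtomicInteger peak, byte[] raw) {
        int now = active.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        perRecordWork(raw);
        active.decrementAndGet();
    }

    /** Returns {records emitted, peak number of threads working simultaneously}. */
    static int[] run(int threads, int perThread, boolean workUnderLock) {
        Object checkpointLock = new Object();   // one lock shared by all consumer threads
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        int[] emitted = {0};                    // only touched while holding checkpointLock
        byte[] raw = new byte[256];

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (workUnderLock) {
                        // Expensive work inside the lock: threads take turns.
                        synchronized (checkpointLock) {
                            trackedWork(active, peak, raw);
                            emitted[0]++;
                        }
                    } else {
                        // Expensive work outside the lock: only the emit is serialized.
                        trackedWork(active, peak, raw);
                        synchronized (checkpointLock) {
                            emitted[0]++;
                        }
                    }
                }
            });
            workers[t].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {emitted[0], peak.get()};
    }

    public static void main(String[] args) {
        int[] locked = run(8, 1000, true);
        int[] unlocked = run(8, 1000, false);
        System.out.println("under lock:   emitted=" + locked[0] + " peak=" + locked[1]);
        System.out.println("outside lock: emitted=" + unlocked[0] + " peak=" + unlocked[1]);
    }
}
```

With workUnderLock = true the peak is always 1, which matches the picture above of one RUNNING thread while the others sit in MONITOR. With workUnderLock = false the peak can rise up to the number of threads, since only the cheap emit step is serialized.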