Hello Flink community,

I wanted to report a performance regression our team is observing when moving from the legacy FlinkKinesisConsumer to the new KinesisStreamsSource API with the EFO consumer type.
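For context, the new source is wired up roughly like this (simplified sketch, not our exact job; the stream ARN, consumer name, and String payload type are placeholders, and the builder/option names follow the connector documentation):

  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
  import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  // EFO reader type plus a dedicated consumer name, per the connector docs.
  Configuration sourceConfig = new Configuration();
  sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
  sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-efo-consumer");

  // Source reading from the Kinesis stream (ARN is a placeholder).
  KinesisStreamsSource<String> kdsSource =
          KinesisStreamsSource.<String>builder()
                  .setStreamArn("arn:aws:kinesis:us-east-1:000000000000:stream/my-stream")
                  .setSourceConfig(sourceConfig)
                  .setDeserializationSchema(new SimpleStringSchema())
                  .build();

  DataStream<String> records =
          env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis EFO source");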
We have a stateless app that consumes from Kinesis, does some filtering, converts JSON to Avro, and writes to a Kafka sink.

Performance profile:
- Kinesis stream with 960 shards, ~240 MB/s ingest rate
- Flink 1.19
- 12 TaskManagers on c8g instances (16 cores / 32 GB memory / 8 task slots each)

Resource utilization with KinesisStreamsSource is around 14 CPU cores, but with FlinkKinesisConsumer it's 3-4 cores, so roughly a 4x regression. A flame chart shows that the majority of the CPU time is spent unlocking the elementQueue:

59% - java.util.concurrent.locks.ReentrantLock.unlock:-1
59% - org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.poll:259
64% - org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch:231
65% - org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext:190

We suspected that the high number of threads in KinesisAsyncClient was causing this lock contention, so we tried reducing the client's thread pool size and increasing the elementQueue capacity, but that only increased CPU usage (a rough sketch of the knobs we touched is below my signature). We also ran a test deployment with the KinesisStreamsSource split into its own dedicated operator chain, where it continued to dominate the CPU time.

I'd like to get some feedback on whether there are any other configuration options worth pursuing, or if this warrants a new JIRA.

--
Appreciate your input,
Lukasz Krawiec
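P.S. For concreteness, the tuning mentioned above was roughly along these lines (illustrative sketch building on the snippet earlier in this mail, not our exact job; the queue capacity option comes from flink-connector-base's SourceReaderOptions and the value shown is just an example):

  import org.apache.flink.connector.base.source.reader.SourceReaderOptions;

  // Larger hand-over queue between the fetcher threads and the task thread
  // (key: source.reader.element.queue.capacity, default 2); this is the
  // capacity we increased.
  sourceConfig.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 16);

  // For the isolation test we simply broke the chain at the source so it
  // runs in its own operator chain.
  DataStream<String> isolated =
          env.fromSource(kdsSource, WatermarkStrategy.noWatermarks(), "Kinesis EFO source")
                  .disableChaining();

The KinesisAsyncClient thread pool reduction was done separately on the SDK client side; I can share those details if useful.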