Hello Flink community,

I wanted to report a performance regression our team is observing between the
FlinkKinesisConsumer and the new KinesisStreamsSource API when using the EFO
consumer type.

We have a stateless app that consumes from Kinesis, does some filtering and
JSON-to-Avro serialization, and writes out to a Kafka sink.
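
For context, this is roughly how the source is wired up (a minimal sketch; the
stream ARN and EFO consumer name are placeholders, and the builder/option names
follow the new connector's documented API as I understand it):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kinesis.source.KinesisStreamsSource;
import org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KinesisEfoSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // EFO reader type plus a dedicated consumer name.
        Configuration sourceConfig = new Configuration();
        sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE,
                KinesisSourceConfigOptions.ReaderType.EFO);
        sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-efo-consumer");

        KinesisStreamsSource<String> source =
                KinesisStreamsSource.<String>builder()
                        .setStreamArn("arn:aws:kinesis:us-east-1:000000000000:stream/my-stream")
                        .setSourceConfig(sourceConfig)
                        .setDeserializationSchema(new SimpleStringSchema())
                        .build();

        // In the real job the filtering, JSON-to-Avro step and Kafka sink are
        // attached here; print() just keeps this sketch self-contained.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kinesis source").print();

        env.execute("kinesis-efo-source-sketch");
    }
}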

Performance Profile

   - Kinesis stream with 960 shards and a 240 MB/s ingest rate
   - Flink 1.19
   - 12 TMs on c8g instance type
   - 16 cores / 32 GB memory / 8 task slots per TM

Resource utilization with KinesisStreamsSource is around 14 CPUs, but with
FlinkKinesisConsumer it's 3-4 CPUs, so roughly a 4x regression.

A flame graph shows that the majority of CPU time is spent unlocking the
elementQueue:

59% - java.util.concurrent.locks.ReentrantLock.unlock:-1
59% - org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.poll:259
64% - org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch:231
65% - org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext:190

We suspected that the high number of threads in the KinesisAsyncClient was
causing this lock contention, so we tried reducing the client's thread pool
size and increasing the elementQueue size, but that only increased CPU
usage.
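
For the elementQueue part, this is roughly the knob we used (a sketch; the
option comes from the connector-base SourceReaderOptions, and I'm assuming the
Configuration passed to the source builder is the one SourceReaderBase reads it
from - the value 10 is just an example):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;

// Raise the SourceReaderBase handover queue above its small default capacity.
Configuration sourceConfig = new Configuration();
sourceConfig.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);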

We also ran a test deployment that split the KinesisStreamsSource into its
own dedicated operator chain, where it continued to dominate CPU time.
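
For that test we simply broke the chain right after the source, roughly like
this (sketch; the filter lambda stands in for our real logic and kafkaSink is
our existing KafkaSink):

// disableChaining() isolates the source operator in its own chain/task thread.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kinesis source")
        .disableChaining()
        .filter(record -> record != null)   // stand-in for the real filtering step
        .sinkTo(kafkaSink);                 // existing KafkaSink<String> (placeholder here)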

I'd like to get some feedback on whether there are any other configuration
options worth pursuing, or whether this warrants a new JIRA ticket.

-- 
Appreciate your input,
Lukasz Krawiec
