Hi Ning, could you replace the Kafka source with a custom SourceFunction implementation that just produces new events in a loop as fast as possible? This way we can rule out that ingestion is responsible for the performance jump or the limit at 5000 events/s, and benchmark the Flink job separately.
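A minimal sketch of such a data-generator source, assuming the DataStream API's SourceFunction interface (the Event class and its fields here are placeholders for your actual event type):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical event type; replace with your actual event class.
public class DataGeneratorSource implements SourceFunction<DataGeneratorSource.Event> {

    public static class Event {
        public long id;
        public long timestamp;

        public Event() {}

        public Event(long id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) {
        long id = 0;
        while (running) {
            // No external system involved, so any throughput limit
            // observed now must come from the Flink job itself.
            ctx.collect(new Event(id++, System.currentTimeMillis()));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

You would then call env.addSource(new DataGeneratorSource()) in place of the Kafka consumer; if the job still caps out around 5000 events/s with this source, the bottleneck is inside the job, not in ingestion.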
Kryo is much less efficient than Flink's POJO serializer. In the logs you should see INFO messages about the classes for which Flink falls back to Kryo. Try to turn those classes into Flink POJOs, i.e. give them a public default constructor and public getters and setters. Since Flink also needs to serialize/deserialize each state object for reading and writing, this applies to your state classes, not only to your events. The lines you are looking for are: "INFO org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for class com.company.YourClass so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.". You can also disable the Kryo fallback temporarily, as described in [1], to make any remaining fallbacks fail fast. I would suspect that serialization is a big factor right now, in particular if you see Kryo methods taking a lot of time in your profiles. Best, Konstantin [1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#disabling-kryo-fallback
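PS: For illustration, a class that satisfies Flink's POJO requirements (public class, public no-argument constructor, and all fields either public or reachable via public getters and setters) could look like the following; the class and field names are made up:

```java
// Hypothetical event/state class. Flink's TypeExtractor recognizes it as a
// POJO because it is public, has a public no-arg constructor, and exposes
// every field through public getters and setters.
public class SensorReading {
    private String sensorId;
    private double value;

    // Required by Flink's POJO serializer.
    public SensorReading() {}

    public String getSensorId() { return sensorId; }
    public void setSensorId(String sensorId) { this.sensorId = sensorId; }

    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
}
```

With a shape like this, the TypeExtractor warning for the class should disappear from the logs and Flink will use its POJO serializer instead of falling back to Kryo.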