Hi Konstantin,

> Could you replace the Kafka source with a custom SourceFunction
> implementation which just produces the new events in a loop as fast as
> possible? This way we can rule out that the ingestion is responsible
> for the performance jump or the limit at 5000 events/s, and can
> benchmark the Flink job separately.
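We did exactly that. The custom source was essentially the following
minimal sketch; it just uses Long as a stand-in event type so it is
self-contained, rather than our actual event class:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits synthetic events in a tight loop, as fast as the job can
    // consume them, so the pipeline can be benchmarked without Kafka.
    public class LoopingEventSource implements SourceFunction<Long> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long sequence = 0L;
            while (running) {
                // Hold the checkpoint lock while emitting so records and
                // checkpoint barriers don't interleave.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(sequence++);
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }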
With this custom source in place, we found the reason for the sudden
performance jump: it was caused by incorrect watermarks in the original
Kafka stream. Certain events produced watermarks that lay in the future,
and as soon as that happened, all subsequent events were dropped as
late. Hence the apparent throughput increase.

> Kryo is much less efficient than Flink's POJO serializer. In the logs
> you should see INFO messages about the classes for which Flink falls
> back to Kryo. Try to replace those with Flink POJOs, i.e. a default
> constructor and public getters and setters. As Flink also needs to
> serialize/deserialize each state object for reading and writing, this
> also applies to your state classes, not only to your events. The lines
> you are looking for are: "INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were
> detected for class com.company.YourClass so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on 'Data Types & Serialization' for details of the
> effect on performance." You can disable the Kryo fallback temporarily
> as described in [1]. I would suspect that serialization is a big
> factor right now, in particular if you see Kryo methods taking a lot
> of time.

This was very helpful. We found a couple of key objects being serialized
with Kryo. After fixing them and making sure they were properly
serialized as POJOs, the performance almost doubled from 500 events/s.

Thank you very much for the advice,

Ning
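P.S. In case it helps others who hit the same two issues, here are
rough sketches of what the fixes looked like. For the watermarks, one
way to keep a single bogus timestamp from pushing the watermark into
the future is to clamp extracted timestamps to the wall clock. This is
only a sketch of that idea, not our exact code; MyEvent, its
getTimestamp() accessor, and the 10-second bound are placeholders:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Clamp event timestamps to "now" so that one event with a
    // timestamp far in the future cannot advance the watermark past
    // the wall clock and cause all later events to be dropped as late.
    public class ClampingTimestampExtractor
            extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {

        public ClampingTimestampExtractor() {
            super(Time.seconds(10)); // assumed out-of-orderness bound
        }

        @Override
        public long extractTimestamp(MyEvent event) {
            // getTimestamp() is a placeholder for the event's time field.
            return Math.min(event.getTimestamp(), System.currentTimeMillis());
        }
    }

This would be wired in with
stream.assignTimestampsAndWatermarks(new ClampingTimestampExtractor()).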
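And the serialization fix was, in essence, just reshaping the offending
classes into Flink POJOs as you described: a public no-arg constructor
plus public getters and setters for every field. A schematic version,
with an illustrative class and field names rather than our real state
class:

    // Before the fix, the TypeExtractor could not detect any fields and
    // fell back to Kryo (GenericType). In this shape, Flink's POJO
    // serializer handles the class.
    public class SessionState {

        private String userId;
        private long lastEventTime;

        public SessionState() {} // required by the POJO serializer

        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }

        public long getLastEventTime() { return lastEventTime; }
        public void setLastEventTime(long t) { this.lastEventTime = t; }
    }

While migrating, env.getConfig().disableGenericTypes() was handy: it
makes the job fail fast on any remaining Kryo fallback instead of
silently running slow.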