Hi Konstantin,

> could you replace the Kafka Source by a custom SourceFunction-implementation, 
> which just produces the new events in a loop as fast as possible. This way we 
> can rule out that the ingestion is responsible for the performance jump or 
> the limit at 5000 events/s and can benchmark the Flink job separately.

We built a custom source function and found the cause of the sudden
performance jump: incorrect watermarks in the original Kafka stream.
Certain events produced watermarks that lay in the future, and from
that point on every subsequent event was dropped as late. Hence the
apparent throughput increase.
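
For reference, our benchmarking source was roughly the following
sketch. Event is a stand-in for our actual event type; the source just
emits records in a tight loop, with monotonically increasing
timestamps this time:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Minimal stand-in for our real event type.
    class Event {
        public long id;
        public long timestamp;
        public Event() {}
        public Event(long id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // Emits synthetic events as fast as possible, bypassing Kafka entirely.
    public class BenchmarkSource implements SourceFunction<Event> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) {
            long id = 0;
            while (running) {
                // Hold the checkpoint lock while emitting, and use the
                // current wall clock so the watermarks can no longer
                // jump into the future.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(new Event(id++, System.currentTimeMillis()));
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }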

> Kryo is much less efficient than Flink's POJO serializer. In the logs you 
> should see INFO logs about the classes for which Flink falls back to Kryo. 
> Try to replace those by Flink POJO, i.e. default constructor and public 
> getters and setters. As Flink also needs to serialize/deserialize each state 
> object for reading and writing this also applies to your state classes, not 
> only to your events. The lines you are looking for are "INFO  
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected 
> for class com.company.YourClass so it cannot be used as a POJO type and must 
> be processed as GenericType. Please read the Flink documentation on "Data 
> Types & Serialization" for details of the effect on performance.". You can 
> disable the Kryo fallback temporarily as described in [1]. I would suspect 
> that serialization is a big factor right now, in particular if you see Kryo 
> methods taking a lot of time.

This was very helpful. We found a couple of key objects that were
still being serialized with Kryo. After fixing them so that Flink
serializes them as proper POJOs, throughput almost doubled from the
previous 500 events/s.
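
In case it is useful for the archives, the shape of the fix was
roughly as follows. ClickState is a made-up name standing in for one
of our state classes; the important part is the public no-arg
constructor and public getters/setters, which the POJO serializer
requires:

    // Before the fix this class had no default constructor and private
    // fields without accessors, so Flink fell back to Kryo. After:
    public class ClickState {
        private long userId;
        private long lastSeen;

        public ClickState() {}  // required no-arg constructor

        public long getUserId() { return userId; }
        public void setUserId(long userId) { this.userId = userId; }

        public long getLastSeen() { return lastSeen; }
        public void setLastSeen(long lastSeen) { this.lastSeen = lastSeen; }
    }

We also keep the Kryo fallback disabled in our tests, so any class
that silently degrades to GenericType fails fast:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableGenericTypes();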

Thanks a lot for the advice,

Ning
