The application consumes from a single Kafka topic, deserializes the JSON payload into POJOs, and uses a big keyed window (30+ days) for deduplication, then emits a result for every single event to four other keyed windows for aggregation.
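In DataStream API terms, the job is roughly equivalent to the sketch below. To be clear, this is a simplified stand-in rather than the production code: the field names, the Event POJO, the Kafka settings, and the processing-time tumbling window assigners are placeholders, the actual dedup logic inside the Big window is omitted, and CountTrigger.of(1) stands in for our count-based per-event trigger (a fuller trigger sketch is in the P.S. at the end).

import java.util.Properties;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PipelineSketch {

    // Minimal stand-in for the real POJO the JSON payload is deserialized into.
    public static class Event {
        public String a, b, c, d, e;
        public long value;
        public Event() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");

        // Single Kafka topic -> JSON -> POJO.
        ObjectMapper mapper = new ObjectMapper();
        DataStream<Event> events = env
                .addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), kafkaProps))
                .map(json -> mapper.readValue(json, Event.class))
                .returns(Event.class);

        // Big keyed window (30+ days) used for dedup; fires on every element so a
        // running result is emitted per event, and the window contents are kept.
        DataStream<Event> deduped = events
                .keyBy("a", "b", "c")
                .window(TumblingProcessingTimeWindows.of(Time.days(30)))
                .trigger(CountTrigger.of(1))
                .sum("value");

        // Four downstream keyed aggregations fed by the per-event output (sinks omitted).
        deduped.keyBy("a", "b", "c").window(TumblingProcessingTimeWindows.of(Time.hours(1))).sum("value");
        deduped.keyBy("a", "b", "c").window(TumblingProcessingTimeWindows.of(Time.days(1))).sum("value");
        deduped.keyBy("d", "e").window(TumblingProcessingTimeWindows.of(Time.hours(1))).sum("value");
        deduped.keyBy("d", "e").window(TumblingProcessingTimeWindows.of(Time.days(1))).sum("value");

        env.execute("dedup-and-aggregate");
    }
}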
In terms of the dataflow graph, it looks roughly like this:

    Source -> KeyBy(A,B,C)
                  |
                  v
      Big Window(sum, emit per event) --+--> KeyBy(A,B,C) -> Hourly Window(sum)
                                        +--> KeyBy(A,B,C) -> Daily Window(sum)
                                        +--> KeyBy(D,E)   -> Hourly Window(sum)
                                        +--> KeyBy(D,E)   -> Daily Window(sum)

The cardinality of the window keyed on (A,B,C) is high, potentially in the millions, and the values A, B, and C are all strings.

I'm doing performance testing by having the application consume the past 7 days of data from Kafka. However, the performance is not good and I'm having trouble interpreting the results. All tests were done on AWS using i3.xlarge instances with 2 slots per TM, and I tested with one, three, and six TMs. Parallelism was set to the total number of available slots, e.g. 6 for 3 TMs with 2 slots each.

- The application always started at ~500 messages/s from Kafka for about 20-30 minutes, then jumped to ~5,000 messages/s. I noticed that disk I/O dropped noticeably when the throughput jumped.
- Regardless of the number of TMs, it always peaked at ~5,000 messages/s and showed the same behavior as described above.
- The Flink UI always showed the Source back pressured by the Big window while throughput was at ~500 messages/s, and no back pressure at all once it reached ~5,000 messages/s.
- I took some Flight Recorder recordings, and they showed that the time trigger thread for the Big window was always running SystemProcessingTimeService$TriggerTask.run(). Since I'm only triggering the Big window by count of events (see the trigger sketch in the P.S.), why would this be running?
- Flight Recorder also showed that the Big window thread spent most of its time in RocksDB writes or gets when the throughput was low. I understand that it keeps its state in RocksDB, but I wasn't expecting it to tank the performance like this.
- Flight Recorder showed that the hottest methods were all related to Kryo serialization.
- GC was fine: nothing longer than 20 ms, and there weren't many pauses.

My questions are:

- Why is the performance so bad, and why doesn't it scale as I increase the number of TMs?
- Why would the throughput jump suddenly after 20 minutes or so?
- I know the JSON and POJO serialization isn't great. Could it really be this bad?

Any insights or guidance on how I can diagnose the issue further will be greatly appreciated.

Thanks,
Ning
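P.S. For reference, the count-based trigger on the Big window behaves roughly like the sketch below: it fires on every element, never purges, and does not register any timers itself, which is why the SystemProcessingTimeService activity surprised me. This is a simplified stand-in, not the exact production class.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires on every element so the Big window emits a running result per event.
// The window contents are never purged, so 30+ days of state stay in RocksDB,
// and this trigger registers no processing-time or event-time timers.
public class FirePerEventTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;       // emit for every incoming event, keep the contents
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;   // never registered, so not expected to fire
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        // no trigger state or timers to clean up
    }
}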