The application consumes from a single Kafka topic, deserializes the
JSON payload into POJOs, and uses a big keyed window (30+ days) for
deduplication, then emits a result for every incoming event to four
other keyed windows for aggregation. It looks roughly like the
following.

Source -> KeyBy(A,B,C)
              |
              v
Big Window(sum, emit per event) -+-> KeyBy(A,B,C) -> Hourly Window(sum)
                                 +-> KeyBy(A,B,C) -> Daily Window(sum)
                                 +-> KeyBy(D,E)   -> Hourly Window(sum)
                                 +-> KeyBy(D,E)   -> Daily Window(sum)

The cardinality of the window keyed on (A,B,C) is high and could be in
the millions. The values of A, B, and C are all strings.

I'm doing performance testing by having the application consume the
past 7 days of data from Kafka. However, the performance is not good
and I'm having some trouble interpreting the results. All tests were
done on AWS using i3.xlarge instances with 2 slots per TM, and were run
with one, three, and six TMs. Parallelism was set to the total number
of available slots, e.g. 6 for 3 TMs with 2 slots each.
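
For completeness, the per-run parallelism is simply derived from the
slot count (taskmanager.numberOfTaskSlots: 2 in flink-conf.yaml), e.g.
for the 3-TM run:

    // 3 TMs x 2 slots per TM -> parallelism 6 (env is the
    // StreamExecutionEnvironment from the sketch above).
    int taskManagers = 3;
    int slotsPerTaskManager = 2;
    env.setParallelism(taskManagers * slotsPerTaskManager);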

- The application would always start out consuming ~500 messages/s
  from Kafka for about 20-30 minutes, then jump to ~5,000 messages/s.
  I noticed that disk I/O dropped noticeably when the throughput
  jumped.

- Regardless of the number of TMs used, it always peaked at ~5,000
  messages/s and had the same behavior as described above.

- In the Flink UI, the Source always showed as back pressured by the
  Big window while throughput was at ~500 messages/s, and there was no
  back pressure at all once throughput reached ~5,000 messages/s.

- I took some Flight Recorder recordings, and they showed that the
  time-trigger thread for the Big window was always busy in
  SystemProcessingTimeService$TriggerTask.run(). Since I only trigger
  the Big window by event count (see the trigger sketch after this
  list), why would this be running?

- Flight Recorder also showed that the Big window thread spent most of
  its time on RocksDB writes or gets while the performance was low. I
  understand that the window state is kept in RocksDB, but I wasn't
  expecting it to tank the performance like this.

- Flight Recorder showed that the hottest methods were all about Kryo
  serialization.

- GC was OK: no pauses longer than 20 ms, and there weren't many of
  them.
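
To be explicit about what I mean by triggering only by event count: the
trigger on the Big window fires in onElement and does nothing in the
time callbacks, conceptually like the simplified sketch below (a
stand-in for what I run; behaviorally this is what the CountTrigger.of(1)
in the sketch above boils down to):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // Fires for every element and never registers processing-time or event-time timers.
    public class FirePerElementTrigger extends Trigger<Object, TimeWindow> {

        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.FIRE;        // emit an updated result for every incoming event
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;    // no processing-time timers are used
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;    // no event-time timers either
        }

        @Override
        public void clear(TimeWindow window, TriggerContext ctx) {
            // nothing to clean up in this sketch
        }
    }

Given that, I didn't expect the processing-time service to show up in
the profile at all.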

My questions are:

- Why is the performance so poor, and why doesn't it scale as I
  increase the number of TMs?

- Why would the performance jump suddenly after 20 minutes or so?

- I know the JSON and POJO serialization is not great. Could it be this
  bad?

Any insights or guidance on how I can diagnose the issue further would
be greatly appreciated.

Thanks,

Ning
