Hi, I’m trying to understand the performance impact of using GroupByKey in a pipeline running on the Flink runner.
I compared two simple pipelines:

Pipeline1: read from Kafka -> DoFn { once a second, write the number of arrived messages to the log }
Pipeline2: read from Kafka -> global window + GroupByKey -> DoFn { once a second, write the number of arrived messages to the log }

This is the GroupByKey step:

    PCollection<KV<String, Iterable<MyClass>>> shuffledInput = origInput
        .apply("groupByKey",
            Window.<KV<String, MyClass>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.ZERO) // late messages are dropped
                .discardingFiredPanes())
        .apply(GroupByKey.create());

Configuration:
- Parallelism=3, 1 task slot per task manager (resulting in 3 task managers)
- fasterCopy=true
- metrics are disabled
- the input Kafka topic has 3 partitions

Results:
In pipeline1, each task manager processed 10x more messages per second than in pipeline2 (450K vs. 45K). In the Flink UI, all 3 task managers in pipeline2 were reported as 100% busy on the GroupByKey stage.

I tried running the pipelines with 3 keys, 1000 keys and 100,000 keys, and I also tried different triggering options for the window. The performance results were similar in every case.

Any idea what could cause the performance difference? Am I missing something?

Thanks,
Ifat
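For reference, the measurement at the end of both pipelines ("once a second, write the number of arrived messages to the log") amounts to counting logic like the following. This is a minimal plain-Java sketch under stated assumptions, not the actual DoFn from either pipeline: the class name ThroughputCounter and its record method are hypothetical, and timestamps are passed in explicitly so the logic can be checked without a real clock. Inside a Beam DoFn, an instance could presumably be kept as a field and called from the @ProcessElement method with System.currentTimeMillis(); that wiring is also an assumption.

```java
/**
 * Hypothetical helper (not from the original post): counts arriving elements
 * and reports the count once per interval, mirroring the per-second logging
 * DoFn described above. Not thread-safe; assumes one instance per DoFn
 * instance, which processes elements one at a time.
 */
public class ThroughputCounter {
    private final long intervalMillis;
    private long windowStartMillis;
    private long count;

    public ThroughputCounter(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.windowStartMillis = startMillis;
    }

    /**
     * Records one element arriving at nowMillis. Returns the number of
     * elements counted in the interval that just ended, or -1 if the
     * interval has not elapsed yet.
     */
    public long record(long nowMillis) {
        long reported = -1;
        if (nowMillis - windowStartMillis >= intervalMillis) {
            // Interval elapsed: report the finished interval and start a new one.
            reported = count;
            count = 0;
            windowStartMillis = nowMillis;
        }
        count++; // the current element belongs to the (possibly new) interval
        return reported;
    }

    public static void main(String[] args) {
        ThroughputCounter counter = new ThroughputCounter(1000L, 0L);
        for (int i = 0; i < 5; i++) {
            counter.record(100L * i); // 5 elements at t = 0, 100, ..., 400 ms
        }
        long reported = counter.record(1000L); // first element after 1 s
        System.out.println("messages in last interval: " + reported);
    }
}
```

Note that a count is only emitted when an element arrives after the interval has elapsed, so an idle stream produces no log lines; that is acceptable for a throughput comparison under constant Kafka load.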