Hi,

I’m trying to understand the performance impact of using GroupByKey in a 
pipeline running on the Flink runner.

I compared two simple pipelines:
Pipeline1: read from Kafka -> DoFn { once per second, log the number of 
messages that arrived }
Pipeline2: read from Kafka -> global window + GroupByKey -> DoFn { once per 
second, log the number of messages that arrived }
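
A minimal sketch of what the per-second counting inside such a DoFn might look 
like, written as plain Java with no Beam dependency. The class name 
PerSecondCounter and its methods are hypothetical, chosen only for 
illustration, and the timestamp is passed in by the caller so the logic can be 
exercised without a real clock:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: counts messages and records one log line per elapsed second. */
final class PerSecondCounter {
    private final List<String> logLines = new ArrayList<>();
    private long windowStartMillis = -1; // start of the current one-second window
    private long count = 0;              // messages seen in the current window

    /** Records one message observed at nowMillis (e.g. System.currentTimeMillis()). */
    void onMessage(long nowMillis) {
        if (windowStartMillis < 0) {
            windowStartMillis = nowMillis; // first message opens the first window
        }
        // Once at least one second has passed, log the finished window and reset.
        if (nowMillis - windowStartMillis >= 1000) {
            logLines.add("messages in last window: " + count);
            windowStartMillis = nowMillis;
            count = 0;
        }
        count++;
    }

    /** Returns the log lines recorded so far. */
    List<String> logLines() {
        return logLines;
    }
}
```

Under this assumption, the DoFn in pipeline1 would call onMessage once per 
input element, while the DoFn in pipeline2 would call it once per value in the 
Iterable produced by GroupByKey, so that both pipelines count individual 
messages rather than fired panes.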

This is the windowing and GroupByKey step:

        PCollection<KV<String, Iterable<MyClass>>> shuffledInput = origInput
                .apply("groupByKey", Window.<KV<String, MyClass>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .withAllowedLateness(Duration.ZERO) // late messages are dropped
                        .discardingFiredPanes())
                .apply(GroupByKey.create());

Configuration:
Parallelism=3, 1 task slot per task manager (results in 3 task managers)
fasterCopy=true
Metrics are disabled
The input Kafka topic has 3 partitions

The results: in pipeline1, each task manager processed about 10x as many 
messages per second as in pipeline2 (450K vs. 45K). In the Flink UI I saw that 
in pipeline2 all 3 task managers were reported as 100% busy on the GroupByKey 
stage.

I tried running the pipelines with 3 keys, 1000 keys and 100,000 keys. I also 
tried different triggering options for the window. The performance results were 
similar.

Any idea what could cause the performance difference? Am I missing something?

Thanks,
Ifat
