Hi Kevin,

The problem here is how to evenly distribute partitions from multiple Kafka topics across tasks, whereas FLIP-370 is only concerned with how to evenly distribute tasks across slots and TaskManagers, so FLIP-370 won't help here.
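To see why, you can simulate the connector's owner assignment offline. Below is a minimal, self-contained sketch that reproduces the formula from KafkaSourceEnumerator#getSplitOwner and prints how many splits each of 16 readers would get for two 8-partition topics (the topic names "orders" and "payments" are made up for illustration; plug in your own to check your job):

```
import java.util.Map;
import java.util.TreeMap;

public class SplitOwnerSkewDemo {

    // Same owner-assignment logic as KafkaSourceEnumerator#getSplitOwner,
    // reproduced standalone (the topic name stands in for TopicPartition).
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int numReaders = 16;
        int partitionsPerTopic = 8;
        // Hypothetical topic names; substitute your real ones.
        String[] topics = {"orders", "payments"};

        Map<Integer, Integer> splitsPerReader = new TreeMap<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                splitsPerReader.merge(
                        getSplitOwner(topic, p, numReaders), 1, Integer::sum);
            }
        }

        // Each topic occupies a contiguous window of 8 readers (mod 16)
        // starting at its hash-derived startIndex. Unless the two windows
        // are offset by exactly 8, some readers get 2 splits and others 0.
        for (int reader = 0; reader < numReaders; reader++) {
            System.out.printf("reader %2d -> %d split(s)%n",
                    reader, splitsPerReader.getOrDefault(reader, 0));
        }
    }
}
```

Whenever the two topics' hash-derived start indices land close together mod 16, the occupied reader windows overlap and the remaining readers stay idle, which matches what you observed.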
Best,
Zhanghao Chen

________________________________
From: Kevin Lam <kevin....@shopify.com.INVALID>
Sent: Thursday, June 6, 2024 2:32
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Poor Load Balancing across TaskManagers for Multiple Kafka Sources

Hey all,

I'm seeing an issue with poor load balancing across TaskManagers for Kafka sources using the Flink SQL API, and I'm wondering if FLIP-370 will help with it. If not, I'm interested in any ideas the community has to mitigate the issue.

The Kafka SplitEnumerator uses the following logic to assign split owners (code pointer <https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L469>):

```
static int getSplitOwner(TopicPartition tp, int numReaders) {
    // Hash the topic name to pick a per-topic starting reader index.
    int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
    // Assign the topic's partitions round-robin starting from that index.
    return (startIndex + tp.partition()) % numReaders;
}
```

However, this can result in an imbalanced distribution of Kafka partition consumers across TaskManagers. To illustrate, I created a pipeline that consumes from 2 Kafka topics, each with 8 partitions, and just sinks them to a blackhole connector sink. For a parallelism of 16 and 1 task slot per TaskManager, we'd ideally expect each TaskManager to get its own Kafka partition, i.e. 16 partitions (8 from each topic) split evenly across TaskManagers. However, due to the algorithm I linked and how the startIndex is computed, I have observed a number of TaskManagers with 2 partitions (one from each topic) and some TaskManagers completely idle.

I've also run an experiment with the same pipeline where I set the parallelism such that each TaskManager gets exactly 1 partition, and compared it against the case where each TaskManager gets exactly 2 partitions (one from each topic). I ensured this was the case by setting an appropriate parallelism and running the jobs on an application cluster. Since the partitions are fixed, any extra parallelism isn't used. The case with exactly 1 partition per TaskManager processes a fixed set of data 20% faster.

I was reading FLIP-370 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling> and understand it will improve task scheduling in certain scenarios. Will FLIP-370 help with this KafkaSource scenario? If not, any ideas to improve the subtask scheduling for KafkaSources? Ideally we wouldn't need to carefully consider the partition and resulting task distribution when selecting our parallelism values.

Thanks for your help!