cc panyuep...@apache.org, as this is related to FLIP-370.

On Wed, Jun 5, 2024 at 2:32 PM Kevin Lam <kevin....@shopify.com> wrote:
> Hey all,
>
> I'm seeing an issue with poor load balancing across TaskManagers for Kafka
> sources using the Flink SQL API, and I'm wondering whether FLIP-370 will help
> with it. If not, I'm interested in any ideas the community has to mitigate
> the issue.
>
> The Kafka SplitEnumerator uses the following logic to assign split owners
> (code pointer
> <https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L469>):
>
> ```java
> static int getSplitOwner(TopicPartition tp, int numReaders) {
>     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>     return (startIndex + tp.partition()) % numReaders;
> }
> ```
>
> However, this can result in an imbalanced distribution of Kafka partition
> consumers across TaskManagers.
>
> To illustrate, I created a pipeline that consumes from two Kafka topics,
> each with 8 partitions, and simply sinks them to a blackhole connector sink.
> With a parallelism of 16 and 1 task slot per TaskManager, we'd ideally expect
> each TaskManager to get its own Kafka partition, i.e., the 16 partitions
> (8 from each topic) split evenly across the 16 TaskManagers. However, because
> of the algorithm linked above and the way startIndex is computed, I have
> observed a number of TaskManagers with 2 partitions (one from each topic),
> and some TaskManagers left completely idle.
>
> I've also run an experiment with the same pipeline, comparing a setup where
> each TaskManager gets exactly 1 partition against one where each TaskManager
> gets exactly 2 partitions (one from each topic). I ensured this was the case
> by choosing an appropriate parallelism, and ran the jobs on an application
> cluster. Since the partitions are fixed, any extra parallelism goes unused.
> The setup with exactly 1 partition per TaskManager processes a fixed set of
> data 20% faster.
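To make the imbalance concrete, below is a small standalone Java sketch that replays the quoted getSplitOwner arithmetic for two topics with 8 partitions each and 16 readers, then prints how many partitions each reader receives. The topic names "topic-a" and "topic-b" and the helpers startIndex and countPerReader are hypothetical, introduced only for this illustration. Because startIndex is derived from String.hashCode() of the topic name, the exact distribution depends on the real names. The structural point holds regardless of the names. Each topic's 8 partitions occupy a window of 8 consecutive reader indices (mod 16) beginning at that topic's startIndex. Two such windows tile all 16 readers only when the two start indices differ by exactly 8 (mod 16). In every other case the windows overlap, and each overlapping reader that gets 2 partitions is matched by one idle reader.

```java
import java.util.List;

public class SplitOwnerSimulation {

    // Same arithmetic as the quoted getSplitOwner, but taking the topic name
    // directly so this sketch needs no Kafka classes (hypothetical helper).
    static int startIndex(String topic, int numReaders) {
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
    }

    static int getSplitOwner(String topic, int partition, int numReaders) {
        return (startIndex(topic, numReaders) + partition) % numReaders;
    }

    // Counts how many partitions each reader (source subtask) is assigned.
    static int[] countPerReader(List<String> topics, int partitionsPerTopic, int numReaders) {
        int[] counts = new int[numReaders];
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts[getSplitOwner(topic, p, numReaders)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical topic names; substitute the real ones to reproduce a job.
        List<String> topics = List.of("topic-a", "topic-b");
        int numReaders = 16;
        for (String t : topics) {
            System.out.println(t + " startIndex = " + startIndex(t, numReaders));
        }
        int[] counts = countPerReader(topics, 8, numReaders);
        int idle = 0;
        int overloaded = 0;
        for (int i = 0; i < numReaders; i++) {
            System.out.println("reader " + i + " -> " + counts[i] + " partition(s)");
            if (counts[i] == 0) idle++;
            if (counts[i] > 1) overloaded++;
        }
        // With two 8-partition topics and 16 readers, idle and overloaded
        // readers appear together unless the start indices differ by exactly 8.
        System.out.println("idle readers: " + idle + ", readers with >1 partition: " + overloaded);
    }
}
```

Running the sketch with the real topic names prints each topic's startIndex and the per-reader counts. Assuming the source parallelism is 16, it should show the same pattern of doubled and idle readers observed in the job. Which TaskManager each reader index lands on is still decided by the scheduler.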
>
> I was reading FLIP-370
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling>
> and understand it will improve task scheduling in certain scenarios. Will
> FLIP-370 help with this KafkaSource scenario? If not, are there any ideas for
> improving subtask scheduling for KafkaSources? Ideally we wouldn't need to
> carefully consider the partition counts and the resulting task distribution
> when choosing parallelism values.
>
> Thanks for your help!
>