Thanks for the response, Zhanghao. Since FLIP-370 won't help, any ideas on
how this can be improved? Could we round-robin assign partitions from all
Kafka topics to TaskManagers, as suggested in
https://issues.apache.org/jira/browse/FLINK-31762?
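
For concreteness, here's a rough sketch of what topic-agnostic round-robin
ownership could look like (my illustration of the general idea only, not the
actual FLINK-31762 proposal; the class and counter are hypothetical):

```
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: hand each newly discovered split to the next reader in sequence,
// ignoring the topic hash entirely, so partitions from all topics interleave
// evenly across readers. Hypothetical illustration, not the FLINK-31762 patch.
class RoundRobinOwnerSketch {
    private final AtomicInteger splitCounter = new AtomicInteger();

    int nextOwner(int numReaders) {
        return splitCounter.getAndIncrement() % numReaders;
    }
}
```

One caveat: a counter-based scheme depends on split discovery order, so a
deterministic variant would presumably be needed for stable assignments
across restarts.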

On Wed, Jun 5, 2024 at 10:52 PM Zhanghao Chen <zhanghao.c...@outlook.com>
wrote:

> Hi Kevin,
>
> The problem here is how to evenly distribute partitions from multiple
> Kafka topics to tasks, whereas FLIP-370 is only concerned with how to
> evenly distribute tasks to slots and TaskManagers, so FLIP-370 won't help
> here.
>
> Best,
> Zhanghao Chen
> ________________________________
> From: Kevin Lam <kevin....@shopify.com.INVALID>
> Sent: Thursday, June 6, 2024 2:32
> To: dev@flink.apache.org <dev@flink.apache.org>
> Subject: Poor Load Balancing across TaskManagers for Multiple Kafka Sources
>
> Hey all,
>
> I'm seeing an issue with poor load balancing across TaskManagers for Kafka
> sources using the Flink SQL API, and I'm wondering whether FLIP-370 will
> help with it. If not, I'm interested in any ideas the community has to
> mitigate the issue.
>
> The Kafka SplitEnumerator uses the following logic to assign split owners
> (code pointer:
> https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L469
> ):
>
> ```
> // The topic hash picks a starting reader index; a topic's partitions are
> // then assigned to consecutive readers from that offset.
> static int getSplitOwner(TopicPartition tp, int numReaders) {
>     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>     return (startIndex + tp.partition()) % numReaders;
> }
> ```
>
> However, this can result in an imbalanced distribution of Kafka partition
> consumers across TaskManagers.
>
> To illustrate, I created a pipeline that consumes from 2 Kafka topics, each
> with 8 partitions, and just sinks them to a blackhole connector sink. For a
> parallelism of 16 and 1 task slot per TaskManager, we'd ideally expect each
> TaskManager to get its own Kafka partition, i.e. 16 partitions (8 from each
> topic) split evenly across TaskManagers. However, due to the algorithm I
> linked and how the startIndex is computed, I have observed a bunch of
> TaskManagers with 2 partitions (one from each topic) and some TaskManagers
> completely idle.
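>
> To make the skew concrete, here's a minimal standalone sketch that replays
> the same owner-selection arithmetic for two 8-partition topics and 16
> readers (the topic names are hypothetical; the actual skew depends on each
> topic's hashCode). Whenever two topics hash to the same startIndex, their
> partitions stack onto the same readers and the remaining readers stay idle:
>
> ```
> import java.util.Map;
> import java.util.TreeMap;
>
> public class SplitOwnerSkewDemo {
>     // Same arithmetic as KafkaSourceEnumerator.getSplitOwner, taking the
>     // topic name directly instead of a TopicPartition.
>     static int getSplitOwner(String topic, int partition, int numReaders) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>         return (startIndex + partition) % numReaders;
>     }
>
>     public static void main(String[] args) {
>         int numReaders = 16;
>         String[] topics = {"topic-a", "topic-b"}; // hypothetical names
>         Map<Integer, Integer> load = new TreeMap<>();
>         for (String topic : topics) {
>             for (int p = 0; p < 8; p++) {
>                 load.merge(getSplitOwner(topic, p, numReaders), 1, Integer::sum);
>             }
>         }
>         // Readers missing from the map received no partitions at all.
>         load.forEach((r, n) ->
>                 System.out.println("reader " + r + " -> " + n + " partition(s)"));
>     }
> }
> ```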
>
> I've also run an experiment with the same pipeline where I set the
> parallelism such that each TaskManager gets exactly 1 partition, and
> compared it against the case where each TaskManager gets exactly 2
> partitions (one from each topic). I ensured this was the case by setting an
> appropriate parallelism and running the jobs on an application cluster;
> since the partition assignment is fixed, any extra parallelism goes unused.
> The case with exactly 1 partition per TaskManager processed a fixed set of
> data 20% faster.
>
> I was reading FLIP-370 (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling
> ) and understand it will improve task scheduling in certain scenarios. Will
> FLIP-370 help with this KafkaSource scenario? If not, are there any ideas
> for improving subtask scheduling for KafkaSources? Ideally, we wouldn't
> need to carefully consider the partition and resulting task distribution
> when selecting our parallelism values.
>
> Thanks for your help!
>
