I've also just found https://issues.apache.org/jira/browse/FLINK-31762
which tracks the Kafka-specific issue.
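For anyone who wants to reproduce the skew locally, here's a minimal standalone sketch of the assignment logic quoted below. The topic names ("orders", "payments") are made up for illustration; whether the skew actually shows up for a given pair of topics depends on their hash codes, since both topics land on the same startIndex whenever their hashes collide modulo numReaders.

```java
import java.util.HashMap;
import java.util.Map;

public class SplitOwnerDemo {
    // Copy of the owner-selection logic from KafkaSourceEnumerator.getSplitOwner,
    // with the topic name passed directly instead of a TopicPartition.
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int numReaders = 16;           // parallelism, 1 task slot per TaskManager
        int partitionsPerTopic = 8;
        // Hypothetical topic names; the distribution depends on their hash codes.
        String[] topics = {"orders", "payments"};

        // Count how many partitions each reader (subtask) is assigned.
        Map<Integer, Integer> counts = new HashMap<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts.merge(getSplitOwner(topic, p, numReaders), 1, Integer::sum);
            }
        }
        for (int reader = 0; reader < numReaders; reader++) {
            System.out.println("reader " + reader + " -> "
                    + counts.getOrDefault(reader, 0) + " partition(s)");
        }
    }
}
```

Because each topic occupies a contiguous run of 8 readers starting at its own startIndex, two topics whose startIndex values coincide stack 2 partitions on 8 readers and leave the other 8 idle, which is exactly the pattern described below.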

On Wed, Jun 5, 2024 at 3:05 PM Kevin Lam <kevin....@shopify.com> wrote:

> cc. panyuep...@apache.org as related to FLIP-370
>
> On Wed, Jun 5, 2024 at 2:32 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hey all,
>>
>> I'm seeing an issue with poor load balancing across TaskManagers for
>> Kafka Sources using the Flink SQL API, and I'm wondering whether FLIP-370
>> will help with it. If not, I'd be interested in any ideas the community
>> has to mitigate the issue.
>>
>> The Kafka SplitEnumerator uses the following logic to assign split owners
>> (code pointer
>> <https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L469>
>> ):
>>
>> ```
>>   static int getSplitOwner(TopicPartition tp, int numReaders) {
>>         int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) %
>> numReaders;
>>         return (startIndex + tp.partition()) % numReaders;
>>     }
>> ```
>>
>> However, this can result in an imbalanced distribution of Kafka partition
>> consumers across TaskManagers.
>>
>> To illustrate, I created a pipeline that consumes from 2 Kafka topics,
>> each with 8 partitions, and simply sinks them to a blackhole connector sink.
>> With a parallelism of 16 and 1 task slot per TaskManager, we'd ideally
>> expect each TaskManager to get its own Kafka partition, i.e. 16 partitions
>> (8 from each topic) split evenly across the TaskManagers. However, due to
>> the algorithm I linked and how the startIndex is computed, I have observed
>> several TaskManagers with 2 partitions (one from each topic) and some
>> TaskManagers completely idle.
>>
>> I've also run an experiment with the same pipeline where I set the
>> parallelism such that each TaskManager gets exactly 1 partition, and
>> compared it against a run where each TaskManager gets exactly 2 partitions
>> (one from each topic). Both jobs ran on an application cluster; since the
>> partition assignment is fixed, any extra parallelism simply goes unused.
>> The configuration with exactly 1 partition per TaskManager processed a
>> fixed set of data 20% faster.
>>
>> I was reading FLIP-370
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling>
>> and understand it will improve task scheduling in certain scenarios. Will
>> FLIP-370 help with this KafkaSource scenario? If not, does anyone have
>> ideas to improve the subtask scheduling for KafkaSources? Ideally we
>> shouldn't need to carefully consider the partition and resulting task
>> distribution when selecting our parallelism values.
>>
>> Thanks for your help!
>>
>
