Hey all,

I'm seeing poor load balancing across TaskManagers for Kafka sources when
using the Flink SQL API, and I'm wondering whether FLIP-370 will help with
it. If not, I'd be interested in any ideas the community has to mitigate the
issue.

The Kafka SplitEnumerator uses the following logic to assign split owners (code
pointer
<https://github.com/apache/flink-connector-kafka/blob/00c9c8c74121136a0c1710ac77f307dc53adae99/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L469>
):

```
    static int getSplitOwner(TopicPartition tp, int numReaders) {
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + tp.partition()) % numReaders;
    }
```

However, this can result in an imbalanced distribution of Kafka partition
consumers across TaskManagers.

To illustrate, I created a pipeline that consumes from 2 Kafka topics, each
with 8 partitions, and just sinks them to a blackhole connector sink. For a
parallelism of 16 and 1 task slot per TaskManager, we'd ideally expect each
TaskManager to get its own Kafka partition, i.e. 16 partitions (8 partitions
from each topic) split evenly across TaskManagers. However, due to the
algorithm I linked and how the startIndex is computed, I have observed a
bunch of TaskManagers with 2 partitions (one from each topic), and some
TaskManagers completely idle.
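
To make the overlap easier to reproduce offline, here is a minimal sketch
that replays the same arithmetic without a Kafka dependency. The class name,
the countPerReader helper, and the topic names "topic-a" and "topic-b" are
hypothetical placeholders (not the topics from my job), and getSplitOwner is
adapted to take the topic name and partition directly instead of a
TopicPartition. Each topic's 8 partitions land on 8 consecutive readers
starting at that topic's startIndex, so with 16 readers the two topics only
avoid overlapping if their start indexes happen to differ by exactly 8.

```java
import java.util.Arrays;

public class SplitOwnerDemo {
    // Same arithmetic as getSplitOwner in the snippet above, adapted to take
    // the topic name and partition number directly (assumption: no Kafka
    // classes on the classpath).
    public static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    // Hypothetical helper: counts how many partitions each reader (subtask)
    // would own for the given topics.
    public static int[] countPerReader(String[] topics, int partitionsPerTopic, int numReaders) {
        int[] counts = new int[numReaders];
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts[getSplitOwner(topic, p, numReaders)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Placeholder topic names; substitute the real ones to see the
        // distribution for an actual job.
        String[] topics = {"topic-a", "topic-b"};
        int[] counts = countPerReader(topics, 8, 16);
        long idle = Arrays.stream(counts).filter(c -> c == 0).count();
        System.out.println("partitions per reader: " + Arrays.toString(counts));
        System.out.println("idle readers: " + idle);
    }
}
```

Since there are 16 partitions and 16 readers, every reader that ends up with
2 partitions implies exactly one idle reader elsewhere.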

I've also run an experiment with the same pipeline, comparing a run where
each TaskManager gets exactly 1 partition against a run where each
TaskManager gets exactly 2 partitions (one from each topic). I ensured this
by setting an appropriate parallelism in each case, and ran the jobs on an
application cluster. Since the number of partitions is fixed, any extra
parallelism goes unused. The case with exactly 1 partition per TaskManager
processes a fixed set of data 20% faster.

I was reading FLIP-370
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling>
and understand it will improve task scheduling in certain scenarios. Will
FLIP-370 help with this KafkaSource scenario? If not, are there any ideas to
improve the subtask scheduling for KafkaSources? Ideally, we wouldn't need to
carefully consider the partition count and the resulting task distribution
when selecting our parallelism values.

Thanks for your help!
