Hi devs,

I'd like to propose a new FLIP xxx: Kafka Enumerator assigns splits based on
the current assignment situation. Currently, when the Kafka enumerator
discovers a new split, it chooses the owning reader with the following
algorithm:


```java
static int getSplitOwner(TopicPartition tp, int numReaders) {
    int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the
    // start index
    return (startIndex + tp.partition()) % numReaders;
}
```


However, I have run into several uneven-distribution problems in
production:


   - Problem 1: When a single source consumes multiple topics,
   getSplitOwner computes the owner of each partition independently per
   topic, so partitions of different topics can land on the same readers.
   For example, with 3 Kafka topics of 16 partitions each and a source
   parallelism of 48, in the worst case (all three topics get the same
   start index) only 16 subtasks receive splits, 3 partitions each, while
   the other 32 stay idle.
   - Problem 2: Suppose a Flink source with parallelism 16 consumes one
   topic with 16 partitions, and I then change the parallelism to 32.
   When a new partition is discovered, its owner is computed with the new
   number of readers, so the start index can differ from the one used for
   the existing partitions, and the placement ignores which readers
   already hold the restored splits.
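To make the worst case of problem 1 concrete, here is a small self-contained Java sketch. It copies the arithmetic of getSplitOwner but takes the topic name and partition id directly instead of a TopicPartition (an assumption made only so the demo does not need kafka-clients). The topic names are chosen deliberately: "AaAa", "AaBB" and "BBBB" have the same String.hashCode(), so all three topics get the same start index. With real topic names the amount of overlap depends on their hash values.

```java
import java.util.HashMap;
import java.util.Map;

public class SplitOwnerDemo {

    // Same arithmetic as getSplitOwner, but taking the topic name and
    // partition id directly so the demo does not depend on kafka-clients.
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    // Returns reader id -> number of splits assigned to that reader.
    static Map<Integer, Integer> assign(String[] topics, int partitionsPerTopic, int numReaders) {
        Map<Integer, Integer> load = new HashMap<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                load.merge(getSplitOwner(topic, p, numReaders), 1, Integer::sum);
            }
        }
        return load;
    }

    public static void main(String[] args) {
        // "AaAa", "AaBB" and "BBBB" share the same String.hashCode(),
        // so all three topics start at the same reader index.
        String[] topics = {"AaAa", "AaBB", "BBBB"};
        Map<Integer, Integer> load = assign(topics, 16, 48);
        int maxLoad = load.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        System.out.println("busy readers = " + load.size() + ", max splits per reader = " + maxLoad);
    }
}
```

Running main prints `busy readers = 16, max splits per reader = 3`: 48 partitions on only 16 of the 48 readers.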


Each time, my only workaround has been the same: wait for the job to
complete a snapshot, then stop it and restart it with a different
parallelism. The state redistribution strategy then spreads the splits as
evenly as possible. However, this is too cumbersome. Why can't the
enumerator do this properly when it discovers and assigns a new split?


Therefore, I propose that the enumerator select a reader based on the
current assignment situation: each time it assigns a new split, it should
choose the reader that currently owns the fewest splits.
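A minimal sketch of the proposed strategy, in plain Java and independent of Flink's enumerator classes. LeastLoadedAssigner and its methods are hypothetical names used only for illustration; the sketch assumes the enumerator can keep a per-reader split count, including splits restored from state. The main method replays the settings of problems 1 and 2.

```java
import java.util.Arrays;

// Hypothetical helper illustrating "assign each new split to the reader
// with the fewest splits"; not an existing Flink class.
public class LeastLoadedAssigner {

    // splitCounts[i] = number of splits currently owned by reader i.
    private final int[] splitCounts;

    LeastLoadedAssigner(int numReaders) {
        this.splitCounts = new int[numReaders];
    }

    // Registers a split that a reader already owns, e.g. one restored from state.
    void recordExisting(int readerId) {
        splitCounts[readerId]++;
    }

    // Chooses the reader with the fewest splits (lowest id on ties),
    // records the assignment, and returns the chosen reader id.
    int assignNewSplit() {
        int owner = 0;
        for (int i = 1; i < splitCounts.length; i++) {
            if (splitCounts[i] < splitCounts[owner]) {
                owner = i;
            }
        }
        splitCounts[owner]++;
        return owner;
    }

    int[] splitCounts() {
        return splitCounts.clone();
    }

    public static void main(String[] args) {
        // Problem 1 setting: 3 topics x 16 partitions, parallelism 48.
        LeastLoadedAssigner fresh = new LeastLoadedAssigner(48);
        for (int i = 0; i < 3 * 16; i++) {
            fresh.assignNewSplit();
        }
        // Every reader ends up with exactly one split.
        System.out.println(Arrays.toString(fresh.splitCounts()));

        // Problem 2 setting: parallelism raised to 32, assuming the 16 restored
        // splits sit on readers 0..15. A newly discovered split goes to reader 16.
        LeastLoadedAssigner rescaled = new LeastLoadedAssigner(32);
        for (int r = 0; r < 16; r++) {
            rescaled.recordExisting(r);
        }
        System.out.println(rescaled.assignNewSplit());
    }
}
```

Ties go to the lowest reader id so the choice stays deterministic. Counting splits treats every partition as equally heavy, and a real implementation would presumably also need to update the counts when splits are added back after a reader failure.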


WDYT? I'd like to hear your thoughts.


Best,

Hongshun
