Hi Hongshun

There is currently a Jira tracking this issue [1].

> Therefore, I want the enumerator to select a reader based on the current
> assignment situation: each time, the enumerator should choose the reader
> with the fewest assigned splits.

I am not sure I follow how you intend to track the "current split
assignment". Do you intend to keep the current assignment in the enumerator
state, or are you only going to track per-session assignment status in
memory? If you intend the former, how are you going to handle split
redistribution upon a parallelism change?
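
For concreteness, here is roughly what I picture the in-memory variant to
look like (hypothetical names, not actual connector code): the enumerator
keeps a per-reader split count for the current session and updates it on
every assignment, which means the counts start from scratch after a restart
and do not reflect splits restored from reader state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of session-local assignment tracking.
class InMemoryAssignmentTracker {

    // Splits handed out to each reader during the current enumerator session.
    private final Map<Integer, Integer> splitCountPerReader = new HashMap<>();

    // Called whenever the enumerator assigns splits to a reader.
    void recordAssignment(int readerId, int numNewSplits) {
        splitCountPerReader.merge(readerId, numNewSplits, Integer::sum);
    }

    int assignedSplitCount(int readerId) {
        return splitCountPerReader.getOrDefault(readerId, 0);
    }
}
```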


1- https://issues.apache.org/jira/browse/FLINK-31762

Best Regards
Ahmed Hamdy


On Fri, 27 Dec 2024 at 06:38, Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi devs,
>
> I'd like to propose a new FLIP xxx: Kafka Enumerator assigns splits based
> on the current assignment situation. Currently, when the Kafka enumerator
> discovers a new split, it chooses a reader using the following algorithm:
>
>
> ```java
> static int getSplitOwner(TopicPartition tp, int numReaders) {
>     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>
>     // Here, the assumption is that the ids of Kafka partitions are always
>     // ascending starting from 0, and therefore can be used directly as the
>     // offset clockwise from the start index.
>     return (startIndex + tp.partition()) % numReaders;
> }
> ```
>
>
> However, I have encountered several uneven distribution problems in
> production:
>
>
>    - Problem 1: If I consume multiple topics from one source, getSplitOwner
>    computes the owner of partitions separately for each topic, so the
>    assignments can overlap. For example, with 3 Kafka topics of 16
>    partitions each, even if the Flink task's parallelism is 48, in the
>    worst case only 16 tasks are assigned partitions, 3 each, while the
>    remaining 32 tasks stay idle (see the sketch after this list).
>    - Problem 2: If I use a Flink source with parallelism 16 to consume one
>    topic with 16 partitions and then change the Flink parallelism to 32,
>    the start index changes when a new partition is discovered because the
>    number of readers has changed.
>
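> To make problem 1 concrete, here is a quick standalone sketch (the topic
> names and class name are made up, and the actual degree of overlap depends
> on the topic names' hash codes). It copies the current formula, using a
> plain topic string and partition number instead of TopicPartition, and
> counts how many of 48 readers end up owning any partition of 3 topics with
> 16 partitions each:
>
> ```java
> import java.util.HashSet;
> import java.util.Set;
>
> public class AssignmentSkewDemo {
>
>     // Same formula as getSplitOwner above, inlined for a standalone demo.
>     static int getSplitOwner(String topic, int partition, int numReaders) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>         return (startIndex + partition) % numReaders;
>     }
>
>     public static void main(String[] args) {
>         int numReaders = 48;
>         String[] topics = {"topic-a", "topic-b", "topic-c"};
>         Set<Integer> usedReaders = new HashSet<>();
>         for (String topic : topics) {
>             for (int partition = 0; partition < 16; partition++) {
>                 usedReaders.add(getSplitOwner(topic, partition, numReaders));
>             }
>         }
>         // Each 16-partition topic lands on 16 consecutive readers (mod 48),
>         // so the result is between 16 (full overlap) and 48 (no overlap).
>         System.out.println("Readers that received splits: " + usedReaders.size());
>     }
> }
> ```
>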
> Each time, I have only one way to solve it: wait for the job to complete a
> snapshot, then stop and restart the job with a different parallelism, so
> that the state redistribution strategy assigns the splits as evenly as
> possible. However, this is too cumbersome. Why can't it be done properly
> when the enumerator discovers and assigns a new split?
>
>
> Therefore, I want the enumerator to select a reader based on the current
> assignment situation: each time, the enumerator should choose the reader
> with the fewest assigned splits (a rough sketch follows).
>
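> A minimal, connector-agnostic sketch of that selection rule (the class and
> method names are hypothetical, not the actual enumerator code): keep a
> per-reader split count and pick the registered reader that currently owns
> the fewest splits, breaking ties by the lowest reader id so the choice is
> deterministic:
>
> ```java
> import java.util.HashMap;
> import java.util.Map;
>
> class LeastLoadedReaderSelector {
>
>     // How many splits each registered reader currently owns.
>     private final Map<Integer, Integer> splitCountPerReader = new HashMap<>();
>
>     void registerReader(int readerId) {
>         splitCountPerReader.putIfAbsent(readerId, 0);
>     }
>
>     // Returns the reader that should receive the next new split and
>     // updates its count.
>     int selectReaderForNewSplit() {
>         if (splitCountPerReader.isEmpty()) {
>             throw new IllegalStateException("No registered readers");
>         }
>         int chosenReader = -1;
>         int minCount = Integer.MAX_VALUE;
>         for (Map.Entry<Integer, Integer> e : splitCountPerReader.entrySet()) {
>             if (e.getValue() < minCount
>                     || (e.getValue() == minCount && e.getKey() < chosenReader)) {
>                 chosenReader = e.getKey();
>                 minCount = e.getValue();
>             }
>         }
>         splitCountPerReader.merge(chosenReader, 1, Integer::sum);
>         return chosenReader;
>     }
> }
> ```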
>
> WDYT? I'd be glad to hear your thoughts.
>
>
> Best,
>
> Hongshun
>
