Hi Ahmed,

Thanks for your question.
> Do you intend to keep the current assignment in the enumerator
state or are you only going to use per session assignment status and keep
it in memory?

I don't plan to keep it in the enumerator state, because the stored
assignment would no longer match the real distribution once the state is
redistributed after a parallelism change. Instead, when the job restarts,
each task sends an event to tell the enumerator which splits it currently
holds. That is also why I want to write a FLIP rather than just a Jira.
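
To make it concrete, here is a rough sketch of the direction I have in
mind (all names are placeholders, not a final API): after restoring from
state, each reader reports the splits it holds to the enumerator through a
custom SourceEvent, and the enumerator keeps this view only in memory.

```java
import java.util.Set;

import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.kafka.common.TopicPartition;

// Hypothetical event, sent by each reader via
// SourceReaderContext#sendSourceEventToCoordinator after it restores its state.
// The enumerator would handle it in handleSourceEvent(subtaskId, event) and
// rebuild its in-memory view of which reader owns which partitions; nothing
// extra is written to the checkpointed state.
public class ReportedSplitsEvent implements SourceEvent {

    private final Set<TopicPartition> assignedPartitions;

    public ReportedSplitsEvent(Set<TopicPartition> assignedPartitions) {
        this.assignedPartitions = assignedPartitions;
    }

    public Set<TopicPartition> getAssignedPartitions() {
        return assignedPartitions;
    }
}
```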

Best,
Hongshun

On Mon, Dec 30, 2024 at 12:42 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Hongshun
>
> There is currently a Jira tracking this issue [1].
>
> >Therefore, I want the enumerator to select a reader based on the current
> assignment situation. Each time, the enumerator should choose the reader
> whose assigned split number is the smallest.
>
> I am not sure I follow how you intend to track the "current split
> assignment". Do you intend to keep the current assignment in the enumerator
> state or are you only going to use per session assignment status and keep
> it in memory? If you intend to do the former, then how are you going to
> handle split redistribution upon parallelism change?
>
>
> 1- https://issues.apache.org/jira/browse/FLINK-31762
> Best Regards
> Ahmed Hamdy
>
>
> On Fri, 27 Dec 2024 at 06:38, Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi devs,
> >
> > I'd like to propose a new FLIP xxx: Kafka Enumerator assigns splits based on
> > the current assignment situation. Currently, when the Kafka enumerator
> > discovers a new split, it chooses a reader using the following algorithm:
> >
> >
> > ```java
> > static int getSplitOwner(TopicPartition tp, int numReaders) {
> >     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
> >     // here, the assumption is that the id of Kafka partitions are always
> >     // ascending starting from 0, and therefore can be used directly as the
> >     // offset clockwise from the start index
> >     return (startIndex + tp.partition()) % numReaders;
> > }
> > ```
> >
> >
> > However, I have encountered several uneven-distribution problems in
> > production:
> >
> >    - Problem 1: If I consume multiple topics from one source,
> >    getSplitOwner computes the owner of partitions separately for each
> >    topic, so assignments may overlap. For example, with 3 Kafka topics of
> >    16 partitions each, even if the Flink task parallelism is 48, only 16
> >    tasks may be assigned splits (3 partitions each) in the worst case
> >    (see the sketch after this list).
> >    - Problem 2: If I use a Flink source with parallelism 16 to consume
> >    one topic with 16 partitions and later change the parallelism to 32,
> >    the start index changes when a new partition is discovered, because
> >    the number of readers has changed.
> >
> > So far, the only workaround I have is to wait for the job to complete a
> > snapshot, then stop it and restart it with a different parallelism, so
> > that the state redistribution strategy reassigns the splits as evenly as
> > possible. However, this is too cumbersome. Why can't the distribution be
> > handled properly when the enumerator discovers and assigns a new split?
> >
> >
> > Therefore, I want the enumerator to select a reader based on the current
> > assignment situation: each time, the enumerator should choose the reader
> > with the smallest number of assigned splits (a rough sketch follows).
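> >
> > One possible shape (not a final design), where readerToSplitCount is the
> > enumerator's in-memory view of how many splits each reader currently owns:
> >
> > ```java
> > import java.util.Map;
> >
> > // Hypothetical helper: pick the reader with the fewest assigned splits.
> > static int getSplitOwner(Map<Integer, Integer> readerToSplitCount, int numReaders) {
> >     int owner = 0;
> >     int minSplits = Integer.MAX_VALUE;
> >     for (int reader = 0; reader < numReaders; reader++) {
> >         int assigned = readerToSplitCount.getOrDefault(reader, 0);
> >         if (assigned < minSplits) {
> >             minSplits = assigned;
> >             owner = reader;
> >         }
> >     }
> >     return owner;
> > }
> > ```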
> >
> >
> > WDYT? I'm looking forward to hearing from you.
> >
> >
> > Best,
> >
> > Hongshun
> >
>
