Hi Hongshun,
I personally see this as a common challenge for source connectors; it might
be a good FLIP candidate for the common flink-source interfaces.

I still see a couple of challenges that should be covered in the FLIP. For
example, the Kafka source currently doesn't wait for reader registration to
assign splits; it adds them to the reader in pending mode. To guarantee fair
distribution, this would probably mean we await all readers' registrations to
get the current assignment status before assigning any splits. This might be
okay, but it needs a PoC and more details (for example, do we only start
periodic discovery after all readers have registered, or do we hold unassigned
topics in state in non-pending mode, etc.).
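Just to make that first point concrete, below is a very rough sketch of what
"await all readers before assigning" could look like. Everything here is
illustrative: the class, field, and method names are made up and do not match
the actual KafkaSourceEnumerator or SplitEnumeratorContext APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: not the real KafkaSourceEnumerator API.
// Splits are represented as plain strings ("topic-partition") to keep it self-contained.
class AwaitAllReadersEnumerator {

    private final int expectedReaders;                      // e.g. context.currentParallelism()
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final List<String> pendingSplits = new ArrayList<>();

    AwaitAllReadersEnumerator(int expectedReaders) {
        this.expectedReaders = expectedReaders;
    }

    /** Called when a reader registers, possibly reporting the splits it already owns. */
    void onReaderRegistered(int subtaskId, List<String> reportedAssignment) {
        registeredReaders.add(subtaskId);
        // A real implementation would merge reportedAssignment into the enumerator's view here.
        if (registeredReaders.size() == expectedReaders) {
            assign(pendingSplits);                          // only now distribute buffered splits
            pendingSplits.clear();
        }
    }

    /** Called by periodic discovery; buffers splits until every reader is known. */
    void onSplitsDiscovered(List<String> discovered) {
        if (registeredReaders.size() < expectedReaders) {
            pendingSplits.addAll(discovered);
        } else {
            assign(discovered);
        }
    }

    private void assign(List<String> splits) {
        // Placeholder for the actual assignment logic (e.g. least-loaded reader).
    }
}
```

The open question for me is whether the buffered pending splits would also need
to go into enumerator state, so that a restart before all readers register
doesn't drop them.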

Regarding the distribution strategy interface: do you mean to expose the
algorithm choice to users, or do you intend to make it configurable by source
implementers only (if we are reusing it for other sources besides Kafka)? I
wouldn't vote to expose it to users, but I would love to reuse it across
different connectors if that's the case.
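If it stays connector-internal, I imagine the strategy could look roughly like
the sketch below. This is purely illustrative: the interface and method names
are invented, and only TopicPartition is a real Kafka class.

```java
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Invented interface, just to illustrate the shape of a pluggable assignment strategy
// that connector implementers (not end users) could configure.
interface SplitDistributionStrategy {

    /**
     * Picks the owning reader for a newly discovered partition, given the
     * enumerator's current view of what each reader already owns.
     */
    int selectReader(
            TopicPartition newSplit,
            Map<Integer, Set<TopicPartition>> currentAssignments,
            int numReaders);
}
```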

Finally, given that restoring from state already redistributes splits evenly,
did you consider a simpler approach, such as a plain round-robin assignment
over all topics/partitions instead of the hash-based topic assignment plus
round-robin over partitions?
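To make the comparison concrete, a global round-robin over all discovered
topic/partitions could be as simple as the sketch below (again just an
illustration, not a proposal for the actual implementation). It is only even
within a single assignment pass, though, so incremental discovery would still
need the enumerator to track the current assignment, which is exactly your
point 1.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Sketch of a global round-robin assignment over all topics/partitions,
// instead of the current per-topic hash start index + partition offset.
class RoundRobinAssigner {

    static Map<TopicPartition, Integer> assign(List<TopicPartition> allSplits, int numReaders) {
        Map<TopicPartition, Integer> owners = new HashMap<>();
        int next = 0;
        for (TopicPartition tp : allSplits) {
            owners.put(tp, next++ % numReaders);
        }
        return owners;
    }
}
```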

I am sorry if the discussion is getting more detailed than expected; I am
happy to move it to a FLIP if the community agrees as well.

Best Regards
Ahmed Hamdy


On Mon, 30 Dec 2024 at 03:12, Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Ahmed,
> I want to do two things in this FLIP:
> 1. Add current assignments to ReaderRegistrationEvent, so the enumerator can
> know the overall assignments after restarting the job.
> 2. Add a distribution strategy interface to the Kafka enumerator, so it can
> flexibly choose how to assign splits based on the current circumstances.
>
>
> Best,
> Hongshun
>
> On Mon, Dec 30, 2024 at 10:03 AM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Ahmed,
> >
> > Thanks for your question.
> > > Do you intend to keep the current assignment in the enumerator
> > > state or are you only going to use per session assignment status and keep
> > > it in memory?
> >
> > Not in state, because it would differ from the real distribution when the
> > state is redistributed after a parallelism change. When the job restarts,
> > each task sends an event to tell the enumerator its assignments. That is
> > also why I want to add a FLIP rather than a Jira.
> >
> > Best,
> > Hongshun
> >
> > On Mon, Dec 30, 2024 at 12:42 AM Ahmed Hamdy <hamdy10...@gmail.com>
> wrote:
> >
> >> Hi Hongshun
> >>
> >> There is currently a Jira tracking this issue [1].
> >>
> >> > Therefore, I want the enumerator to select a reader based on the current
> >> > assignment situation. Each time, the enumerator should choose the reader
> >> > with the fewest assigned splits.
> >>
> >> I am not sure I follow how you intend to track the "current split
> >> assignment". Do you intend to keep the current assignment in the
> >> enumerator state, or are you only going to use per-session assignment
> >> status and keep it in memory? If you intend to do the former, then how
> >> are you going to handle split redistribution upon a parallelism change?
> >>
> >>
> >> 1- https://issues.apache.org/jira/browse/FLINK-31762
> >> Best Regards
> >> Ahmed Hamdy
> >>
> >>
> >> On Fri, 27 Dec 2024 at 06:38, Hongshun Wang <loserwang1...@gmail.com>
> >> wrote:
> >>
> >> > Hi devs,
> >> >
> >> > I'd like to propose a new FLIP xxx: Kafka Enumerator assigns splits
> >> > based on the current assignment situation. Currently, when the Kafka
> >> > enumerator finds a new split, it chooses a reader using the following
> >> > algorithm:
> >> >
> >> >
> >> > ```java
> >> >
> >> > static int getSplitOwner(TopicPartition tp, int numReaders) {
> >> >     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
> >> >
> >> >     // here, the assumption is that the id of Kafka partitions are always
> >> >     // ascending starting from 0, and therefore can be used directly as
> >> >     // the offset clockwise from the start index
> >> >     return (startIndex + tp.partition()) % numReaders;
> >> > }
> >> >
> >> > ```
> >> >
> >> >
> >> > However, I have encountered several unevenly distributed problems in
> >> > production:
> >> >
> >> >
> >> >    - Problem 1: If I consume multiple topics from a source, getSplitOwner
> >> >    counts the owner of partitions separately for each topic. This means
> >> >    that assignments may overlap. For example, if there are 3 Kafka topics
> >> >    with 16 partitions each, even if the Flink task's parallelism is 48,
> >> >    only 16 tasks may be assigned 3 partitions each in the worst-case
> >> >    scenario.
> >> >    - Problem 2: If I use a Flink source with parallelism 16 to consume one
> >> >    topic with 16 partitions, then when I change the Flink parallelism to
> >> >    32, the start index changes when a new partition is discovered because
> >> >    the number of readers has changed.
> >> >
> >> > Each time, I only have one method to solve it: wait for the job to
> >> > complete a snapshot state, then stop and restart the job with a different
> >> > parallelism. This way, the state redistribution strategy will assign the
> >> > splits as evenly as possible. However, this is too cumbersome. Why can't
> >> > it be done properly when the enumerator discovers and assigns a new
> >> > split?
> >> >
> >> >
> >> > Therefore, I want the enumerator to select a reader based on the current
> >> > assignment situation. Each time, the enumerator should choose the reader
> >> > with the fewest assigned splits.
> >> >
> >> >
> >> > WDYT? I'd be happy to hear your thoughts.
> >> >
> >> >
> >> > Best,
> >> >
> >> > Hongshun
> >> >
> >>
> >
>
