Hi Ahmed,
Thanks for the additional information about the assignment strategy
problems that I had overlooked. Initially, I thought that Kafka alone was
not sufficient for a FLIP, but now that you've broadened the scope, I am
excited to work on it for the common Flink source interfaces.

I will consider more details and then start a FLIP document. Let's discuss
it further later.

Best,
Hongshun

On Mon, Dec 30, 2024 at 10:16 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Hongshun,
> I personally see this as a common challenge for source connectors; it might
> be a good FLIP candidate for the common flink-source interfaces.
>
> I still see a couple of challenges that should be covered in the FLIP. For
> example, the Kafka source currently doesn't wait for reader registration to
> assign splits; it adds them to the reader in pending mode. To guarantee a
> fair distribution, we would probably have to await all readers'
> registrations and learn the current assignment status before assigning any
> splits. This might be okay, but it needs a PoC and more details (for
> example, do we only start periodic discovery after the readers'
> registration, or do we hold unassigned topics in state in non-pending
> mode, etc.). A rough sketch of the waiting idea follows.
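>
> (For illustration only, a minimal, hypothetical sketch of "await all
> readers before assigning"; the class and method names are made up and are
> not part of the current connector code.)
>
> ```java
> import java.util.ArrayList;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Set;
>
> // Hypothetical sketch: keep discovered splits pending until every expected
> // reader has registered, so the enumerator sees the full picture before it
> // distributes anything.
> class DeferredAssigner {
>     private final int expectedReaders;
>     private final Set<Integer> registeredReaders = new HashSet<>();
>     private final List<String> pendingSplits = new ArrayList<>();
>
>     DeferredAssigner(int expectedReaders) {
>         this.expectedReaders = expectedReaders;
>     }
>
>     void onSplitsDiscovered(List<String> splitIds) {
>         pendingSplits.addAll(splitIds);
>         maybeAssign();
>     }
>
>     void onReaderRegistered(int subtaskId) {
>         registeredReaders.add(subtaskId);
>         maybeAssign();
>     }
>
>     private void maybeAssign() {
>         // Do nothing until all readers are known; splits stay pending.
>         if (registeredReaders.size() < expectedReaders || pendingSplits.isEmpty()) {
>             return;
>         }
>         // Hand pendingSplits to whatever distribution strategy we agree on.
>         pendingSplits.clear();
>     }
> }
> ```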
>
> Regarding the distribution strategy interface, do you mean to expose the
> algorithm choice to users, or do you intend to make it configurable by
> source implementers only (if we are reusing it for other sources besides
> Kafka)? I wouldn't vote to expose it to users, but I would love to reuse
> it for different connectors if that's the case.
>
> Finally, given that restoring from state already redistributes splits
> evenly, did you consider a simpler approach, like a plain round-robin
> assignment over all topic partitions instead of hash-based topic
> assignment plus round-robin partition assignment?
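>
> (This is only an illustration of the idea; the signature is made up, and
> the running counter would have to survive enumerator restarts, which is
> exactly the state question above.)
>
> ```java
> // Hypothetical alternative to the current hash-based getSplitOwner: a plain
> // round-robin over all discovered splits, regardless of which topic they
> // belong to. nextSplitIndex is a running counter of discovered splits.
> static int roundRobinOwner(int nextSplitIndex, int numReaders) {
>     return nextSplitIndex % numReaders;
> }
> ```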
>
> I am sorry if the discussion is getting too detailed; I'm happy to move it
> to a FLIP if the community agrees as well.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Mon, 30 Dec 2024 at 03:12, Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Ahmed,
> > I want to do two things in this FLIP.
> > 1. Add the current assignments to ReaderRegistrationEvent, so the
> > enumerator can learn the overall assignments after the job restarts.
> > 2. Add a distribution strategy interface to the Kafka enumerator, so it
> > can flexibly choose how to assign each split based on the current
> > circumstances. A rough sketch of such an interface is below.
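> >
> > (Only a rough sketch to make point 2 concrete; all names are placeholders
> > and open for discussion, not an existing Flink or Kafka connector API.)
> >
> > ```java
> > import java.util.Map;
> > import org.apache.kafka.common.TopicPartition;
> >
> > /** Hypothetical strategy hook the enumerator would call for every new split. */
> > public interface SplitDistributionStrategy {
> >
> >     /**
> >      * Chooses the owning reader for a newly discovered split, given the
> >      * enumerator's current in-memory view of how many splits each reader
> >      * already owns.
> >      */
> >     int selectReader(
> >             TopicPartition newSplit,
> >             Map<Integer, Integer> readerToSplitCount,
> >             int numReaders);
> > }
> > ```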
> >
> >
> > Best,
> > Hongshun
> >
> > On Mon, Dec 30, 2024 at 10:03 AM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > Hi Ahmed,
> > >
> > > Thanks for your question.
> > > > Do you intend to keep the current assignment in the enumerator
> > > > state or are you only going to use per session assignment status
> > > > and keep it in memory?
> > >
> > > Not in state, because it would differ from the real distribution once
> > > the state is redistributed after a parallelism change. When the job
> > > restarts, each task sends an event to tell the enumerator its current
> > > assignments, and the enumerator rebuilds its view in memory. That is
> > > also why I want to write a FLIP rather than just a Jira ticket. A rough
> > > sketch is below.
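> > >
> > > (Hypothetical sketch only; the class names are placeholders and it does
> > > not implement the real Flink source interfaces. It just shows the
> > > in-memory bookkeeping I have in mind.)
> > >
> > > ```java
> > > import java.util.HashMap;
> > > import java.util.List;
> > > import java.util.Map;
> > >
> > > // The reader reports the splits it currently owns when it (re)registers.
> > > class ReaderAssignmentReport {
> > >     final int subtaskId;
> > >     final List<String> currentSplitIds;
> > >
> > >     ReaderAssignmentReport(int subtaskId, List<String> currentSplitIds) {
> > >         this.subtaskId = subtaskId;
> > >         this.currentSplitIds = currentSplitIds;
> > >     }
> > > }
> > >
> > > // The enumerator keeps a map rebuilt purely from these reports, so
> > > // nothing about assignments has to live in enumerator state.
> > > class AssignmentTracker {
> > >     private final Map<Integer, List<String>> readerToSplits = new HashMap<>();
> > >
> > >     void onReaderRegistered(ReaderAssignmentReport report) {
> > >         readerToSplits.put(report.subtaskId, report.currentSplitIds);
> > >     }
> > >
> > >     int splitCountOf(int subtaskId) {
> > >         return readerToSplits.getOrDefault(subtaskId, List.of()).size();
> > >     }
> > > }
> > > ```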
> > >
> > > Best,
> > > Hongshun
> > >
> > > On Mon, Dec 30, 2024 at 12:42 AM Ahmed Hamdy <hamdy10...@gmail.com>
> > wrote:
> > >
> > >> Hi Hongshun
> > >>
> > >> There is currently a Jira tracking this issue [1].
> > >>
> > >> > Therefore, I want the enumerator to select a reader based on the
> > >> > current assignment situation. Each time, the enumerator should
> > >> > choose the reader whose assigned split number is the smallest.
> > >>
> > >> I am not sure I follow how you intend to track the "current split
> > >> assignment". Do you intend to keep the current assignment in the
> > >> enumerator state or are you only going to use per session assignment
> > >> status and keep it in memory? If you intend to do the former, then how
> > >> are you going to handle split redistribution upon parallelism change?
> > >>
> > >>
> > >> 1- https://issues.apache.org/jira/browse/FLINK-31762
> > >> Best Regards
> > >> Ahmed Hamdy
> > >>
> > >>
> > >> On Fri, 27 Dec 2024 at 06:38, Hongshun Wang <loserwang1...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi devs,
> > >> >
> > >> > I'd like to push a new FLIP xxx: Kafka Enumerator assigns splits
> > >> > based on the current assignment situation. Currently, when the Kafka
> > >> > enumerator finds a new split, it will choose a reader using the
> > >> > following algorithm:
> > >> >
> > >> >
> > >> > ```java
> > >> > static int getSplitOwner(TopicPartition tp, int numReaders) {
> > >> >     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
> > >> >
> > >> >     // here, the assumption is that the id of Kafka partitions are always
> > >> >     // ascending starting from 0, and therefore can be used directly as the
> > >> >     // offset clockwise from the start index
> > >> >     return (startIndex + tp.partition()) % numReaders;
> > >> > }
> > >> > ```
> > >> >
> > >> >
> > >> > However, I have encountered several uneven-distribution problems in
> > >> > production (a small standalone illustration follows the list):
> > >> >
> > >> >    - Problem 1: If I consume multiple topics from one source,
> > >> >    getSplitOwner computes the owner of partitions separately for each
> > >> >    topic. This means that there may be overlapping assignments. For
> > >> >    example, if there are 3 Kafka topics with 16 partitions each, even
> > >> >    if the Flink task's parallelism is 48, only 16 tasks may be
> > >> >    assigned 3 partitions each in the worst-case scenario.
> > >> >    - Problem 2: If I use a Flink source with parallelism 16 to consume
> > >> >    one topic with 16 partitions and later change the Flink parallelism
> > >> >    to 32, the start index changes when a new partition is discovered,
> > >> >    because the number of readers has changed.
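> > >> >
> > >> > (The snippet below is only a hypothetical, self-contained demo that
> > >> > reuses the formula above with made-up topic names; the actual skew
> > >> > depends on the topics' hash codes, and in the worst case all three
> > >> > start indices coincide so only 16 of the 48 readers get any work.)
> > >> >
> > >> > ```java
> > >> > import java.util.Map;
> > >> > import java.util.TreeMap;
> > >> >
> > >> > public class SkewDemo {
> > >> >     // Same formula as getSplitOwner above, taking the topic name directly.
> > >> >     static int getSplitOwner(String topic, int partition, int numReaders) {
> > >> >         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
> > >> >         return (startIndex + partition) % numReaders;
> > >> >     }
> > >> >
> > >> >     public static void main(String[] args) {
> > >> >         int numReaders = 48;
> > >> >         Map<Integer, Integer> splitsPerReader = new TreeMap<>();
> > >> >         for (String topic : new String[] {"topic-a", "topic-b", "topic-c"}) {
> > >> >             for (int partition = 0; partition < 16; partition++) {
> > >> >                 splitsPerReader.merge(
> > >> >                         getSplitOwner(topic, partition, numReaders), 1, Integer::sum);
> > >> >             }
> > >> >         }
> > >> >         // Readers outside the three 16-reader windows receive nothing at all.
> > >> >         System.out.println("Readers with at least one split: " + splitsPerReader.size());
> > >> >         System.out.println("Splits per reader: " + splitsPerReader);
> > >> >     }
> > >> > }
> > >> > ```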
> > >> >
> > >> >
> > >> > Each time, I only have one way to solve it: wait for the job to
> > >> > complete a snapshot, then stop and restart the job with a different
> > >> > parallelism. This way, the state redistribution strategy assigns the
> > >> > splits as evenly as possible. However, this is too cumbersome; why
> > >> > can't it be done properly when the enumerator discovers and assigns a
> > >> > new split?
> > >> >
> > >> >
> > >> > Therefore, I want the enumerator to select a reader based on the
> > >> > current assignment situation. Each time, the enumerator should choose
> > >> > the reader whose assigned split number is the smallest.
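> > >> >
> > >> > (A minimal sketch of that selection rule, assuming the enumerator
> > >> > keeps an in-memory map of how many splits each registered reader
> > >> > owns; the method name and signature are made up for illustration.)
> > >> >
> > >> > ```java
> > >> > import java.util.Map;
> > >> >
> > >> > // Pick the registered reader that currently owns the fewest splits.
> > >> > static int leastLoadedReader(Map<Integer, Integer> readerToSplitCount) {
> > >> >     return readerToSplitCount.entrySet().stream()
> > >> >             .min(Map.Entry.comparingByValue())
> > >> >             .map(Map.Entry::getKey)
> > >> >             .orElseThrow(() -> new IllegalStateException("No readers registered"));
> > >> > }
> > >> > ```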
> > >> >
> > >> >
> > >> > WDYT? I'm looking forward to hearing from you.
> > >> >
> > >> >
> > >> > Best,
> > >> >
> > >> > Hongshun
> > >> >
> > >>
> > >
> >
>
