Hi Ahmed,

Thanks for the additional information about the current assignment strategy
problem, which I had overlooked. Initially, I thought that Kafka alone was
not sufficient grounds for a FLIP, but now that you've broadened the scope,
I am excited to work on it for the common Flink source interfaces.
I will consider more details and then start a FLIP document. Let's discuss
it further later.

Best,
Hongshun

On Mon, Dec 30, 2024 at 10:16 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Hongshun,
> I personally see this as a common challenge for source connectors; it
> might be a good FLIP candidate for the common flink-source interfaces.
>
> I still see a couple of challenges that should be covered in the FLIP.
> For example, the Kafka source currently doesn't wait for reader
> registration to assign splits; instead it adds them to the reader in
> pending mode. To guarantee fair distribution, this would probably mean we
> await all readers' registration to get the current assignment status
> before assigning any splits. This might be okay, but it needs a PoC and
> more details (for example, do we only start periodic discovery after
> reader registration, or do we hold unassigned topics in state in
> non-pending mode, etc.).
>
> Regarding the distribution strategy interface: does this mean you want to
> expose the algorithm choice to users, or do you intend to make it
> configurable by source implementers only (if we are reusing it for other
> sources besides Kafka)? I wouldn't vote to expose it to users, but I
> would love to reuse it for different connectors if that's the case.
>
> Finally, given that restoring from state already redistributes splits
> evenly, did you consider a simpler approach, like just a round-robin
> assignment for all topic/partitions instead of hash-based topic
> assignment + round-robin partition?
>
> I am sorry if the discussion is getting more detailed; happy to move it
> to a FLIP if the community agrees as well.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Mon, 30 Dec 2024 at 03:12, Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Ahmed,
> > I want to do two things in this FLIP:
> > 1. Add the current assignments to ReaderRegistrationEvent, so that the
> > enumerator knows the overall assignments after the job restarts.
> > 2. Add a distribution strategy interface to the Kafka enumerator, so it
> > can flexibly choose how to assign a split based on the current
> > circumstances.
> >
> > Best,
> > Hongshun
> >
> > On Mon, Dec 30, 2024 at 10:03 AM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > Hi Ahmed,
> > >
> > > Thanks for your question.
> > >
> > > > Do you intend to keep the current assignment in the enumerator
> > > > state or are you only going to use per session assignment status
> > > > and keep it in memory?
> > >
> > > Not in state, because it would differ from the real distribution once
> > > the state is redistributed after a parallelism change. When the job
> > > restarts, each task sends an event to tell the enumerator its
> > > assignments. That is also why I want to write a FLIP rather than just
> > > a Jira.
> > >
> > > Best,
> > > Hongshun
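To make the two proposed changes concrete, here is a minimal illustrative
sketch of a registration event that carries the reader's current assignment,
plus a pluggable distribution strategy for the enumerator. The names and
shapes below are hypothetical, not the existing Flink or
flink-connector-kafka API.

```java
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

/**
 * Hypothetical sketch (not the actual Flink API): a registration event that
 * also reports the splits the reader already owns, so the enumerator can
 * rebuild the overall assignment picture in memory after a restart.
 */
final class ReaderRegistrationEventSketch {
    final int subtaskId;
    final Set<TopicPartition> currentAssignment;

    ReaderRegistrationEventSketch(int subtaskId, Set<TopicPartition> currentAssignment) {
        this.subtaskId = subtaskId;
        this.currentAssignment = currentAssignment;
    }
}

/**
 * Hypothetical strategy interface the enumerator could consult whenever a
 * newly discovered split needs an owner.
 */
interface SplitDistributionStrategy {
    int selectReader(TopicPartition newSplit,
                     Map<Integer, Set<TopicPartition>> currentAssignments,
                     int numReaders);
}
```

With assignments reported at registration time, the enumerator can rebuild
the global picture in memory without persisting it in state, which is the
approach described above.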
> > > On Mon, Dec 30, 2024 at 12:42 AM Ahmed Hamdy <hamdy10...@gmail.com>
> > > wrote:
> > >
> > > > Hi Hongshun,
> > > >
> > > > There is currently a Jira tracking this issue [1].
> > > >
> > > > > Therefore, I want the enumerator to select a reader based on the
> > > > > current assignment situation. Each time, the enumerator should
> > > > > choose the reader whose assigned split number is the smallest.
> > > >
> > > > I am not sure I follow how you intend to track the "current split
> > > > assignment". Do you intend to keep the current assignment in the
> > > > enumerator state, or are you only going to use per-session
> > > > assignment status and keep it in memory? If you intend to do the
> > > > former, then how are you going to handle split redistribution upon
> > > > parallelism change?
> > > >
> > > > 1- https://issues.apache.org/jira/browse/FLINK-31762
> > > >
> > > > Best Regards
> > > > Ahmed Hamdy
> > > >
> > > > On Fri, 27 Dec 2024 at 06:38, Hongshun Wang <loserwang1...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to propose a new FLIP xxx: Kafka Enumerator assigns
> > > > > splits based on the current assignment situation. Currently, when
> > > > > the Kafka enumerator finds a new split, it chooses a reader using
> > > > > the following algorithm:
> > > > >
> > > > > ```java
> > > > > static int getSplitOwner(TopicPartition tp, int numReaders) {
> > > > >     int startIndex =
> > > > >             ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
> > > > >
> > > > >     // Here, the assumption is that the ids of Kafka partitions
> > > > >     // are always ascending starting from 0, and can therefore be
> > > > >     // used directly as the offset clockwise from the start index.
> > > > >     return (startIndex + tp.partition()) % numReaders;
> > > > > }
> > > > > ```
> > > > >
> > > > > However, I have encountered several uneven-distribution problems
> > > > > in production:
> > > > >
> > > > > - Problem 1: If I consume multiple topics from one source,
> > > > >   getSplitOwner computes the owner of partitions separately for
> > > > >   each topic, so the assignments may overlap. For example, with 3
> > > > >   Kafka topics of 16 partitions each, even if the Flink task
> > > > >   parallelism is 48, in the worst case only 16 tasks are assigned
> > > > >   splits, with 3 partitions each.
> > > > > - Problem 2: If I use a Flink source with parallelism 16 to
> > > > >   consume one topic with 16 partitions and then change the
> > > > >   parallelism to 32, the start index changes once a new partition
> > > > >   is discovered, because the number of readers has changed.
> > > > >
> > > > > Each time, I have only one way to fix it: wait for the job to
> > > > > complete a snapshot, then stop and restart the job with a
> > > > > different parallelism, so that the state redistribution strategy
> > > > > assigns the splits as evenly as possible. However, this is too
> > > > > cumbersome; why can't it be done properly when the enumerator
> > > > > discovers and assigns a new split?
> > > > >
> > > > > Therefore, I want the enumerator to select a reader based on the
> > > > > current assignment situation. Each time, the enumerator should
> > > > > choose the reader whose assigned split number is the smallest.
> > > > >
> > > > > WDYT? I'm willing to hear from you.
> > > > >
> > > > > Best,
> > > > > Hongshun
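As a rough illustration of the least-loaded selection proposed in the quoted
message, here is a minimal sketch, assuming the enumerator keeps the
per-reader assignments it learned from registration events in memory; the
class and method names are hypothetical, not existing connector API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

/**
 * Illustrative sketch only: picks the reader that currently owns the fewest
 * splits, instead of deriving the owner from a topic hash and partition id.
 */
final class LeastLoadedSelection {

    static int getSplitOwner(Map<Integer, Set<TopicPartition>> currentAssignments,
                             int numReaders) {
        int owner = 0;
        int minLoad = Integer.MAX_VALUE;
        for (int reader = 0; reader < numReaders; reader++) {
            int load = currentAssignments
                    .getOrDefault(reader, Collections.emptySet())
                    .size();
            // Break ties toward the lowest subtask id so the choice is
            // deterministic across runs.
            if (load < minLoad) {
                minLoad = load;
                owner = reader;
            }
        }
        return owner;
    }
}
```

Because the owner depends only on the live assignment counts, the choice is
unaffected by topic hash codes (problem 1) and by changes in the number of
readers (problem 2).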