Hi Ahmed,

I want to do two things in this FLIP:

1. Add the current assignments to ReaderRegistrationEvent, so that the
   enumerator can learn the overall assignment again after the job restarts.
2. Add a split distribution strategy interface to the Kafka enumerator, so
   that it can flexibly choose how to assign a split based on the current
   situation. A rough sketch is below.
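To make the second point concrete, here is a minimal sketch of the kind of interface I have in mind. All names below (SplitDistributionStrategy, addAssignedSplits, LeastAssignedStrategy) are placeholders for illustration, not a proposed final API; the implementation just encodes the "pick the reader with the fewest splits" idea from the earlier mail.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Sketch only: names and signatures are placeholders, not the final API.
// The enumerator would delegate the "which reader owns this split" decision
// to a pluggable strategy that is aware of the current distribution.
public interface SplitDistributionStrategy {

    // Called when a reader (re)registers and reports the splits it already owns,
    // e.g. from the assignments carried by ReaderRegistrationEvent after a restart.
    void addAssignedSplits(int subtaskId, Collection<TopicPartition> alreadyAssigned);

    // Choose the owner of a newly discovered split based on the current distribution.
    int selectReader(TopicPartition newSplit, int numReaders);
}

// One possible implementation: always pick the reader with the fewest splits.
class LeastAssignedStrategy implements SplitDistributionStrategy {

    private final Map<Integer, Integer> splitCounts = new HashMap<>();

    @Override
    public void addAssignedSplits(int subtaskId, Collection<TopicPartition> alreadyAssigned) {
        splitCounts.merge(subtaskId, alreadyAssigned.size(), Integer::sum);
    }

    @Override
    public int selectReader(TopicPartition newSplit, int numReaders) {
        int owner = 0;
        int minCount = Integer.MAX_VALUE;
        for (int reader = 0; reader < numReaders; reader++) {
            int count = splitCounts.getOrDefault(reader, 0);
            if (count < minCount) {
                minCount = count;
                owner = reader;
            }
        }
        splitCounts.merge(owner, 1, Integer::sum);
        return owner;
    }
}
```

Because the readers would report their current assignments back through ReaderRegistrationEvent, the strategy would always work on the real distribution instead of recomputing it from a static hash formula, which is what breaks down today when the parallelism changes.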
Best,
Hongshun

On Mon, Dec 30, 2024 at 10:03 AM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Ahmed,
>
> Thanks for your question.
>
> > Do you intend to keep the current assignment in the enumerator state or
> > are you only going to use per session assignment status and keep it in
> > memory?
>
> Not in state, because it would differ from the real distribution once the
> state has been redistributed after a parallelism change. When the job
> restarts, each task sends an event to tell the enumerator its assignments.
> That is also why I want to open a FLIP rather than just a Jira.
>
> Best,
> Hongshun
>
> On Mon, Dec 30, 2024 at 12:42 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
>> Hi Hongshun,
>>
>> There is currently a Jira tracking this issue [1].
>>
>> > Therefore, I want the enumerator to select a reader based on the current
>> > assignment situation. Each time, the enumerator should choose the reader
>> > whose assigned split number is the smallest.
>>
>> I am not sure I follow how you intend to track the "current split
>> assignment". Do you intend to keep the current assignment in the
>> enumerator state, or are you only going to use per-session assignment
>> status and keep it in memory? If you intend to do the former, how are you
>> going to handle split redistribution upon a parallelism change?
>>
>> 1- https://issues.apache.org/jira/browse/FLINK-31762
>> Best Regards
>> Ahmed Hamdy
>>
>> On Fri, 27 Dec 2024 at 06:38, Hongshun Wang <loserwang1...@gmail.com> wrote:
>>
>> > Hi devs,
>> >
>> > I'd like to propose a new FLIP xxx: Kafka enumerator assigns splits
>> > based on the current assignment situation. Currently, when the Kafka
>> > enumerator finds a new split, it chooses a reader using the following
>> > algorithm:
>> >
>> > ```java
>> > static int getSplitOwner(TopicPartition tp, int numReaders) {
>> >     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>> >
>> >     // here, the assumption is that the id of Kafka partitions are always ascending
>> >     // starting from 0, and therefore can be used directly as the offset clockwise from the
>> >     // start index
>> >     return (startIndex + tp.partition()) % numReaders;
>> > }
>> > ```
>> >
>> > However, I have encountered several uneven-distribution problems in
>> > production:
>> >
>> > - Problem 1: If I consume multiple topics with one source, getSplitOwner
>> >   computes the owner of partitions separately for each topic, so the
>> >   assignments of different topics may overlap. For example, with 3 Kafka
>> >   topics of 16 partitions each, even if the Flink job's parallelism is
>> >   48, in the worst case only 16 tasks are assigned splits, 3 partitions
>> >   each.
>> > - Problem 2: If I use a Flink source with parallelism 16 to consume one
>> >   topic with 16 partitions and later change the parallelism to 32, the
>> >   start index changes when a new partition is discovered because the
>> >   number of readers has changed.
>> >
>> > Each time, the only way I can fix this is to wait for the job to
>> > complete a snapshot of its state, then stop and restart the job with a
>> > different parallelism, so that the state redistribution strategy assigns
>> > the splits as evenly as possible. However, this is too cumbersome. Why
>> > can't it be done properly when the enumerator discovers and assigns a
>> > new split?
>> >
>> > Therefore, I want the enumerator to select a reader based on the current
>> > assignment situation. Each time, the enumerator should choose the reader
>> > whose assigned split number is the smallest.
>> >
>> > WDYT? I'm willing to hear from you.
>> >
>> > Best,
>> >
>> > Hongshun
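As an appendix outside the quoted thread: the skew described above is easy to reproduce by replaying the current getSplitOwner formula for a concrete setup. The topic names and numbers below are made up for illustration; the exact skew depends on the topics' hashCode values.

```java
import java.util.HashMap;
import java.util.Map;

// Tiny driver that replays the current assignment formula so the skew from
// problems 1 and 2 can be observed for a concrete (hypothetical) setup.
public class AssignmentSkewDemo {

    // Same formula as the getSplitOwner method quoted above.
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        // Hypothetical setup: 3 topics x 16 partitions, parallelism 48.
        String[] topics = {"topic-a", "topic-b", "topic-c"};
        int partitionsPerTopic = 16;
        int parallelism = 48;

        Map<Integer, Integer> counts = new HashMap<>();
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                counts.merge(getSplitOwner(topic, p, parallelism), 1, Integer::sum);
            }
        }

        // Each topic fills 16 consecutive readers starting from its own hash-based
        // start index, so the three ranges can overlap: some readers end up with
        // 2 or 3 splits while others get none, even though 48 splits could be
        // spread 1 per reader.
        System.out.println("readers with at least one split: " + counts.size());
        System.out.println("per-reader split counts: " + counts);
    }
}
```

Changing the parallelism in this driver also illustrates problem 2: the hash-based start index, and therefore the whole mapping, moves when the number of readers changes.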