Hi Hongshun,

Thanks for the update. The FLIP looks good to me!
Best,
Shammon FY

On Tue, Mar 28, 2023 at 11:16 AM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi Shammon,
>
> Thanks a lot for your advice. I agree with your opinion now. I had
> overlooked that the start position may be at a certain point in the
> future.
>
> I will modify OffsetsInitializer to provide a different strategy for
> later discovered partitions, so that users can also customize the
> strategies for new and old partitions.
>
> WDYT?
>
> Yours,
> Hongshun
>
> On Tue, Mar 28, 2023 at 9:00 AM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi Hongshun,
> >
> > Thanks for your answer.
> >
> > I think the startup offset of Kafka, such as timestamp or
> > specific_offset, has no relationship with the `Window Operator`.
> > Users can freely set the starting position according to their needs:
> > it may be before the latest Kafka data, or it may be at a certain
> > point in the future.
> >
> > The offsets set by users in Kafka can be divided into four types at
> > the moment: EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET. Newly
> > discovered partitions may need to be handled with different
> > strategies for these four types:
> >
> > 1. EARLIEST, use EARLIEST for the newly discovered partitions
> > 2. LATEST, use EARLIEST for the newly discovered partitions
> > 3. TIMESTAMP, use TIMESTAMP for the newly discovered partitions
> > 4. SPECIFIC_OFFSET, use SPECIFIC_OFFSET for the newly discovered
> >    partitions
> >
> > From the above, it seems that we only need to do special processing
> > for LATEST. What do you think?
> >
> > Best,
> > Shammon FY
> >
> > On Fri, Mar 24, 2023 at 11:23 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >
> > > "If all new messages in old partitions should be consumed, all new
> > > messages in new partitions should also be consumed."
> > >
> > > Sorry, I wrote the last sentence incorrectly.
> > >
> > > On Fri, Mar 24, 2023 at 11:15 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for your advice! I learned a lot about
> > > > TIMESTAMP/SPECIFIC_OFFSET. That's interesting.
> > > >
> > > > However, I have a different opinion.
> > > >
> > > > If a user employs the SPECIFIC_OFFSET strategy and enables
> > > > auto-discovery, they will be able to find new partitions beyond
> > > > the specified offset. Otherwise, enabling auto-discovery makes no
> > > > sense.
> > > >
> > > > When it comes to the TIMESTAMP strategy, it seems to be trivial. I
> > > > understand your concern; however, that is the role of the time
> > > > window rather than of partition discovery. The TIMESTAMP strategy
> > > > means that the consumer starts from the first record whose
> > > > timestamp is greater than or equal to a given timestamp, rather
> > > > than only consuming records whose timestamp is greater than or
> > > > equal to the given timestamp. *Thus, even if auto discovery is
> > > > disabled, or new partitions are discovered with the TIMESTAMP
> > > > strategy, the same problems still occur.*
> > > >
> > > > Above all, why use the EARLIEST strategy? I believe that the
> > > > strategy specified at startup should be the strategy at the moment
> > > > of startup. *So there is no difference between new partitions and
> > > > new messages in old partitions.* Therefore, all the new partition
> > > > issues that you care about will still appear even if you disable
> > > > partition discovery, as new messages in old partitions. If all new
> > > > messages in old partitions should be consumed, all new messages in
> > > > old partitions should also be consumed.
> > > >
> > > > Best,
> > > > Hongshun
> > > >
> > > > On Thu, Mar 23, 2023 at 8:34 PM Shammon FY <zjur...@gmail.com> wrote:
> > > >
> > > >> Hi Hongshun
> > > >>
> > > >> Thanks for driving this discussion.
> > > >> Automatically discovering partitions without losing data sounds
> > > >> great!
> > > >>
> > > >> Currently Flink supports the Kafka source with different startup
> > > >> modes, such as EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and
> > > >> GROUP_OFFSET.
> > > >>
> > > >> If I understand correctly, you will set the offset of new
> > > >> partitions with EARLIEST? Please correct me if I'm wrong, but I
> > > >> think the EARLIEST startup mode for new partitions is not suitable
> > > >> if users set TIMESTAMP/SPECIFIC_OFFSET for Kafka in their jobs.
> > > >>
> > > >> For an extreme example, the current time is 2023-03-23 15:00:00
> > > >> and users set the TIMESTAMP to 2023-03-23 16:00:00 for their jobs.
> > > >> If a partition is added during this period, jobs will generate
> > > >> “surprising” data. What do you think?
> > > >>
> > > >> Best,
> > > >> Shammon FY
> > > >>
> > > >> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > >>
> > > >> > Hi, Hang,
> > > >> >
> > > >> > Thanks for your advice.
> > > >> >
> > > >> > When will the second case occur? Currently, there are three ways
> > > >> > to specify partitions in Kafka: by topic, by partition, and by
> > > >> > matching the topic with a regular expression. Currently, if the
> > > >> > initial partition number is 0, an error will occur for the first
> > > >> > two methods. However, when using a regular expression to match
> > > >> > topics, it is allowed to have 0 matched topics.
> > > >> >
> > > >> > > I don't know when the second case will occur
> > > >> >
> > > >> > Why prefer the field `firstDiscoveryDone`? When a regular
> > > >> > expression initially matches 0 topics, it should consume all
> > > >> > messages of the new topic.
> > > >> > If unassignedInitialPartitons and unassignedTopLevelPartitions
> > > >> > are used instead of firstDiscoveryDone, any new topics created
> > > >> > during (5 minutes discovery + job restart time) will be treated
> > > >> > as the first discovery, causing data loss.
> > > >> >
> > > >> > > Then when will we get the empty partition list? I think it
> > > >> > > should be treated as the first initial discovery if both
> > > >> > > `unassignedInitialPartitons` and `assignedPartitons` are empty
> > > >> > > without `firstDiscoveryDone`.
> > > >> >
> > > >> > Best
> > > >> > Hongshun
> > > >> >
> > > >> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi, Hongshun,
> > > >> > >
> > > >> > > Thank you for starting this discussion. I have some questions
> > > >> > > about the field `firstDiscoveryDone`.
> > > >> > >
> > > >> > > The FLIP explains why firstDiscoveryDone is needed as follows.
> > > >> > > > Why do we need firstDiscoveryDone? Only relying on the
> > > >> > > > unAssignedInitialPartitons attribute cannot distinguish
> > > >> > > > between the following two cases (which often occur in
> > > >> > > > pattern mode):
> > > >> > > > The first partition discovery is so slow, before which the
> > > >> > > > checkpoint is executed and then job is restarted. At this
> > > >> > > > time, the restored unAssignedInitialPartitons is an empty
> > > >> > > > set, which means non-discovery. The next discovery will be
> > > >> > > > treated as first discovery.
> > > >> > > > The first time the partition is discovered is empty, and new
> > > >> > > > partitions can only be found after multiple partition
> > > >> > > > discoveries. If a restart occurs between this period, the
> > > >> > > > restored unAssignedInitialPartitons is also an empty set,
> > > >> > > > which means empty-discovery. The next discovery will be
> > > >> > > > treated as new discovery.
> > > >> > >
> > > >> > > I don't know when the second case will occur. The number of
> > > >> > > partitions must be greater than 0 when creating topics. And I
> > > >> > > have read this note in the FLIP.
> > > >> > > > Note: The current design only applies to cases where all
> > > >> > > > existing partitions can be discovered at once. If all old
> > > >> > > > partitions cannot be discovered at once, the subsequent old
> > > >> > > > partitions discovered will be treated as new partitions,
> > > >> > > > leading to message duplication. Therefore, this point needs
> > > >> > > > to be particularly noted.
> > > >> > >
> > > >> > > Then when will we get the empty partition list? I think it
> > > >> > > should be treated as the first initial discovery if both
> > > >> > > `unassignedInitialPartitons` and `assignedPartitons` are empty
> > > >> > > without `firstDiscoveryDone`.
> > > >> > >
> > > >> > > Besides that, I think `unAssignedInitialPartitons` would be
> > > >> > > better named `unassignedInitialPartitons`.
> > > >> > >
> > > >> > > Best,
> > > >> > > Hang
> > > >> > >
> > > >> > > On Fri, Mar 17, 2023 at 6:42 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > >> > >
> > > >> > > > Hi everyone,
> > > >> > > >
> > > >> > > > I would like to start a discussion on FLIP-288: Enable
> > > >> > > > Dynamic Partition Discovery by Default in Kafka Source [1].
> > > >> > > >
> > > >> > > > As described in mail thread [2], dynamic partition discovery
> > > >> > > > is disabled by default and users have to explicitly specify
> > > >> > > > the discovery interval in order to turn it on.
> > > >> > > > Besides, if the initial offset strategy is LATEST, the same
> > > >> > > > strategy is used for new partitions, leading to the loss of
> > > >> > > > some data (consider a new partition that is created and only
> > > >> > > > discovered by the Kafka source several minutes later: the
> > > >> > > > messages produced into the partition within the gap might be
> > > >> > > > dropped if we use, for example, "latest" as the initial
> > > >> > > > offset strategy).
> > > >> > > >
> > > >> > > > The goals of this FLIP are as follows:
> > > >> > > >
> > > >> > > > 1. Enable partition discovery by default.
> > > >> > > > 2. Use earliest as the offset strategy for new partitions
> > > >> > > >    after the first discovery.
> > > >> > > >
> > > >> > > > Looking forward to hearing from you.
> > > >> > > >
> > > >> > > > [1]
> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> > > >> > > >
> > > >> > > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Hongshun
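
The two ideas debated in this thread (which offset strategy a later-discovered partition should get, and why a persisted `firstDiscoveryDone` flag matters) can be sketched in a few lines. This is a minimal plain-Python model, not Flink connector code: `EnumeratorModel`, `strategy_for_new_partition`, `snapshot` and `restore` are hypothetical names invented for illustration, and the per-mode mapping is the one proposed in the thread (only LATEST switches to EARLIEST), which may differ from what the FLIP finally specifies.

```python
from dataclasses import dataclass, field
from enum import Enum


class StartupMode(Enum):
    EARLIEST = "earliest"
    LATEST = "latest"
    TIMESTAMP = "timestamp"
    SPECIFIC_OFFSETS = "specific-offsets"


def strategy_for_new_partition(mode):
    """Strategy for a partition found after the first discovery.

    Assumption taken from the thread: only LATEST changes, because
    records written before the partition is discovered would
    otherwise be skipped.
    """
    if mode is StartupMode.LATEST:
        return StartupMode.EARLIEST
    return mode


@dataclass
class EnumeratorModel:
    """Hypothetical stand-in for the source enumerator's checkpointed state."""

    startup_mode: StartupMode
    first_discovery_done: bool = False
    assigned: dict = field(default_factory=dict)

    def discover(self, partitions):
        """Record partitions not seen before and return their strategies."""
        new = {}
        for p in partitions:
            if p in self.assigned:
                continue
            if self.first_discovery_done:
                new[p] = strategy_for_new_partition(self.startup_mode)
            else:
                new[p] = self.startup_mode
        self.assigned.update(new)
        # An empty result still counts as a completed first discovery.
        self.first_discovery_done = True
        return new

    def snapshot(self):
        """Return the state that would go into a checkpoint."""
        return {
            "first_discovery_done": self.first_discovery_done,
            "assigned": dict(self.assigned),
        }

    @classmethod
    def restore(cls, startup_mode, state):
        """Rebuild the model from a snapshot after a restart."""
        return cls(startup_mode, state["first_discovery_done"], dict(state["assigned"]))


# A topic pattern that matches nothing at first, followed by a restart.
source = EnumeratorModel(StartupMode.LATEST)
source.discover([])
restored = EnumeratorModel.restore(StartupMode.LATEST, source.snapshot())
late = restored.discover(["topic-a-0"])  # handled as a new partition: EARLIEST
```

Because the flag is part of the snapshot, the partition that appears after the restart is still handled as a later discovery and read from the earliest offset. If the model inferred "first discovery" only from the empty partition sets, that partition would get LATEST and the records written before it was discovered would be skipped, which is the data-loss case described above.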