Hi Hongshun,

Thank you for drafting the FLIP for this.

Overall, the intent of the FLIP makes sense to me. I'm actually surprised that the partition discovery feature works as it is in the KafkaSource today - in older versions of the Kafka source connector (implemented on source v1), newly discovered partitions are always consumed from EARLIEST regardless of the initial startup offset strategy.

A few comments:

*1. The proposed change to the OffsetsInitializer#getPartitionOffsets method is a breaking API change. Can we avoid it?*

Instead of adding the `firstDiscovery` flag and requiring implementations to check that flag to differentiate the offset strategy, it seems to be a cleaner API if we simply let users provide two separate OffsetsInitializers - a startup OffsetsInitializer, and a post-startup OffsetsInitializer.

By default, the post-startup OffsetsInitializer can be inferred from the startup OffsetsInitializer (e.g. EARLIEST startup is coupled with EARLIEST for post-startup, LATEST startup is coupled with EARLIEST for post-startup, etc.). If the users want to, they can also override and define their own post-startup OffsetsInitializer. This seems to provide more flexibility and a better API design for composability.

*2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*

Here I'm referring to this comment made earlier between Shammon and Hongshun:

> Users can freely set the starting position according to their needs, it may be before the latest Kafka data, or it may be at a certain point in the future.

Does future-time TIMESTAMP actually work? In my understanding, if you try to retrieve offsets for an "out-of-bound" timestamp, you'll by default just get the "auto.offset.reset" position. Can you clarify the behaviour here?

*3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET post-startup*

I'm wondering if this is the correct default combo. What happens if a newly discovered partition was not designated an offset in the provided offsets map? In that case, I believe you'll also just default to the "auto.offset.reset" position, but that means users have an extra config dimension to worry about for partition discovery.

Thanks,
Gordon

On Mon, Mar 27, 2023 at 9:14 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi hongshun
>
> Thanks for updating, the FLIP's pretty good and looks good to me!
>
> Best,
> Shammon FY
>
> On Tue, Mar 28, 2023 at 11:16 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> > Hi Shammon,
> >
> > Thanks a lot for your advice. I agree with your opinion now. It seems that I forgot to consider that it may be at a certain point in the future.
> >
> > I will modify OffsetsInitializer to provide a different strategy for later discovered partitions, by which users can also customize strategies for new and old partitions.
> >
> > WDYT?
> >
> > Yours
> >
> > Hongshun
> >
> > On Tue, Mar 28, 2023 at 9:00 AM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi Hongshun
> > >
> > > Thanks for your answer.
> > >
> > > I think the startup offset of Kafka such as timestamp or specific_offset has no relationship with `Window Operator`. Users can freely set the starting position according to their needs, it may be before the latest Kafka data, or it may be at a certain point in the future.
> > >
> > > The offsets set by users in Kafka can be divided into four types at the moment: EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET. The newly discovered partitions may need to be handled with different strategies for these four types:
> > >
> > > 1. EARLIEST, use EARLIEST for the newly discovered partitions
> > > 2. LATEST, use EARLIEST for the newly discovered partitions
> > > 3. TIMESTAMP, use TIMESTAMP for the newly discovered partitions
> > > 4. SPECIFIC_OFFSET, use SPECIFIC_OFFSET for the newly discovered partitions
> > >
> > > From above, it seems that we only need to do special processing for EARLIEST. What do you think of it?
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Fri, Mar 24, 2023 at 11:23 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > >
> > > > "If all new messages in old partitions should be consumed, all new messages in new partitions should also be consumed."
> > > >
> > > > Sorry, I wrote the last sentence incorrectly.
> > > >
> > > > On Fri, Mar 24, 2023 at 11:15 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > > Thanks for your advice! I learned a lot about TIMESTAMP/SPECIFIC_OFFSET. That's interesting.
> > > > >
> > > > > However, I have a different opinion.
> > > > >
> > > > > If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery, they will be able to find new partitions beyond the specified offset. Otherwise, enabling auto-discovery makes no sense.
> > > > >
> > > > > When it comes to the TIMESTAMP strategy, it seems to be trivial. I understand your concern, however, it's the role of time window rather than partition discovery. The TIMESTAMP strategy means that the consumer starts from the first record whose timestamp is greater than or equal to a given timestamp, rather than only consuming all records whose timestamp is greater than or equal to the given timestamp. *Thus, even if auto-discovery is disabled, or new partitions are discovered with the TIMESTAMP strategy, the same problems still occur.*
> > > > >
> > > > > Above all, why use the EARLIEST strategy? I believe that the strategy specified by the startup should be the strategy at the moment of startup.
> > > > > *So there is no difference between new partitions and new messages in old partitions.* Therefore, all the new partition issues that you care about will still appear even if you disable partition discovery, as new messages in old partitions. If all new messages in old partitions should be consumed, all new messages in old partitions should also be consumed.
> > > > >
> > > > > Best,
> > > > > Hongshun
> > > > >
> > > > > On Thu, Mar 23, 2023 at 8:34 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > >
> > > > > > Hi Hongshun
> > > > > >
> > > > > > Thanks for driving this discussion. Automatically discovering partitions without losing data sounds great!
> > > > > >
> > > > > > Currently Flink supports the Kafka source with different startup modes, such as EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
> > > > > >
> > > > > > If I understand correctly, you will set the offset of new partitions with EARLIEST? Please correct me if I'm wrong, I think the EARLIEST startup mode for new partitions is not suitable if users set TIMESTAMP/SPECIFIC_OFFSET for Kafka in their jobs.
> > > > > >
> > > > > > For an extreme example, the current time is 2023-03-23 15:00:00 and users set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition is added during this period, jobs will generate "surprising" data. What do you think of it?
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > > On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi, Hang,
> > > > > > >
> > > > > > > Thanks for your advice.
> > > > > > >
> > > > > > > When will the second case occur?
> > > > > > > Currently, there are three ways to specify partitions in Kafka: by topic, by partition, and by matching the topic with a regular expression. Currently, if the initial partition number is 0, an error will occur for the first two methods. However, when using a regular expression to match topics, it is allowed to have 0 matched topics.
> > > > > > >
> > > > > > > > I don't know when the second case will occur
> > > > > > >
> > > > > > > Why prefer the field `firstDiscoveryDone`? When a regular expression initially matches 0 topics, it should consume all messages of the new topic. If unassignedInitialPartitons and unassignedTopLevelPartitions are used instead of firstDiscoveryDone, any new topics created during (5 minutes discovery + job restart time) will be treated as the first discovery, causing data loss.
> > > > > > >
> > > > > > > > Then when will we get the empty partition list? I think it should be treated as the first initial discovery if both `unassignedInitialPartitons` and `assignedPartitons` are empty without `firstDiscoveryDone`.
> > > > > > >
> > > > > > > Best
> > > > > > >
> > > > > > > Hongshun
> > > > > > >
> > > > > > > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi, Hongshun,
> > > > > > > >
> > > > > > > > Thank you for starting this discussion. I have some questions about the field `firstDiscoveryDone`.
> > > > > > > >
> > > > > > > > In the FLIP, the reason we need firstDiscoveryDone is given as follows.
> > > > > > > >
> > > > > > > > > Why do we need firstDiscoveryDone?
> > > > > > > > > Only relying on the unAssignedInitialPartitons attribute cannot distinguish between the following two cases (which often occur in pattern mode):
> > > > > > > > > The first partition discovery is so slow that a checkpoint is executed before it completes, and then the job is restarted. At this time, the restored unAssignedInitialPartitons is an empty set, which means non-discovery. The next discovery will be treated as first discovery.
> > > > > > > > > The first partition discovery returns an empty result, and new partitions can only be found after multiple partition discoveries. If a restart occurs during this period, the restored unAssignedInitialPartitons is also an empty set, which means empty-discovery. The next discovery will be treated as new discovery.
> > > > > > > >
> > > > > > > > I don't know when the second case will occur. The partitions must be greater than 0 when creating topics. And I have read this note in the FLIP.
> > > > > > > >
> > > > > > > > > Note: The current design only applies to cases where all existing partitions can be discovered at once. If all old partitions cannot be discovered at once, the subsequent old partitions discovered will be treated as new partitions, leading to message duplication. Therefore, this point needs to be particularly noted.
> > > > > > > >
> > > > > > > > Then when will we get the empty partition list? I think it should be treated as the first initial discovery if both `unassignedInitialPartitons` and `assignedPartitons` are empty without `firstDiscoveryDone`.
> > > > > > > >
> > > > > > > > Besides that, I think `unAssignedInitialPartitons` would be better named `unassignedInitialPartitons`.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Hang
> > > > > > > >
> > > > > > > > On Fri, Mar 17, 2023 at 6:42 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > I would like to start a discussion on FLIP-288: Enable Dynamic Partition Discovery by Default in Kafka Source [1].
> > > > > > > > >
> > > > > > > > > As described in mail thread [2], dynamic partition discovery is disabled by default and users have to explicitly specify the interval of discovery in order to turn it on. Besides, if the initial offset strategy is LATEST, the same strategy is used for new partitions, leading to the loss of some data (consider a new partition that is created and discovered by the Kafka source several minutes later: the messages produced into the partition within that gap might be dropped if we use, for example, "latest" as the initial offset strategy).
> > > > > > > > >
> > > > > > > > > The goals of this FLIP are as follows:
> > > > > > > > >
> > > > > > > > > 1. Enable partition discovery by default.
> > > > > > > > > 2. Use earliest as the offset strategy for new partitions after the first discovery.
> > > > > > > > >
> > > > > > > > > Looking forward to hearing from you.
> > > > > > > > >
> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> > > > > > > > >
> > > > > > > > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > >
> > > > > > > > > Hongshun
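
For reference, the default pairing discussed in this thread (the post-startup strategy inferred from the startup strategy, with an optional user override) could be sketched roughly as below. This is only an illustration under stated assumptions: `OffsetStrategy`, `PostStartupDefaults` and `resolve` are hypothetical names, not part of the KafkaSource or OffsetsInitializer API, and the TIMESTAMP and SPECIFIC_OFFSET defaults simply mirror the list proposed earlier in the thread, which is still under discussion.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical names throughout; this is NOT the Flink KafkaSource API.
enum OffsetStrategy { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET }

final class PostStartupDefaults {
    // Default post-startup strategy per startup strategy, as listed in the thread.
    private static final Map<OffsetStrategy, OffsetStrategy> DEFAULTS =
            new EnumMap<>(OffsetStrategy.class);

    static {
        DEFAULTS.put(OffsetStrategy.EARLIEST, OffsetStrategy.EARLIEST);
        // LATEST is the special case: partitions found later start from EARLIEST
        // so that records produced before discovery are not skipped.
        DEFAULTS.put(OffsetStrategy.LATEST, OffsetStrategy.EARLIEST);
        DEFAULTS.put(OffsetStrategy.TIMESTAMP, OffsetStrategy.TIMESTAMP);
        DEFAULTS.put(OffsetStrategy.SPECIFIC_OFFSET, OffsetStrategy.SPECIFIC_OFFSET);
    }

    private PostStartupDefaults() {}

    // Returns the user-supplied override if present, otherwise the inferred default.
    static OffsetStrategy resolve(OffsetStrategy startup, OffsetStrategy override) {
        return override != null ? override : DEFAULTS.get(startup);
    }

    public static void main(String[] args) {
        // Print the inferred default for every startup strategy.
        for (OffsetStrategy startup : OffsetStrategy.values()) {
            System.out.println(startup + " -> " + resolve(startup, null));
        }
    }
}
```

Keeping the override as a separate argument, rather than a flag passed into the initializer, is meant to reflect the "two separate OffsetsInitializers" idea: the startup path never needs to know whether a discovery is the first one.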