"If all new messages in old partitions should be consumed, all new messages in new partitions should also be consumed."
Sorry, I wrote the last sentence incorrectly.

On Fri, Mar 24, 2023 at 11:15 AM Hongshun Wang <loserwang1...@gmail.com> wrote:
> Hi Shammon,
>
> Thanks for your advice! I learned a lot about TIMESTAMP/SPECIFIC_OFFSET.
> That's interesting.
>
> However, I have a different opinion.
>
> If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery,
> they will be able to find new partitions beyond the specified offsets.
> Otherwise, enabling auto-discovery makes no sense.
>
> When it comes to the TIMESTAMP strategy, the issue seems trivial. I
> understand your concern; however, that is the role of a time window rather
> than of partition discovery. The TIMESTAMP strategy means that the consumer
> starts from the first record whose timestamp is greater than or equal to a
> given timestamp, rather than consuming only records whose timestamp is
> greater than or equal to the given timestamp. *Thus, whether auto-discovery
> is disabled or new partitions are discovered with the TIMESTAMP strategy,
> the same problems still occur.*
>
> Above all, why not use the EARLIEST strategy? I believe the strategy
> specified at startup should only apply to the moment of startup. *So there
> is no difference between new partitions and new messages in old
> partitions.* Therefore, all the new-partition issues that you care about
> will still appear, as new messages in old partitions, even if you disable
> partition discovery. If all new messages in old partitions should be
> consumed, all new messages in old partitions should also be consumed.
>
>
> Best,
> Hongshun
>
> On Thu, Mar 23, 2023 at 8:34 PM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Hongshun
>>
>> Thanks for driving this discussion. Automatically discovering partitions
>> without losing data sounds great!
>>
>> Currently, Flink's Kafka source supports different startup modes, such as
>> EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
>>
>> If I understand correctly, you will set the offset of new partitions to
>> EARLIEST? Please correct me if I'm wrong, but I think the EARLIEST startup
>> mode for new partitions is not suitable if users set
>> TIMESTAMP/SPECIFIC_OFFSET for Kafka in their jobs.
>>
>> For an extreme example, suppose the current time is 2023-03-23 15:00:00
>> and users set the TIMESTAMP to 2023-03-23 16:00:00 for their jobs. If a
>> partition is added during this period, jobs will generate "surprising"
>> data. What do you think?
>>
>>
>> Best,
>> Shammon FY
>>
>>
>> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <loserwang1...@gmail.com>
>> wrote:
>>
>> > Hi, Hang,
>> >
>> > Thanks for your advice.
>> >
>> > When will the second case occur? Currently, there are three ways to
>> > specify partitions in Kafka: by topic, by partition, and by matching
>> > topics with a regular expression. If the initial number of partitions
>> > is 0, an error occurs for the first two methods. However, when a regular
>> > expression is used to match topics, 0 matched topics are allowed.
>> >
>> > > I don't know when the second case will occur
>> >
>> >
>> > Why prefer the field `firstDiscoveryDone`? When a regular expression
>> > initially matches 0 topics, the source should consume all messages of a
>> > newly created topic. If unassignedInitialPartitons and
>> > unassignedTopLevelPartitions are used instead of firstDiscoveryDone, any
>> > new topic created within (5-minute discovery interval + job restart time)
>> > will be treated as part of the first discovery, causing data loss.
>> >
>> > > Then when will we get the empty partition list? I think it should be
>> > > treated as the first initial discovery if both
>> > > `unassignedInitialPartitons` and `assignedPartitons` are empty without
>> > > `firstDiscoveryDone`.
>> >
>> > Best
>> >
>> > Hongshun
>> >
>> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com>
>> > wrote:
>> >
>> > > Hi, Hongshun,
>> > >
>> > > Thank you for starting this discussion. I have some questions about
>> > > the field `firstDiscoveryDone`.
>> > >
>> > > The FLIP explains why we need firstDiscoveryDone as follows.
>> > > > Why do we need firstDiscoveryDone? Relying only on the
>> > > > unAssignedInitialPartitons attribute cannot distinguish between the
>> > > > following two cases (which often occur in pattern mode):
>> > > > The first partition discovery is so slow that a checkpoint is
>> > > > executed and the job is restarted before it completes. At this time,
>> > > > the restored unAssignedInitialPartitons is an empty set, which means
>> > > > non-discovery. The next discovery will be treated as the first
>> > > > discovery.
>> > > > The first partition discovery returns nothing, and new partitions
>> > > > can only be found after multiple partition discoveries. If a restart
>> > > > occurs during this period, the restored unAssignedInitialPartitons is
>> > > > also an empty set, which means empty-discovery. The next discovery
>> > > > will be treated as a new discovery.
>> > >
>> > > I don't know when the second case will occur. The number of partitions
>> > > must be greater than 0 when creating a topic. I have also read this
>> > > note in the FLIP:
>> > > > Note: The current design only applies to cases where all existing
>> > > > partitions can be discovered at once. If all old partitions cannot
>> > > > be discovered at once, the old partitions discovered later will be
>> > > > treated as new partitions, leading to message duplication.
>> > > > Therefore, this point needs particular attention.
>> > >
>> > > Then when will we get the empty partition list? I think it should be
>> > > treated as the first initial discovery if both
>> > > `unassignedInitialPartitons` and `assignedPartitons` are empty without
>> > > `firstDiscoveryDone`.
>> > >
>> > > Besides that, I think `unAssignedInitialPartitons` would be better
>> > > named `unassignedInitialPartitons`.
>> > >
>> > > Best,
>> > > Hang
>> > >
>> > > On Fri, Mar 17, 2023 at 6:42 PM Hongshun Wang <loserwang1...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi everyone,
>> > > >
>> > > > I would like to start a discussion on FLIP-288: Enable Dynamic
>> > > > Partition Discovery by Default in Kafka Source[1].
>> > > >
>> > > > As described in the mail thread[2], dynamic partition discovery is
>> > > > disabled by default, and users have to explicitly specify the
>> > > > discovery interval to turn it on. Besides, if the initial offset
>> > > > strategy is LATEST, the same strategy is used for new partitions,
>> > > > leading to the loss of some data (consider a new partition that is
>> > > > created and only discovered by the Kafka source several minutes
>> > > > later: messages produced into the partition within that gap might be
>> > > > dropped if we use, for example, "latest" as the initial offset
>> > > > strategy).
>> > > >
>> > > > The goals of this FLIP are as follows:
>> > > >
>> > > > 1. Enable partition discovery by default.
>> > > > 2. Use earliest as the offset strategy for new partitions after the
>> > > >    first discovery.
>> > > >
>> > > > Looking forward to hearing from you.
>> > > >
>> > > >
>> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>> > > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
>> > > >
>> > > >
>> > > > Best,
>> > > >
>> > > > Hongshun
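The offset-selection rule proposed in FLIP-288, and the reason the thread debates persisting a `firstDiscoveryDone` flag, can be sketched as a small Java model. This is a hypothetical illustration, not code from the Flink Kafka connector: the class `PartitionDiscoverySketch`, its `onDiscovery` method, and the representation of partitions as plain strings are all assumptions. It assumes, per goal 2 of the FLIP, that partitions found in the first discovery round use the startup strategy and partitions found in any later round use EARLIEST.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model, not Flink's enumerator: partitions are plain strings such
// as "topic-0", and a starting offset is represented only by a strategy name.
final class PartitionDiscoverySketch {

    // Startup modes named in the thread.
    enum OffsetStrategy { EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS, GROUP_OFFSET }

    private final OffsetStrategy startupStrategy;
    // Partitions already assigned to readers; assumed to be checkpointed.
    private final Set<String> assignedPartitions = new HashSet<>();
    // Assumed to be checkpointed too: true once any discovery round has
    // finished, even a round that found no partitions.
    private boolean firstDiscoveryDone;

    PartitionDiscoverySketch(OffsetStrategy startupStrategy,
                             Set<String> restoredAssignedPartitions,
                             boolean restoredFirstDiscoveryDone) {
        this.startupStrategy = startupStrategy;
        this.assignedPartitions.addAll(restoredAssignedPartitions);
        this.firstDiscoveryDone = restoredFirstDiscoveryDone;
    }

    // Handles one discovery round and returns the strategy chosen for each
    // partition not assigned before; already-known partitions are skipped.
    Map<String, OffsetStrategy> onDiscovery(Set<String> discoveredPartitions) {
        // First round: honour the user's startup mode. Later rounds: EARLIEST,
        // so records written between partition creation and discovery are kept.
        OffsetStrategy strategy =
                firstDiscoveryDone ? OffsetStrategy.EARLIEST : startupStrategy;
        Map<String, OffsetStrategy> newlyAssigned = new LinkedHashMap<>();
        // Iterate in sorted order so the returned map has a stable order.
        for (String partition : new TreeSet<>(discoveredPartitions)) {
            if (assignedPartitions.add(partition)) {
                newlyAssigned.put(partition, strategy);
            }
        }
        // Marked done even if nothing was found (e.g. a topic pattern that
        // matched zero topics), so topics created afterwards count as new.
        firstDiscoveryDone = true;
        return newlyAssigned;
    }

    boolean isFirstDiscoveryDone() {
        return firstDiscoveryDone;
    }
}
```

Restoring with `firstDiscoveryDone == false` and an empty assigned set models the FLIP's first case (a checkpoint taken before the first discovery finished), so the next round still uses the startup strategy. Restoring with the flag set models the second case (an empty first discovery, such as a pattern that matched nothing), so a topic created later is read from EARLIEST. Without the flag, both restored states are just empty sets, which is the ambiguity the FLIP describes.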