Hi Hongshun,

Thanks for driving this discussion. Automatically discovering partitions without losing data sounds great!

Currently, Flink supports the Kafka source with different startup modes,
such as EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.

If I understand correctly, you will set the offset of new partitions to
EARLIEST? Please correct me if I'm wrong, but I think the EARLIEST startup
mode for new partitions is not suitable if users set
TIMESTAMP/SPECIFIC_OFFSETS for Kafka in their jobs.

As an extreme example, suppose the current time is 2023-03-23 15:00:00 and
users set TIMESTAMP to 2023-03-23 16:00:00 for their jobs. If a partition
is added during this period and is read from EARLIEST, jobs will emit
records older than the configured timestamp, which is “surprising” data.

What do you think?

Best,
Shammon FY

On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <[email protected]> wrote:

> Hi, Hang,
>
> Thanks for your advice.
>
> When will the second case occur? Currently, there are three ways to
> specify partitions in Kafka: by topic, by partition, and by matching the
> topic with a regular expression. If the initial partition number is 0,
> an error will occur for the first two methods. However, when using a
> regular expression to match topics, it is allowed to have 0 matched
> topics.
>
> > I don't know when the second case will occur
>
> Why prefer the field `firstDiscoveryDone`? When a regular expression
> initially matches 0 topics, the job should consume all messages of the
> new topic. If unassignedInitialPartitons and unassignedTopLevelPartitions
> are used instead of firstDiscoveryDone, the discovery of any new topics
> created during (5 minutes discovery + job restart time) will be treated
> as the first discovery, causing data loss.
>
> > Then when will we get the empty partition list? I think it should be
> > treated as the first initial discovery if both
> > `unassignedInitialPartitons` and `assignedPartitons` are empty without
> > `firstDiscoveryDone`.
>
> Best
>
> Hongshun
>
> On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <[email protected]> wrote:
>
> > Hi, Hongshun,
> >
> > Thank you for starting this discussion.
> > I have some questions about the field `firstDiscoveryDone`.
> >
> > The FLIP explains why we need firstDiscoveryDone as follows.
> > > Why do we need firstDiscoveryDone? Only relying on the
> > > unAssignedInitialPartitons attribute cannot distinguish between the
> > > following two cases (which often occur in pattern mode):
> > > 1. The first partition discovery is so slow that a checkpoint is
> > >    executed before it completes, and then the job is restarted. At
> > >    this time, the restored unAssignedInitialPartitons is an empty
> > >    set, which means non-discovery. The next discovery will be treated
> > >    as first discovery.
> > > 2. The first partition discovery returns an empty result, and new
> > >    partitions can only be found after multiple partition discoveries.
> > >    If a restart occurs during this period, the restored
> > >    unAssignedInitialPartitons is also an empty set, which means
> > >    empty-discovery. The next discovery will be treated as new
> > >    discovery.
> >
> > I don't know when the second case will occur. The number of partitions
> > must be greater than 0 when creating topics. And I have read this note
> > in the FLIP.
> > > Note: The current design only applies to cases where all existing
> > > partitions can be discovered at once. If all old partitions cannot be
> > > discovered at once, the subsequent old partitions discovered will be
> > > treated as new partitions, leading to message duplication. Therefore,
> > > this point needs to be particularly noted.
> >
> > Then when will we get the empty partition list? I think it should be
> > treated as the first initial discovery if both
> > `unassignedInitialPartitons` and `assignedPartitons` are empty without
> > `firstDiscoveryDone`.
> >
> > Besides that, I think `unAssignedInitialPartitons` would be better
> > named `unassignedInitialPartitons`.
> >
> > Best,
> > Hang
> >
> > Hongshun Wang <[email protected]> wrote on Fri, Mar 17, 2023 at 18:42:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion on FLIP-288: Enable Dynamic
> > > Partition Discovery by Default in Kafka Source [1].
> > >
> > > As described in mail thread [2], dynamic partition discovery is
> > > disabled by default and users have to explicitly specify the
> > > discovery interval in order to turn it on. Besides, if the initial
> > > offset strategy is LATEST, the same strategy is used for new
> > > partitions, leading to the loss of some data (consider a new
> > > partition that is created and only discovered by the Kafka source
> > > several minutes later: the messages produced into the partition
> > > within that gap might be dropped if we use, for example, "latest" as
> > > the initial offset strategy).
> > >
> > > The goals of this FLIP are as follows:
> > >
> > > 1. Enable partition discovery by default.
> > > 2. Use earliest as the offset strategy for new partitions after the
> > >    first discovery.
> > >
> > > Looking forward to hearing from you.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > >
> > > Best,
> > >
> > > Hongshun
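
For readers following the thread, the disagreement over `firstDiscoveryDone` can be sketched roughly as below. This is only an illustration under stated assumptions, not the FLIP's implementation: the class `NewPartitionOffsetSketch`, the enum `OffsetChoice` and both methods are hypothetical names made up for this sketch, and the state is reduced to two sets of partition names. It compares an explicit flag against inferring "first discovery" from empty restored state, for the scenario where a topic pattern matched 0 topics before a restart.

```java
import java.util.Collections;
import java.util.Set;

/**
 * Hypothetical sketch of the two approaches debated in this thread.
 * None of these types or methods are taken from the Flink code base.
 */
final class NewPartitionOffsetSketch {

    /** Where reading starts for a partition found by a discovery round. */
    enum OffsetChoice { USER_CONFIGURED, EARLIEST }

    /** Explicit flag: only the first discovery uses the user's startup mode. */
    static OffsetChoice withExplicitFlag(boolean firstDiscoveryDone) {
        return firstDiscoveryDone ? OffsetChoice.EARLIEST : OffsetChoice.USER_CONFIGURED;
    }

    /** Inference: empty restored state is taken to mean "no discovery has happened yet". */
    static OffsetChoice withInference(Set<String> unassignedInitial, Set<String> assigned) {
        boolean looksLikeFirstDiscovery = unassignedInitial.isEmpty() && assigned.isEmpty();
        return looksLikeFirstDiscovery ? OffsetChoice.USER_CONFIGURED : OffsetChoice.EARLIEST;
    }

    public static void main(String[] args) {
        // Scenario from the thread: the topic pattern matched 0 topics, the job
        // restarted, and a matching topic was created before the next discovery.
        Set<String> restoredUnassigned = Collections.emptySet();
        Set<String> restoredAssigned = Collections.emptySet();

        // The first discovery did run (and found nothing), so the flag is true.
        System.out.println("explicit flag: " + withExplicitFlag(true));
        System.out.println("inference:     " + withInference(restoredUnassigned, restoredAssigned));
    }
}
```

In this scenario the explicit flag yields EARLIEST for the new topic, while the inference falls back to the user's startup mode; if that mode is LATEST, messages produced before the topic is discovered would be skipped, which is the data loss Hongshun describes.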
