"If all new messages in old partitions should be consumed, all new messages
in new partitions should also be consumed."

Sorry, I wrote the last sentence incorrectly.
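
To make the corrected sentence concrete, here is a minimal toy sketch. It is
not Flink or Kafka code, and every name in it is hypothetical. It assumes a
partition that is created after job startup and that receives some records
before the source discovers it, then counts how many records are read under
each offset strategy for new partitions.

```python
from enum import Enum


class Strategy(Enum):
    EARLIEST = "earliest"
    LATEST = "latest"


def start_offset(strategy, log_end_offset):
    """Offset a reader starts from in a partition holding offsets [0, log_end_offset)."""
    return 0 if strategy is Strategy.EARLIEST else log_end_offset


def consumed_from_new_partition(strategy_for_new, produced_before_discovery,
                                produced_after_discovery):
    """Count the records read from a partition created after job startup.

    produced_before_discovery: records written between partition creation and
    the moment the source discovers the partition (the discovery gap).
    """
    start = start_offset(strategy_for_new, produced_before_discovery)
    total = produced_before_discovery + produced_after_discovery
    return total - start


# Hypothetical numbers: 30 records arrive before discovery, 70 after.
kept_with_latest = consumed_from_new_partition(Strategy.LATEST, 30, 70)
kept_with_earliest = consumed_from_new_partition(Strategy.EARLIEST, 30, 70)
print(kept_with_latest, kept_with_earliest)
```

In this model the 30 records written during the discovery gap are skipped
with LATEST, while EARLIEST reads them, just as every new message in an old
partition is read.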

On Fri, Mar 24, 2023 at 11:15 AM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Shammon,
>
> Thanks for your advice! I learned a lot about TIMESTAMP/SPECIFIC_OFFSET.
> That's interesting.
>
> However, I have a different opinion.
>
> If a user employs the SPECIFIC_OFFSET strategy and enables auto-discovery,
> they will be able to find new partitions beyond the specified offset.
> Otherwise, enabling auto-discovery makes no sense.
>
> When it comes to the TIMESTAMP strategy, the issue seems trivial. I
> understand your concern; however, that is the job of a time window rather
> than of partition discovery. The TIMESTAMP strategy means that the consumer
> starts from the first record whose timestamp is greater than or equal to
> the given timestamp; it does not mean that only records whose timestamp is
> greater than or equal to the given timestamp are consumed. *Thus, whether
> auto-discovery is disabled or new partitions are discovered with the
> TIMESTAMP strategy, the same problem still occurs.*
>
> Above all, why use the EARLIEST strategy? I believe that the strategy
> specified at startup should apply only at the moment of startup. *So
> there is no difference between new partitions and new messages in old
> partitions.* Therefore, all the new-partition issues that you care about
> will still appear even if you disable partition discovery, in the form of
> new messages in old partitions. If all new messages in old partitions
> should be consumed, all new messages in old partitions should also be
> consumed.
>
>
> Best,
> Hongshun
>
> On Thu, Mar 23, 2023 at 8:34 PM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Hongshun
>>
>> Thanks for driving this discussion. Automatically discovering partitions
>> without losing data sounds great!
>>
>> Currently Flink supports the Kafka source with different startup modes,
>> such as EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSETS.
>>
>> If I understand correctly, you will set the offset of new partitions with
>> EARLIEST? Please correct me if I'm wrong, but I think the EARLIEST startup
>> mode for new partitions is not suitable if users set
>> TIMESTAMP/SPECIFIC_OFFSET for Kafka in their jobs.
>>
>> For an extreme example, the current time is 2023-03-23 15:00:00 and users
>> set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a partition
>> is added during this period, jobs will generate “surprising” data. What do
>> you think of it?
>>
>>
>> Best,
>> Shammon FY
>>
>>
>> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <loserwang1...@gmail.com>
>> wrote:
>>
>> > Hi, Hang,
>> >
>> > Thanks for your advice.
>> >
>> > When will the second case occur? Currently, there are three ways to
>> > specify partitions in Kafka: by topic, by partition, and by matching the
>> > topic with a regular expression. If the initial partition number is 0,
>> > an error will occur for the first two methods. However, when using a
>> > regular expression to match topics, it is allowed to have 0 matched
>> > topics.
>> >
>> > > I don't know when the second case will occur
>> >
>> >
>> > Why prefer the field `firstDiscoveryDone`? When a regular expression
>> > initially matches 0 topics, it should consume all messages of the new
>> > topic. If unassignedInitialPartitons and unassignedTopLevelPartitions
>> > are used instead of firstDiscoveryDone, any new topic created during (5
>> > minutes discovery + job restart time) will be treated as part of the
>> > first discovery, causing data loss.
>> >
>> > > Then when will we get the empty partition list? I think it should be
>> > > treated as the first initial discovery if both
>> > > `unassignedInitialPartitons` and `assignedPartitons` are empty without
>> > > `firstDiscoveryDone`.
>> >
>> > Best
>> >
>> > Hongshun
>> >
>> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com> wrote:
>> >
>> > > Hi, Hongshun,
>> > >
>> > > Thank you for starting this discussion. I have some questions about
>> > > the field `firstDiscoveryDone`.
>> > >
>> > > The FLIP explains why firstDiscoveryDone is needed as follows.
>> > > > Why do we need firstDiscoveryDone? Only relying on the
>> > > > unAssignedInitialPartitons attribute cannot distinguish between the
>> > > > following two cases (which often occur in pattern mode):
>> > > > The first partition discovery is so slow that a checkpoint is
>> > > > executed before it completes, and then the job is restarted. At this
>> > > > time, the restored unAssignedInitialPartitons is an empty set, which
>> > > > means non-discovery. The next discovery will be treated as the first
>> > > > discovery.
>> > > > The first partition discovery returns an empty result, and new
>> > > > partitions can only be found after multiple partition discoveries.
>> > > > If a restart occurs during this period, the restored
>> > > > unAssignedInitialPartitons is also an empty set, which means
>> > > > empty-discovery. The next discovery will be treated as a new
>> > > > discovery.
>> > >
>> > > I don't know when the second case will occur. The number of partitions
>> > > must be greater than 0 when creating topics. And I have read this note
>> > > in the FLIP.
>> > > > Note: The current design only applies to cases where all existing
>> > > > partitions can be discovered at once. If all old partitions cannot be
>> > > > discovered at once, the subsequent old partitions discovered will be
>> > > > treated as new partitions, leading to message duplication. Therefore,
>> > > > this point needs to be particularly noted.
>> > >
>> > > Then when will we get the empty partition list? I think it should be
>> > > treated as the first initial discovery if both
>> > > `unassignedInitialPartitons` and `assignedPartitons` are empty without
>> > > `firstDiscoveryDone`.
>> > >
>> > > Besides that, I think `unAssignedInitialPartitons` would be better
>> > > named `unassignedInitialPartitons`.
>> > >
>> > > Best,
>> > > Hang
>> > >
>> > > On Fri, Mar 17, 2023 at 6:42 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>> > >
>> > > > Hi everyone,
>> > > >
>> > > > I would like to start a discussion on FLIP-288: Enable Dynamic
>> > > > Partition Discovery by Default in Kafka Source [1].
>> > > >
>> > > > As described in mail thread [2], dynamic partition discovery is
>> > > > disabled by default and users have to explicitly specify the
>> > > > discovery interval in order to turn it on. Besides, if the initial
>> > > > offset strategy is LATEST, the same strategy is used for new
>> > > > partitions, leading to the loss of some data (consider a new
>> > > > partition that is created and only discovered by the Kafka source
>> > > > several minutes later: the messages produced into the partition
>> > > > within that gap might be dropped if we use, for example, "latest" as
>> > > > the initial offset strategy).
>> > > >
>> > > > The goals of this FLIP are as follows:
>> > > >
>> > > >    1. Enable partition discovery by default.
>> > > >    2. Use earliest as the offset strategy for new partitions after
>> > > >    the first discovery.
>> > > >
>> > > > Looking forward to hearing from you.
>> > > >
>> > > >
>> > > > [1]
>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
>> > > >
>> > > > [2] https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
>> > > >
>> > > >
>> > > > Best,
>> > > >
>> > > > Hongshun
>> > > >
>> > >
>> >
>>
>
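
The `firstDiscoveryDone` question raised in this thread can be illustrated
with a small toy model. It is a sketch under stated assumptions, not the
actual enumerator code: the class `EnumState`, the function `discover`, and
the `use_flag` switch are hypothetical names invented for the example. It
assumes a regular expression that matches 0 topics at the first discovery,
followed by a later discovery that finds a newly created partition, and it
compares an explicit flag with inferring "first discovery" from empty sets.

```python
from dataclasses import dataclass, field


@dataclass
class EnumState:
    """Hypothetical stand-in for the checkpointed enumerator state."""
    assigned: set = field(default_factory=set)
    unassigned_initial: set = field(default_factory=set)
    first_discovery_done: bool = False


def discover(state, found, configured, use_flag):
    """Return {partition: strategy} for newly found partitions and update state."""
    if use_flag:
        is_first = not state.first_discovery_done
    else:
        # Alternative discussed in the thread: infer "first discovery"
        # from both partition sets being empty.
        is_first = not state.assigned and not state.unassigned_initial
    strategy = configured if is_first else "EARLIEST"
    new = set(found) - state.assigned - state.unassigned_initial
    if is_first:
        state.unassigned_initial |= new
    else:
        state.assigned |= new
    state.first_discovery_done = True
    return {p: strategy for p in new}


# Scenario: the pattern matches nothing at first, then "t-0" appears later.
with_flag = EnumState()
discover(with_flag, [], "LATEST", use_flag=True)
later_with_flag = discover(with_flag, ["t-0"], "LATEST", use_flag=True)

without_flag = EnumState()
discover(without_flag, [], "LATEST", use_flag=False)
later_without_flag = discover(without_flag, ["t-0"], "LATEST", use_flag=False)

print(later_with_flag, later_without_flag)
```

In this model the flag lets the later partition be read from EARLIEST, while
the empty-set check applies the configured LATEST strategy to it, which is
the data-loss case described above.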
