Hi hongshun

Thanks for updating, the FLIP's pretty good and looks good to me!

Best,
Shammon FY


On Tue, Mar 28, 2023 at 11:16 AM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi Shammon,
>
>
> Thanks a lot for your advise. I agree with your opinion now. It seems that
> I forgot to consider that it may be at a certain point in the future.
>
>
> I will modify OffsetsInitializer to provide a different strategy for later
> discovered partitions, by which users can also customize strategies for new
> and old partitions.
>
>  WDYT?
>
>
> Yours
>
> Hongshun
>
> On Tue, Mar 28, 2023 at 9:00 AM Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi Hongshun
> >
> > Thanks for your answer.
> >
> > I think the startup offset of Kafka such as timestamp or
> > specific_offset has no relationship with `Window Operator`. Users can
> > freely set the starting position according to their needs, it may be
> before
> > the latest Kafka data, or it may be at a certain point in the future.
> >
> > The offsets set by users in Kafka can be divided into four types at the
> > moment: EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSET. The new discovered
> > partitions may need to be handled with different strategies for these
> four
> > types:
> >
> > 1. EARLIEST, use EARLIEST for the new discovered partitions
> > 2. LATEST, use EARLIEST for the new discovered partitions
> > 3. TIMESTAMP, use TIMESTAMP for the new discovered partitions
> > 4. SPECIFIC_OFFSET, use SPECIFIC_OFFSET for the new discovered partitions
> >
> > From above, it seems that we only need to do special processing for
> > EARLIEST. What do you think of it?
> >
> > Best,
> > Shammon FY
> >
> >
> > On Fri, Mar 24, 2023 at 11:23 AM Hongshun Wang <loserwang1...@gmail.com>
> > wrote:
> >
> > > "If all new messages in old partitions should be consumed, all new
> > messages
> > > in new partitions should also be consumed."
> > >
> > > Sorry, I wrote the last sentence incorrectly.
> > >
> > > On Fri, Mar 24, 2023 at 11:15 AM Hongshun Wang <
> loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > Hi Shammon,
> > > >
> > > > Thanks for your advise!  I learn a lot about
> TIMESTAMP/SPECIFIC_OFFSET.
> > > > That's interesting.
> > > >
> > > > However, I have a different opinion.
> > > >
> > > > If a user employs the SPECIFIC_OFFSET strategy and enables
> > > auto-discovery,
> > > > they will be able to find new partitions beyond the specified offset.
> > > > Otherwise, enabling auto-discovery is no sense.
> > > >
> > > > When it comes to the TIMESTAMP strategy, it seems to be trivial. I
> > > > understand your concern, however, it’s the role of time window rather
> > > than
> > > > partition discovery. The TIMESTAMP strategy means that the consumer
> > > starts
> > > > from the first record whose timestamp is greater than or equal to a
> > given
> > > > timestamp, rather than only consuming all records whose timestamp is
> > > > greater than or equal to the given timestamp. *Thus, even disable
> auto
> > > > discovery or discover new partitions with TIMESTAMP strategy, same
> > > problems
> > > > still occur.*
> > > >
> > > > Above all , why use EARLIEST strategy? I believe that the strategy
> > > > specified by the startup should be the strategy at the moment of
> > > startup. *So
> > > > there is no difference between new partitions and new messages in old
> > > > partitions.* Therefore, all the new partition issues that you care
> > about
> > > > will still appear even if you disable the partition, as new messages
> in
> > > old
> > > > partitions. If all new messages in old partitions should be consume,
> > all
> > > > new messages in old partitions should also be consume.
> > > >
> > > >
> > > > Best,
> > > > Hongshun
> > > >
> > > > On Thu, Mar 23, 2023 at 8:34 PM Shammon FY <zjur...@gmail.com>
> wrote:
> > > >
> > > >> Hi Hongshun
> > > >>
> > > >> Thanks for driving this discussion. Automatically discovering
> > partitions
> > > >> without losing data sounds great!
> > > >>
> > > >> Currently flink supports kafka source with different startup modes,
> > such
> > > >> as
> > > >> EARLIEST, LATEST, TIMESTAMP, SPECIFIC_OFFSETS and GROUP_OFFSET.
> > > >>
> > > >> If I understand correctly, you will set the offset of new partitions
> > > with
> > > >> EARLIEST? Please correct me if I'm wrong, I think the EARLIEST
> startup
> > > >> mode
> > > >> for new partitions is not suitable if users set
> > > TIMESTAMP/SPECIFIC_OFFSET
> > > >> for kafka in their jobs.
> > > >>
> > > >> For an extreme example, the current time is 2023-03-23 15:00:00 and
> > > users
> > > >> set the TIMESTAMP with 2023-03-23 16:00:00 for their jobs. If a
> > > partition
> > > >> is added during this period, jobs will generate “surprising” data.
> > What
> > > do
> > > >> you think of it?
> > > >>
> > > >>
> > > >> Best,
> > > >> Shammon FY
> > > >>
> > > >>
> > > >> On Tue, Mar 21, 2023 at 6:58 PM Hongshun Wang <
> > loserwang1...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi, Hang,
> > > >> >
> > > >> > Thanks for your advice.
> > > >> >
> > > >> > When the second case will occur? Currently, there are three ways
> to
> > > >> specify
> > > >> > partitions in Kafka: by topic, by partition, and by matching the
> > topic
> > > >> with
> > > >> > a regular expression. Currently, if the initial partition number
> is
> > 0,
> > > >> an
> > > >> > error will occur for the first two methods. However, when using a
> > > >> regular
> > > >> > expression to match topics, it is allowed to have 0 matched
> topics.
> > > >> >
> > > >> > > I don't know when the second case will occur
> > > >> >
> > > >> >
> > > >> > Why prefer the field `firstDiscoveryDone`? When a regular
> expression
> > > >> > initially matches 0 topics, it should consume all messages of the
> > new
> > > >> > topic. If unassignedInitialPartitons and
> > unassignedTopLevelPartitions
> > > >> are
> > > >> > used instead of firstDiscoveryDone, any new topics created during
> (5
> > > >> > minutes discovery + job restart time) will be treated as the first
> > > >> > discovery, causing data loss.
> > > >> >
> > > >> > > Then when will we get the empty partition list? I think it
> should
> > be
> > > >> > treated as the first initial discovery if both
> > > >> `unassignedInitialPartitons`
> > > >> > and `assignedPartitons` are empty without `firstDiscoveryDone`.
> > > >> >
> > > >> > Best
> > > >> >
> > > >> > Hongshun
> > > >> >
> > > >> > On Tue, Mar 21, 2023 at 5:56 PM Hang Ruan <ruanhang1...@gmail.com
> >
> > > >> wrote:
> > > >> >
> > > >> > > Hi, Hongshun,
> > > >> > >
> > > >> > > Thank you for starting this discussion.  I have some problems
> > about
> > > >> the
> > > >> > > field `firstDiscoveryDone`.
> > > >> > >
> > > >> > > In the FLIP, why we need firstDiscoveryDone is as follows.
> > > >> > > > Why do we need firstDiscoveryDone? Only relying on the
> > > >> > > unAssignedInitialPartitons attribute cannot distinguish between
> > the
> > > >> > > following two cases (which often occur in pattern mode):
> > > >> > > > The first partition discovery is so slow, before which the
> > > >> checkpoint
> > > >> > is
> > > >> > > executed and then job is restarted . At this time, the restored
> > > >> > > unAssignedInitialPartitons is an empty set, which means
> > > non-discovery.
> > > >> > The
> > > >> > > next discovery will be treated as first discovery.
> > > >> > > > The first time the partition is discovered is empty, and new
> > > >> partitions
> > > >> > > can only be found after multiple partition discoveries. If a
> > restart
> > > >> > occurs
> > > >> > > between this period, the restored unAssignedInitialPartitons is
> > also
> > > >> an
> > > >> > > empty set, which means empty-discovery.The next discovery will
> be
> > > >> treated
> > > >> > > as new discovery.
> > > >> > >
> > > >> > > I don't know when the second case will occur. The partitions
> must
> > be
> > > >> > > greater than 0 when creating topics. And I have read this note
> in
> > > the
> > > >> > FLIP.
> > > >> > > > Note: The current design only applies to cases where all
> > existing
> > > >> > > partitions can be discovered at once. If all old partitions
> cannot
> > > be
> > > >> > > discovered at once, the subsequent old partitions discovered
> will
> > be
> > > >> > > treated as new partitions, leading to message duplication.
> > > Therefore,
> > > >> > this
> > > >> > > point needs to be particularly noted.
> > > >> > >
> > > >> > > Then when will we get the empty partition list? I think it
> should
> > be
> > > >> > > treated as the first initial discovery if both
> > > >> > `unassignedInitialPartitons`
> > > >> > > and `assignedPartitons` are empty without `firstDiscoveryDone`.
> > > >> > >
> > > >> > > Besides that, I think the `unAssignedInitialPartitons` is better
> > to
> > > be
> > > >> > > named `unassignedInitialPartitons`.
> > > >> > >
> > > >> > > Best,
> > > >> > > Hang
> > > >> > >
> > > >> > > Hongshun Wang <loserwang1...@gmail.com> 于2023年3月17日周五 18:42写道:
> > > >> > >
> > > >> > > > Hi everyone,
> > > >> > > >
> > > >> > > > I would like to start a discussion on FLIP-288:Enable Dynamic
> > > >> Partition
> > > >> > > > Discovery by Default in Kafka Source[1].
> > > >> > > >
> > > >> > > > As described in mail thread[2], dynamic partition discovery is
> > > >> disabled
> > > >> > > by
> > > >> > > > default and users have to explicitly specify the interval of
> > > >> discovery
> > > >> > in
> > > >> > > > order to turn it on. Besides, if the initial offset strategy
> is
> > > >> LATEST,
> > > >> > > > same strategy is used for new partitions, leading to the loss
> of
> > > >> some
> > > >> > > data
> > > >> > > > (thinking a new partition is created and might be discovered
> by
> > > >> Kafka
> > > >> > > > source several minutes later, and the message produced into
> the
> > > >> > partition
> > > >> > > > within the gap might be dropped if we use for example "latest"
> > as
> > > >> the
> > > >> > > > initial offset strategy.)
> > > >> > > >
> > > >> > > > The goals of this FLIP are as follows:
> > > >> > > >
> > > >> > > >    1. Enable partition discovery by default.
> > > >> > > >    2. Use earliest as the offset strategy for new partitions
> > after
> > > >> the
> > > >> > > >    first discovery.
> > > >> > > >
> > > >> > > > Looking forward to hearing from you.
> > > >> > > >
> > > >> > > >
> > > >> > > > [1]
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-288%3A+Enable+Dynamic+Partition+Discovery+by+Default+in+Kafka+Source
> > > >> > > >
> > > >> > > > [2]  <
> > > >> https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > > >> > >
> > > >> > > >
> > https://lists.apache.org/thread/d7zy46gj3sw0zwzq2rj3fmc0hx8ojtln
> > > >> > > >
> > > >> > > >
> > > >> > > > Best,
> > > >> > > >
> > > >> > > > Hongshun
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Reply via email to