Hi Shammon,

I agree with you. Since only EARLIEST is used, it's better not to mislead
users through the interface.
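
To illustrate the point, here is a minimal sketch in plain Java. All names are hypothetical and it is not the actual `KafkaSourceEnumerator`; it only assumes that each partition has an earliest and a latest offset. The constructor takes only the startup strategy, and partitions found after the first discovery always start from their earliest offset.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for illustration only; NOT Flink's KafkaSourceEnumerator.
final class EnumeratorSketch {

    /** Hypothetical startup strategy: picks a start offset given a partition's range. */
    interface StartingOffsets {
        long choose(long earliest, long latest);
    }

    private final StartingOffsets startingOffsets;
    private final Set<String> knownPartitions = new HashSet<>();
    private boolean firstDiscovery = true;

    // Only the startup strategy is configurable; there is deliberately no
    // parameter for newly discovered partitions.
    EnumeratorSketch(StartingOffsets startingOffsets) {
        this.startingOffsets = startingOffsets;
    }

    /**
     * @param offsetRanges partition name -> {earliest offset, latest offset}
     * @return start offsets for partitions not seen in earlier calls
     */
    Map<String, Long> discover(Map<String, long[]> offsetRanges) {
        Map<String, Long> assigned = new TreeMap<>();
        for (Map.Entry<String, long[]> entry : offsetRanges.entrySet()) {
            if (!knownPartitions.add(entry.getKey())) {
                continue; // already assigned in an earlier discovery round
            }
            long earliest = entry.getValue()[0];
            long latest = entry.getValue()[1];
            // First round: honor the user's startup strategy.
            // Later rounds: always start from the earliest offset.
            long start = firstDiscovery
                    ? startingOffsets.choose(earliest, latest)
                    : earliest;
            assigned.put(entry.getKey(), start);
        }
        firstDiscovery = false;
        return assigned;
    }
}
```

With a LATEST startup strategy, a partition present at startup begins at its latest offset, while one that shows up in a later discovery round begins at its earliest offset.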


Yours

Hongshun

On Tue, Apr 18, 2023 at 7:12 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Hongshun
>
> Thanks for your explanation, I get your point. I reviewed the FLIP again
> and have only one minor comment, which won't block this FLIP: do we still
> need `OffsetsInitializer newDiscoveryOffsetsInitializer` in the constructor
> of `KafkaSourceEnumerator`? I think we can remove it if we always use
> EARLIEST for newly discovered partitions.
>
> Best,
> Shammon FY
>
> On Tue, Apr 18, 2023 at 4:59 PM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Shammon,
> >
> > Thank you for your advice. I carefully considered whether to expose this
> > in SQL DDL, and recently studied whether it is feasible.
> >
> > However, after reading the corresponding code more thoroughly, it appears
> > that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> > work as we initially thought. In the end, I have decided to use only
> > "EARLIEST" instead of letting the user choose freely.
> >
> > Here is my new understanding of what SpecifiedOffsetsInitializer and
> > TimestampOffsetsInitializer actually do:
> >
> >    - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for
> >    specified partitions and *EARLIEST* for unspecified partitions. A
> >    specified offset should be less than the partition's latest offset;
> >    otherwise the partition starts from *EARLIEST*.
> >    - *TimestampOffsetsInitializer*: Initializes the offsets based on a
> >    timestamp. If no message meeting the timestamp requirement has been
> >    produced to Kafka yet, it uses the *LATEST* offset.
> >
> > So problems will occur when a new partition uses
> > SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> > more information in the "Rejected Alternatives" section of FLIP-288,
> > which includes the relevant code and the reasoning behind this conclusion.
> > All of these problems can be reproduced in the current version. They have
> > probably not surfaced because users usually specify an existing offset or
> > timestamp, so the behavior appears as EARLIEST in production.
> >
> > WDYT?
> > CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
> >
> > Yours
> >
> > Hongshun
> >
> >
> >
> >
> > On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi Hongshun
> > >
> > > Thanks for updating the FLIP, it totally sounds good to me.
> > >
> > > I just have one comment: how does a SQL job set the new discovery
> > > offsets initializer?
> > > I found that `DataStream` jobs can set different offsets initializers for
> > > newly discovered partitions via
> > > `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support
> > > this feature?
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I have already modified FLIP-288 to provide a
> > > > newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> > > > KafkaSourceEnumerator. Users can use
> > > > KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for
> > > > new partitions.
> > > >
> > > > Of course, enabling partition discovery by default and changing the
> > > > offset strategy for new partitions should be brought to the user's
> > > > attention. Therefore, both will be explained in the 1.18 release notes.
> > > >
> > > > WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> > > >
> > > >
> > > > Best,
> > > >
> > > > Hongshun
> > > >
> > > > On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > > Thanks for your participation.
> > > > >
> > > > > @Gordon, I looked at the several questions you raised:
> > > > >
> > > > >    1. Should we use the firstDiscovery flag or two separate
> > > > >    OffsetsInitializers? Actually, I have considered the latter. If we
> > > > >    follow my initial idea, we can provide a default earliest
> > > > >    OffsetsInitializer for a new partition. However, according to
> > > > >    @Shammon's suggestion, different startup OffsetsInitializers
> > > > >    correspond to different post-startup OffsetsInitializers for
> > > > >    Flink's built-in offset strategies.
> > > > >    2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> > > > >    again, and it seems that neither @Shammon nor I had figured it out.
> > > > >    TimestampOffsetsInitializer#getPartitionOffsets has a comment:
> > > > >    "First get the current end offsets of the partitions. This is going
> > > > >    to be used in case we cannot find a suitable offset based on the
> > > > >    timestamp, i.e., the message meeting the requirement of the
> > > > >    timestamp has not been produced to Kafka yet. *In this case, we just
> > > > >    use the latest offset*." Therefore, the TimestampOffsetsInitializer
> > > > >    will always yield an offset at startup.
> > > > >    3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > > >    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already
> > > > >    uses the "auto.offset.reset" position for partitions that are not
> > > > >    hit.
> > > > > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> > > > > whether the offsets specified at startup may include non-existent
> > > > > partitions. The previous design may allow a SPECIFIC-OFFSET startup
> > > > > with future partitions. However, since different strategies are now
> > > > > used for partitions found in the first discovery and partitions
> > > > > discovered later, I think the offsets specified at startup should
> > > > > cover only partitions that have been confirmed to exist; otherwise an
> > > > > error will be thrown. Partitions that do not exist yet should be
> > > > > handled by the post-startup OffsetsInitializer (default EARLIEST).
> > > > >
> > > > > Best
> > > > > Hongshun
> > > > >
> > > > >
> > > > > On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > >
> > > > >> Thanks Gordon and Leonard
> > > > >>
> > > > > >> I'm sorry that I have no specific case on my side, but I see the
> > > > > >> issue as follows:
> > > > > >>
> > > > > >> 1. Users may set an offset later than the current time, because
> > > > > >> Flink does not restrict it.
> > > > > >> 2. Using EARLIEST for newly discovered partitions regardless of the
> > > > > >> configured OFFSET strategy may differ from the previous behavior. I
> > > > > >> think it's best to keep the same strategy as before if it does not
> > > > > >> cause data loss.
> > > > > >> 3. I think supporting different OFFSETs in the FLIP will not make
> > > > > >> the implementation more complex.
> > > > > >>
> > > > > >> Of course, if it is confirmed that this is an illegal timestamp
> > > > > >> OFFSET and Flink validates it, then we can apply the same strategy
> > > > > >> to newly discovered partitions. I think that would be nice too.
> > > > >>
> > > > >> Best,
> > > > >> Shammon FY
> > > > >>
> > > > >>
> > > > > >> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > >>
> > > > >> > Thanks Hongshun and Shammon for driving the FLIP!
> > > > >> >
> > > > >> >
> > > > > >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > > > >> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > > > >> > > SPECIFIC-OFFSET post-startup*
> > > > >> >
> > > > > >> > Gordon raised a good point about the future TIMESTAMP and
> > > > > >> > SPECIFIC-OFFSET: the timestamps/offsets of a newly added partition
> > > > > >> > are undetermined when the job starts (the partition has not been
> > > > > >> > created yet), so they are timestamps/offsets in the future.
> > > > > >> >
> > > > > >> > I have used many message queue systems such as Kafka, Pulsar, and
> > > > > >> > xxMQ. In my experience, the TIMESTAMP and SPECIFIC-OFFSET startup
> > > > > >> > modes are usually used to specify existing timestamps/offsets, for
> > > > > >> > business scenarios such as backfilling and re-refreshing data. At
> > > > > >> > present, it's hard to imagine a user scenario that specifies a
> > > > > >> > future timestamp to filter data in the current topic of a message
> > > > > >> > queue system. Is it overthinking to consider future TIMESTAMP and
> > > > > >> > SPECIFIC-OFFSET?
> > > > >> >
> > > > >> >
> > > > >> > Best,
> > > > >> > Leonard
> > > > >>
> > > > >
> > > >
> > >
> >
>
