> I have already modified FLIP-288 to provide a
> newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> KafkaSourceEnumerator. Users can use
> KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> partitions.

Thanks for addressing my comment, Hongshun.
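A minimal sketch of the two-initializer idea, written as a self-contained toy model rather than against the Flink API. `DiscoverySketch`, `discover` and the `ToLongFunction` "initializers" are hypothetical stand-ins for the enumerator and `OffsetsInitializer`, and offsets are plain longs. It only illustrates the rule being discussed: partitions found in the first discovery round use the startup strategy, and partitions found in later rounds use a separate new-discovery strategy (EARLIEST, modelled as offset 0).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.ToLongFunction;

/**
 * Toy model (not the real KafkaSourceEnumerator): the first discovery round uses the
 * startup initializer, and every later round uses a separate new-discovery initializer.
 */
final class DiscoverySketch {
    private final ToLongFunction<String> startupInitializer;
    private final ToLongFunction<String> newDiscoveryInitializer;
    private final Set<String> knownPartitions = new HashSet<>();
    private boolean firstDiscovery = true;

    DiscoverySketch(ToLongFunction<String> startupInitializer,
                    ToLongFunction<String> newDiscoveryInitializer) {
        this.startupInitializer = startupInitializer;
        this.newDiscoveryInitializer = newDiscoveryInitializer;
    }

    /** Returns a starting offset for each partition not seen in an earlier round. */
    Map<String, Long> discover(List<String> partitions) {
        ToLongFunction<String> initializer =
                firstDiscovery ? startupInitializer : newDiscoveryInitializer;
        Map<String, Long> startingOffsets = new HashMap<>();
        for (String partition : partitions) {
            // Set.add returns false for partitions already assigned in a previous round.
            if (knownPartitions.add(partition)) {
                startingOffsets.put(partition, initializer.applyAsLong(partition));
            }
        }
        firstDiscovery = false;
        return startingOffsets;
    }

    public static void main(String[] args) {
        long earliest = 0L;
        // Startup strategy: pretend a SPECIFIC-OFFSET or TIMESTAMP lookup resolved to offset 42.
        DiscoverySketch enumerator = new DiscoverySketch(p -> 42L, p -> earliest);
        System.out.println(enumerator.discover(List.of("topic-0", "topic-1")));
        System.out.println(enumerator.discover(List.of("topic-0", "topic-1", "topic-2"))); // {topic-2=0}
    }
}
```

Keeping the "first round" decision inside the enumerator means each initializer stays a plain offset lookup; with the EARLIEST-only outcome of this thread, the second initializer simply becomes a constant.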

> Considering these reasons and facts, I’m +1 to only use EARLIEST for newly
> discovered partitions.

Sounds good to me.
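To make the fallback reasoning concrete, here is a toy model (not Flink source code) of the behaviors described in this thread. It assumes a partition is an array of record timestamps where offset `i` holds `timestamps[i]`, so EARLIEST is 0 and LATEST is the array length. `OffsetFallbackSketch` and its methods are hypothetical; the real initializers look up offsets from Kafka, and the specified-offset fallback relies on the consumer's reset behavior. With a future timestamp, the timestamp rule falls back to LATEST and skips records written to a new partition before it was discovered, while EARLIEST covers them.

```java
import java.util.Map;

/**
 * Toy model of the offset fallbacks discussed in this thread. A partition is an array of
 * record timestamps: offset i holds timestamps[i], EARLIEST is 0, LATEST is the length.
 */
final class OffsetFallbackSketch {
    static final long EARLIEST = 0L;

    static long latest(long[] timestamps) {
        return timestamps.length;
    }

    /** Specified offset if present and within [EARLIEST, LATEST]; otherwise EARLIEST. */
    static long specified(Map<String, Long> offsets, String partition, long[] timestamps) {
        Long offset = offsets.get(partition);
        if (offset == null || offset < EARLIEST || offset > latest(timestamps)) {
            return EARLIEST;
        }
        return offset;
    }

    /** First offset whose timestamp is >= target; LATEST if no such record exists yet. */
    static long timestamp(long target, long[] timestamps) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= target) {
                return i;
            }
        }
        return latest(timestamps);
    }

    public static void main(String[] args) {
        // A partition created after the job started; three records existed before discovery.
        long[] newPartition = {1_000L, 1_005L, 1_010L};
        long futureTimestamp = 5_000L; // chosen at job start, later than every record above

        System.out.println(specified(Map.of(), "topic-3", newPartition)); // 0 (EARLIEST)
        System.out.println(timestamp(futureTimestamp, newPartition));      // 3 (LATEST)
        System.out.println(EARLIEST);                                      // 0
    }
}
```

In this model the timestamp rule starts the new partition at offset 3 and never reads the three earlier records, which is the gap that always using EARLIEST for newly discovered partitions avoids.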


Overall, +1 to this proposal in principle (I'll formally vote on the vote
thread as well).

Thanks,
Gordon

On Tue, Apr 18, 2023 at 9:12 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks, Hongshun, for the deeper analysis of the existing KafkaSource
> implementation details. Cool!
> - There’s no specific use case for using a future TIMESTAMP or
>   SPECIFIC-OFFSET for newly discovered partitions.
> - The existing SpecifiedOffsetsInitializer uses the EARLIEST offset for
>   unspecified partitions, as well as for newly discovered partitions.
> - The existing TimestampOffsetsInitializer uses the LATEST offset for a
>   future timestamp. In this case, the LATEST offset is similar to the
>   EARLIEST offset for newly discovered partitions, and EARLIEST is safer
>   because it covers all records.
>
> Considering these reasons and facts, I’m +1 to only use EARLIEST for newly
> discovered partitions.
>
> The updated FLIP looks good to me. We can start a vote thread soon if
> there are no further disagreements.
>
> Best,
> Leonard
>
> > On Apr 18, 2023, at 4:58 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
> >
> > Hi Shammon,
> >
> > Thank you for your advice. I have carefully considered whether to expose
> > this in SQL DDL, and recently I studied whether it is feasible.
> >
> > However, after reading the corresponding code more thoroughly, it appears
> > that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> > work as we initially thought. In the end, I have decided to only use
> > "EARLIEST" instead of letting the user choose freely.
> >
> > Here is my new understanding of what SpecifiedOffsetsInitializer and
> > TimestampOffsetsInitializer actually do:
> >
> >
> >   - *SpecifiedOffsetsInitializer*: uses the *specified offset* for
> >   specified partitions and *EARLIEST* for unspecified partitions. A
> >   specified offset should be less than the latest offset; otherwise, the
> >   partition starts from *EARLIEST*.
> >   - *TimestampOffsetsInitializer*: initializes the offsets based on a
> >   timestamp. If no message matching the timestamp has been produced to
> >   Kafka yet, it uses the *LATEST* offset.
> >
> > So, problems occur when new partitions use SpecifiedOffsetsInitializer or
> > TimestampOffsetsInitializer. You can find more details in the "Rejected
> > Alternatives" section of FLIP-288, which includes the relevant code and
> > the step-by-step reasoning.
> > All of these problems are reproducible in the current version. They
> > probably haven't surfaced because users usually specify an existing
> > offset or timestamp, so in production new partitions effectively start
> > from EARLIEST.
> >
> > WDYT?
> > CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
> >
> > Yours
> >
> > Hongshun
> >
> >
> >
> >
> > On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> >> Hi Hongshun
> >>
> >> Thanks for updating the FLIP; it sounds good to me.
> >>
> >> I just have one comment: how does a SQL job set the new-discovery
> >> offsets initializer?
> >> I see that `DataStream` jobs can set a different offsets initializer for
> >> newly discovered partitions via `KafkaSourceBuilder.setNewDiscoveryOffsets`.
> >> Do SQL jobs need to support this feature?
> >>
> >> Best,
> >> Shammon FY
> >>
> >> On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> I have already modified FLIP-288 to provide a
> >>> newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> >>> KafkaSourceEnumerator. Users can use
> >>> KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> >>> partitions.
> >>>
> >>> Of course, enabling partition discovery by default and changing the
> >>> offset strategy for new partitions should be brought to users'
> >>> attention, so both will be explained in the 1.18 release notes.
> >>>
> >>> WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> >>>
> >>>
> >>> Best,
> >>>
> >>> Hongshun
> >>>
> >>> On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >>>
> >>>> Hi everyone,
> >>>> Thanks for your participation.
> >>>>
> >>>> @Gordon, I looked at the questions you raised:
> >>>>
> >>>>   1. Should we use the firstDiscovery flag or two separate
> >>>>   OffsetsInitializers? I reconsidered this later. Under my initial
> >>>>   idea, we would provide a default EARLIEST OffsetsInitializer for new
> >>>>   partitions. However, following @Shammon's suggestion, each of Flink's
> >>>>   built-in startup OffsetsInitializers corresponds to its own
> >>>>   post-startup OffsetsInitializer.
> >>>>   2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> >>>>   again, and it seems that neither @Shammon nor I had figured this out.
> >>>>   TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
> >>>>   get the current end offsets of the partitions. This is going to be
> >>>>   used in case we cannot find a suitable offset based on the timestamp,
> >>>>   i.e., the message meeting the requirement of the timestamp has not
> >>>>   been produced to Kafka yet. *In this case, we just use the latest
> >>>>   offset*." Therefore, the TimestampOffsetsInitializer always yields an
> >>>>   offset at startup.
> >>>>   3. Clarification on coupling SPECIFIC-OFFSET startup with
> >>>>   SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already
> >>>>   uses the "auto.offset.reset" position for partitions that have no
> >>>>   specified offset.
> >>>>
> >>>> @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> >>>> whether the offsets specified at startup may include partitions that
> >>>> do not exist yet. The previous design allowed a SPECIFIC-OFFSET startup
> >>>> to reference future partitions. However, since different strategies
> >>>> are now used for partitions found in the first discovery and for
> >>>> partitions discovered later, I think the offsets specified at startup
> >>>> should only cover partitions that are confirmed to exist; otherwise, an
> >>>> error should be thrown. Partitions that do not exist yet should be
> >>>> handled by the post-startup OffsetsInitializer (default EARLIEST).
> >>>>
> >>>> Best
> >>>> Hongshun
> >>>>
> >>>>
> >>>> On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> >>>>
> >>>>> Thanks Gordon and Leonard
> >>>>>
> >>>>> I'm sorry that I don't have a specific case, but I see the issue as
> >>>>> follows:
> >>>>>
> >>>>> 1. Users may set an offset later than the current time because Flink
> >>>>> does not prevent it.
> >>>>> 2. Using EARLIEST for newly discovered partitions regardless of the
> >>>>> startup OFFSET may differ from the previous strategy. I think it's best
> >>>>> to keep the same strategy as before if it does not cause data loss.
> >>>>> 3. I think supporting different OFFSETs in the FLIP will not make the
> >>>>> implementation more complex.
> >>>>>
> >>>>> Of course, if we confirm that a future Timestamp OFFSET is illegal and
> >>>>> Flink validates it, then we can apply the same strategy to newly
> >>>>> discovered partitions. I think that would be nice too.
> >>>>>
> >>>>> Best,
> >>>>> Shammon FY
> >>>>>
> >>>>>
> >>>>> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks Hongshun and Shammon for driving the FLIP!
> >>>>>>
> >>>>>>
> >>>>>>> *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> >>>>>>> *3. Clarification on coupling SPECIFIC-OFFSET startup with
> >>>>>>> SPECIFIC-OFFSET post-startup*
> >>>>>>
> >>>>>> Gordon raised a good point about future TIMESTAMP and SPECIFIC-OFFSET:
> >>>>>> the timestamps/offsets of a newly added partition are undetermined
> >>>>>> when the job starts (the partition has not been created yet), so they
> >>>>>> lie in the future.
> >>>>>>
> >>>>>> I have used many message queue systems, such as Kafka, Pulsar and
> >>>>>> xxMQ. In my experience, the TIMESTAMP and SPECIFIC-OFFSET startup
> >>>>>> modes are usually used to specify existing timestamps/offsets, for
> >>>>>> business scenarios such as backfilling or refreshing data. It's hard
> >>>>>> to imagine a user scenario that specifies a future timestamp to filter
> >>>>>> data in the current topic of a message queue system. Are we
> >>>>>> overthinking by considering future TIMESTAMP and SPECIFIC-OFFSET?
> >>>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Leonard
> >>>>>
> >>>>
> >>>
> >>
>
>
