Hi Hongshun

Thanks for updating the FLIP; it sounds good to me.

I have just one comment: how do SQL jobs set the new-discovery offsets
initializer?
I found that `DataStream` jobs can set a different offsets initializer for
newly discovered partitions via `KafkaSourceBuilder.setNewDiscoveryOffsets`.
Do SQL jobs need to support this feature as well?

Best,
Shammon FY

On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi everyone,
>
> I have already modified FLIP-288 to provide a
> newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> KafkaSourceEnumerator. Users can use
> KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> partitions.
>
> Of course, enabling partition discovery by default and changing the offset
> strategy for new partitions should be brought to users' attention, so both
> changes will be explained in the 1.18 release notes.
>
> WDYT? CC Ruan, Shammon, Gordon and Leonard.
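A minimal sketch of the startup vs. new-discovery split described above, written as a simplified Python model rather than Flink's Java code. `Enumerator`, `discover`, `earliest`, `latest`, and `LOG_BOUNDS` are hypothetical names used only for illustration. Partitions returned by the first discovery get offsets from the startup initializer; partitions that show up in later discoveries use the new-discovery initializer, which defaults to EARLIEST.

```python
from typing import Callable, Dict, Set

# Hypothetical model: an offsets initializer maps a partition id to a start offset.
OffsetsInitializer = Callable[[int], int]

# Illustrative (earliest, latest) offsets per partition; not real Kafka data.
LOG_BOUNDS: Dict[int, tuple] = {0: (0, 100), 1: (5, 50), 2: (0, 30)}


def earliest(partition: int) -> int:
    return LOG_BOUNDS[partition][0]


def latest(partition: int) -> int:
    return LOG_BOUNDS[partition][1]


class Enumerator:
    """Hypothetical stand-in for the enumerator's start-offset selection."""

    def __init__(self, startup: OffsetsInitializer,
                 new_discovery: OffsetsInitializer = earliest) -> None:
        self.startup = startup
        # Used for partitions that appear after the first discovery (default EARLIEST).
        self.new_discovery = new_discovery
        self.known: Set[int] = set()
        self.first_discovery_done = False

    def discover(self, partitions: Set[int]) -> Dict[int, int]:
        """Return start offsets for partitions that have not been seen before."""
        new = partitions - self.known
        # First discovery -> startup initializer; later discoveries -> new-discovery one.
        init = self.new_discovery if self.first_discovery_done else self.startup
        self.first_discovery_done = True
        self.known |= new
        return {p: init(p) for p in sorted(new)}


enumerator = Enumerator(startup=latest)
print(enumerator.discover({0, 1}))     # first discovery: {0: 100, 1: 50}
print(enumerator.discover({0, 1, 2}))  # partition 2 found later: {2: 0}
```

Keeping the two initializers as separate, independently configurable values (instead of one initializer plus a first-discovery flag) lets a job start from LATEST while still reading newly created partitions from the beginning.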
>
>
> Best,
>
> Hongshun
>
> On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi everyone,
> > Thanks for your participation.
> >
> > @Gordon, I looked at the several questions you raised:
> >
> >    1. Should we use the firstDiscovery flag or two separate
> >    OffsetsInitializers? Actually, I had considered the latter. If we follow
> >    my initial idea, we can provide a default EARLIEST OffsetsInitializer
> >    for new partitions. However, following @Shammon's suggestion, different
> >    startup OffsetsInitializers correspond to different post-startup
> >    OffsetsInitializers for Flink's built-in offset strategies.
> >    2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> >    again, and it seems that neither @Shammon nor I had figured this out.
> >    TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
> >    get the current end offsets of the partitions. This is going to be used
> >    in case we cannot find a suitable offset based on the timestamp, i.e.,
> >    the message meeting the requirement of the timestamp has not been
> >    produced to Kafka yet. *In this case, we just use the latest offset*."
> >    Therefore, the TimestampOffsetsInitializer always yields an offset at
> >    startup, even for a future timestamp.
> >    3. Clarification on coupling SPECIFIC-OFFSET startup with
> >    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already
> >    uses the "auto.offset.reset" position for partitions that have no
> >    specified offset.
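The fallbacks in points 2 and 3 can be illustrated with a small Python model. This is a sketch under stated assumptions, not the connector's code: `TIMESTAMPS`, `end_offset`, `timestamp_offset`, and `specified_offset` are hypothetical, every log starts at offset 0, and any other lookups the real connector may perform are omitted. A timestamp with no matching record falls back to the end offset, and a partition missing from the specified offsets falls back to the "auto.offset.reset" position.

```python
from typing import Dict, List, Optional

# Illustrative record timestamps per partition; list index == record offset.
TIMESTAMPS: Dict[int, List[int]] = {0: [10, 20, 30], 1: [15, 25]}


def end_offset(partition: int) -> int:
    # The end ("latest") offset is the offset the next produced record would get.
    return len(TIMESTAMPS[partition])


def timestamp_offset(partition: int, ts: int) -> int:
    """Earliest offset whose timestamp is >= ts, else the end offset."""
    match: Optional[int] = next(
        (offset for offset, t in enumerate(TIMESTAMPS[partition]) if t >= ts), None)
    if match is None:
        # No record meets the timestamp yet (e.g. a future timestamp):
        # fall back to the latest offset.
        return end_offset(partition)
    return match


def specified_offset(partition: int, specified: Dict[int, int], reset: str) -> int:
    """Use the specified offset if present, else the auto.offset.reset position."""
    if partition in specified:
        return specified[partition]
    if reset == "earliest":
        return 0  # every log in this model starts at offset 0
    if reset == "latest":
        return end_offset(partition)
    raise ValueError(f"unsupported auto.offset.reset value: {reset}")


print(timestamp_offset(0, 20))  # 1
print(timestamp_offset(0, 99))  # future timestamp -> end offset 3
print(specified_offset(1, {0: 2}, "latest"))  # partition 1 not specified -> 2
```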
> >
> > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> > whether the offsets specified at startup may include non-existent
> > partitions. In the previous design, a SPECIFIC-OFFSET startup may refer to
> > future partitions. However, since different strategies are now used for
> > the partitions found by the first discovery and those discovered later, I
> > think the offsets specified at startup should only cover partitions that
> > are confirmed to exist; otherwise, an error will be thrown. Partitions
> > that do not exist yet should be handled by the post-startup
> > OffsetsInitializer (default EARLIEST).
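The proposed rule (startup offsets may only name existing partitions, while later partitions go through the post-startup initializer) can be sketched as follows. The function names and the `EARLIEST = 0` constant are hypothetical, and the model assumes every log starts at offset 0; it shows the intended checks, not the connector's actual validation code.

```python
from typing import Callable, Dict, Set

EARLIEST = 0  # assumption: every partition's log starts at offset 0 in this model


def validate_specified_offsets(specified: Dict[int, int], existing: Set[int]) -> None:
    """Reject startup offsets that reference partitions that do not exist yet."""
    missing = sorted(set(specified) - existing)
    if missing:
        raise ValueError(f"specified offsets reference non-existent partitions: {missing}")


def later_partition_offsets(partitions: Set[int],
                            post_startup: Callable[[int], int] = lambda p: EARLIEST
                            ) -> Dict[int, int]:
    """Start offsets for partitions created after startup (default EARLIEST)."""
    return {p: post_startup(p) for p in sorted(partitions)}


validate_specified_offsets({0: 5, 1: 7}, existing={0, 1, 2})  # passes
try:
    validate_specified_offsets({0: 5, 3: 1}, existing={0, 1})
except ValueError as err:
    print(err)  # specified offsets reference non-existent partitions: [3]
print(later_partition_offsets({3, 4}))  # {3: 0, 4: 0}
```

Failing fast at startup keeps a typo in a partition number from silently turning into a "future" partition that is later read with a different strategy.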
> >
> > Best
> > Hongshun
> >
> >
> > On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> >> Thanks Gordon and Leonard,
> >>
> >> I'm sorry that I don't have a specific case, but I see the issue as
> >> follows:
> >>
> >> 1. Users may set an offset later than the current time, because Flink
> >> does not restrict it.
> >> 2. If we always use EARLIEST for newly discovered partitions regardless
> >> of the startup OFFSET strategy, the behavior may differ from the previous
> >> strategy. I think it's best to keep the same strategy as before, as long
> >> as it does not cause data loss.
> >> 3. I think supporting different OFFSETs in the FLIP will not make the
> >> implementation more complex.
> >>
> >> Of course, if we confirm that such a Timestamp OFFSET is illegal and Flink
> >> validates it, then we can apply the same strategy to newly discovered
> >> partitions. I think that would be nice too.
> >>
> >> Best,
> >> Shammon FY
> >>
> >>
> >> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> >>
> >> > Thanks Hongshun and Shammon for driving the FLIP!
> >> >
> >> >
> >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> >> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> >> > > SPECIFIC-OFFSET post-startup*
> >> >
> >> > Gordon raised a good point about the future TIMESTAMP and
> >> > SPECIFIC-OFFSET: the timestamp/offset of a newly added partition is
> >> > undetermined when the job starts (the partition has not been created
> >> > yet), so it is a timestamp/offset in the future.
> >> >
> >> > I have used many message queue systems, such as Kafka, Pulsar, and
> >> > xxMQ. In my experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes
> >> > are usually used to specify existing timestamps/offsets, for business
> >> > scenarios such as backfilling and refreshing data. It's hard to imagine
> >> > a user scenario that specifies a future timestamp to filter data in the
> >> > current topic of a message queue system. Are we overthinking by
> >> > considering future TIMESTAMP and SPECIFIC-OFFSET values?
> >> >
> >> >
> >> > Best,
> >> > Leonard
> >>
> >
>
