Hi everyone,
Thanks for your participation.

@Gordon, I looked at the questions you raised:

   1. Should we use the firstDiscovery flag or two separate
   OffsetsInitializers? I had actually considered the latter. If we follow my
   initial idea, we can provide a default EARLIEST OffsetsInitializer for
   new partitions. However, according to @Shammon's suggestion, for Flink's
   built-in offset strategies each startup OffsetsInitializer would map to
   its own post-startup OffsetsInitializer.
   2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
   again, and it seems that neither @Shammon nor I had figured this out.
   TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
   get the current end offsets of the partitions. This is going to be used in
   case we cannot find a suitable offset based on the timestamp, i.e., the
   message meeting the requirement of the timestamp has not been produced to
   Kafka yet. *In this case, we just use the latest offset*." Therefore,
   the TimestampOffsetsInitializer always yields an offset at startup, even
   for a timestamp in the future.
   3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET
   post-startup. The SPECIFIC-OFFSET strategy already falls back to the
   "auto.offset.reset" position for partitions that are not covered by the
   specified offsets.
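
To make the fallback in point 2 concrete, here is a minimal sketch using plain
Java maps. It is not the connector code: the class name, method name, and the
use of String partition keys are hypothetical, and a null entry stands in for
"no record at or after the timestamp".

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampFallbackSketch {

    /**
     * Hypothetical helper illustrating the fallback described in the comment.
     *
     * @param offsetsForTimestamp partition -> first offset at/after the timestamp,
     *                            or null when no such record exists yet
     * @param endOffsets          partition -> current end (latest) offset
     * @return partition -> offset to start reading from
     */
    static Map<String, Long> resolveStartingOffsets(
            Map<String, Long> offsetsForTimestamp, Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            Long byTimestamp = offsetsForTimestamp.get(entry.getKey());
            // Fall back to the latest offset when the timestamp lookup finds nothing,
            // e.g. because the timestamp lies in the future.
            result.put(entry.getKey(), byTimestamp != null ? byTimestamp : entry.getValue());
        }
        return result;
    }
}
```

With this behavior, a "future" timestamp simply resolves to the latest offset
of each existing partition, so startup never ends up without an offset.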

@Gordon, @Shammon, @Leonard, the core issue we are concerned about is
whether the offsets specified at startup may include non-existent
partitions. The previous design allowed a SPECIFIC-OFFSET startup to name
future partitions. However, since different strategies are now used for
partitions discovered first and partitions discovered later, I think the
offsets specified at startup should only refer to partitions that are
confirmed to exist; otherwise an error will be thrown. Offsets for partitions
that do not exist yet should be handled by the post-startup
OffsetsInitializer (default EARLIEST).
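
A rough sketch of the rule I am proposing, again with plain Java collections
rather than connector classes. All names here (the class, the enum, both
methods) are hypothetical and only illustrate the idea: validate startup
offsets against existing partitions, and let a separate post-startup strategy
decide for partitions discovered later.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StartupOffsetValidationSketch {

    /** Hypothetical post-startup strategies for partitions discovered later. */
    enum PostStartupStrategy { EARLIEST, LATEST }

    /** Rejects startup offsets that refer to partitions which do not exist yet. */
    static void validateStartupOffsets(
            Map<String, Long> specifiedOffsets, Set<String> existingPartitions) {
        for (String partition : specifiedOffsets.keySet()) {
            if (!existingPartitions.contains(partition)) {
                throw new IllegalArgumentException(
                        "Startup offset specified for non-existent partition: " + partition);
            }
        }
    }

    /** Picks the starting offset for a partition discovered after startup. */
    static long offsetForLaterDiscoveredPartition(
            PostStartupStrategy strategy, long beginningOffset, long endOffset) {
        return strategy == PostStartupStrategy.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        Map<String, Long> specified = new HashMap<>();
        specified.put("topic-0", 5L);
        Set<String> existing = new HashSet<>();
        existing.add("topic-0");

        // Passes: every specified partition exists at startup.
        validateStartupOffsets(specified, existing);

        // A partition discovered later uses the post-startup strategy instead.
        long start = offsetForLaterDiscoveredPartition(PostStartupStrategy.EARLIEST, 0L, 10L);
        System.out.println("Later-discovered partition starts at offset " + start);
    }
}
```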

Best,
Hongshun


On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:

> Thanks Gordon and Leonard
>
> I'm sorry that I have no specific case on my side, but I see the
> issue as follows:
>
> 1. Users may set an offset later than the current time, because Flink does
> not restrict it.
> 2. If we use EARLIEST for newly discovered partitions regardless of the
> startup OFFSET, the behavior may differ from the previous strategy. I think
> it's best to keep the same strategy as before if it does not cause data loss.
> 3. I think supporting different OFFSETs in the FLIP will not make the
> implementation more complex.
>
> Of course, if it is confirmed that this is an illegal Timestamp OFFSET and
> Flink validates it, then we can apply the same strategy to newly
> discovered partitions. I think this would be nice too.
>
> Best,
> Shammon FY
>
>
> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
>
> > Thanks Hongshun and Shammon for driving the FLIP!
> >
> >
> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > SPECIFIC-OFFSET post-startup*
> >
> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET:
> > the timestamp/offset of a newly added partition is undetermined when the
> > job starts (the partition has not been created yet), so it is a
> > timestamp/offset in the future.
> >
> > I have used many message queue systems such as Kafka, Pulsar, and xxMQ. In
> > my experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are usually
> > used to specify existing timestamps/offsets, for business scenarios such
> > as backfilling and refreshing data. At present, it's hard to imagine a
> > user scenario that specifies a future timestamp to filter data in the
> > current topic of a message queue system. Is it overthinking to consider
> > future TIMESTAMP and SPECIFIC-OFFSET?
> >
> >
> > Best,
> > Leonard
>
