Hi everyone,

I have updated FLIP-288 to add a
newDiscoveryOffsetsInitializer to the KafkaSourceBuilder and
KafkaSourceEnumerator. Users can call
KafkaSourceBuilder#setNewDiscoveryOffsets to change the offset strategy for
newly discovered partitions.
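To make the split concrete, here is a minimal, dependency-free sketch of the enumerator logic under a simplified model. DiscoverySketch, SimpleOffsetsInitializer and the EARLIEST/LATEST sentinels are hypothetical names, not Flink's OffsetsInitializer or KafkaSourceEnumerator API. Partitions found in the first discovery round take the startup initializer; anything discovered later takes the new-discovery initializer.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, dependency-free model of the proposal; not Flink's KafkaSourceEnumerator.
final class DiscoverySketch {

    // Hypothetical sentinels meaning "start from earliest" / "start from latest".
    static final long EARLIEST = -2L;
    static final long LATEST = -1L;

    // Simplified stand-in for Flink's OffsetsInitializer: partition name -> starting offset.
    interface SimpleOffsetsInitializer {
        long startingOffset(String partition);
    }

    static final class Enumerator {
        private final SimpleOffsetsInitializer startupInitializer;
        private final SimpleOffsetsInitializer newDiscoveryInitializer;
        private final Set<String> knownPartitions = new HashSet<>();
        private boolean firstDiscovery = true;

        Enumerator(SimpleOffsetsInitializer startupInitializer,
                   SimpleOffsetsInitializer newDiscoveryInitializer) {
            this.startupInitializer = startupInitializer;
            this.newDiscoveryInitializer = newDiscoveryInitializer;
        }

        // Returns starting offsets only for partitions not seen in an earlier round.
        Map<String, Long> discover(Collection<String> currentPartitions) {
            // The first round uses the startup initializer; every later round uses the other one.
            SimpleOffsetsInitializer initializer =
                    firstDiscovery ? startupInitializer : newDiscoveryInitializer;
            Map<String, Long> assigned = new HashMap<>();
            for (String partition : currentPartitions) {
                // Set.add returns false for partitions that are already known.
                if (knownPartitions.add(partition)) {
                    assigned.put(partition, initializer.startingOffset(partition));
                }
            }
            firstDiscovery = false;
            return assigned;
        }
    }
}
```

With a LATEST startup initializer and an EARLIEST new-discovery initializer, partitions that exist at startup begin at their latest offset, while a partition created later is read from its beginning, so records written to it before it was discovered are not skipped.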

Enabling partition discovery by default and changing the offset strategy
for new partitions should certainly be brought to users' attention, so both
changes will be explained in the 1.18 release notes.

WDYT? CC Ruan, Shammon, Gordon and Leonard.


Best,

Hongshun

On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com>
wrote:

> Hi everyone,
> Thanks for your participation.
>
> @Gordon, I looked at the several questions you raised:
>
>    1. Should we use the firstDiscovery flag or two separate
>    OffsetsInitializers? I had actually considered the latter. Following
>    my initial idea, we could provide a default EARLIEST OffsetsInitializer
>    for new partitions. However, following @Shammon's suggestion, for
>    Flink's built-in offset strategies each startup OffsetsInitializer
>    corresponds to its own post-startup OffsetsInitializer.
>    2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
>    again, and it seems that neither @Shammon nor I had noticed that
>    TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
>    get the current end offsets of the partitions. This is going to be used in
>    case we cannot find a suitable offset based on the timestamp, i.e., the
>    message meeting the requirement of the timestamp has not been produced to
>    Kafka yet. *In this case, we just use the latest offset*." Therefore,
>    the TimestampOffsetsInitializer always yields an offset at startup, even
>    for a future timestamp.
>    3. Clarification on coupling SPECIFIC-OFFSET startup with
>    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already falls
>    back to the "auto.offset.reset" position for partitions without a
>    specified offset.
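A rough sketch of the two fallbacks described in points 2 and 3, assuming the offsets have already been fetched into plain maps keyed by partition name. FallbackSketch and its methods are hypothetical; they only model the behaviour described above for the TimestampOffsetsInitializer and the SPECIFIC-OFFSET strategy, not Flink's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the fallback behaviour; not Flink's implementation.
final class FallbackSketch {

    // TIMESTAMP: use the offset found for the timestamp; if none exists (the matching
    // message has not been produced yet), fall back to the partition's current end offset.
    static Map<String, Long> timestampOffsets(Map<String, Long> offsetsForTimestamp,
                                              Map<String, Long> endOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : endOffsets.entrySet()) {
            Long found = offsetsForTimestamp.get(entry.getKey());
            result.put(entry.getKey(), found != null ? found : entry.getValue());
        }
        return result;
    }

    // SPECIFIC-OFFSET: use the user-specified offset if present, otherwise the position
    // chosen by the "auto.offset.reset" strategy ("earliest" or "latest").
    static long specificOffset(String partition, Map<String, Long> specified,
                               String autoOffsetReset, long beginningOffset, long endOffset) {
        Long offset = specified.get(partition);
        if (offset != null) {
            return offset;
        }
        if ("earliest".equals(autoOffsetReset)) {
            return beginningOffset;
        }
        if ("latest".equals(autoOffsetReset)) {
            return endOffset;
        }
        throw new IllegalArgumentException("Unsupported auto.offset.reset: " + autoOffsetReset);
    }
}
```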
>
> @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> whether the offsets specified at startup may include partitions that do
> not exist yet. The previous design may have allowed a SPECIFIC-OFFSET
> startup with future partitions. However, since different strategies are
> now used for partitions found in the first discovery and partitions
> discovered later, I think the offsets specified at startup should only
> cover partitions that are confirmed to exist; otherwise, an error will be
> thrown. Partitions that do not exist yet should be handled by the
> post-startup OffsetsInitializer (EARLIEST by default).
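A minimal sketch of the proposed startup check, assuming partitions are identified by plain strings. StartupValidationSketch is a hypothetical name, and the exact error handling in the FLIP may differ.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical check, not Flink API: startup offsets may only name partitions that exist now.
final class StartupValidationSketch {

    static void validateSpecifiedOffsets(Map<String, Long> specifiedOffsets,
                                         Set<String> existingPartitions) {
        // Collect every unknown partition (sorted) so the error lists them all at once.
        Set<String> missing = new TreeSet<>(specifiedOffsets.keySet());
        missing.removeAll(existingPartitions);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "Offsets specified for partitions that do not exist at startup: " + missing
                            + ". Partitions created later are handled by the post-startup"
                            + " OffsetsInitializer (EARLIEST by default).");
        }
    }
}
```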
>
> Best
> Hongshun
>
>
> On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
>
>> Thanks Gordon and Leonard
>>
>> I'm sorry I don't have a specific case, but I see the issue as follows:
>>
>> 1. Users may set an offset later than the current time, because Flink does
>> not forbid it.
>> 2. If we use EARLIEST for newly discovered partitions regardless of the
>> startup OFFSET strategy, the behavior may differ from the previous
>> strategy. I think it's best to keep the same strategy as before if it does
>> not cause data loss.
>> 3. I think supporting different OFFSETs in the FLIP will not make the
>> implementation much more complex.
>>
>> Of course, if we confirm that such a future Timestamp OFFSET is illegal and
>> Flink validates it, then we can apply the same strategy to newly
>> discovered partitions. I think this would be nice too.
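If Flink did validate the startup timestamp as suggested here, the check could be as simple as this sketch. TimestampValidationSketch is hypothetical (as point 1 notes, Flink does not currently limit this), and the Clock parameter is only there so the check can be tested against a fixed time.

```java
import java.time.Clock;
import java.time.Instant;

// Hypothetical validation, not existing Flink behaviour.
final class TimestampValidationSketch {

    // Returns the timestamp unchanged if it is not later than the clock's current time;
    // otherwise rejects it as a "future" startup timestamp.
    static long validateStartupTimestamp(long timestampMillis, Clock clock) {
        long now = clock.millis();
        if (timestampMillis > now) {
            throw new IllegalArgumentException("Startup timestamp "
                    + Instant.ofEpochMilli(timestampMillis)
                    + " is later than the current time " + Instant.ofEpochMilli(now));
        }
        return timestampMillis;
    }
}
```

In a real job the clock would simply be Clock.systemUTC().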
>>
>> Best,
>> Shammon FY
>>
>>
>> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
>>
>> > Thanks Hongshun and Shammon for driving the FLIP!
>> >
>> >
>> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
>> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET
>> > > post-startup*
>> >
>> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET:
>> > the timestamps/offsets of a newly added partition are undetermined when
>> > the job starts (the partition has not been created yet); they are
>> > timestamps/offsets in the future.
>> >
>> > I have used many message queue systems, such as Kafka, Pulsar and xxMQ.
>> > In my experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are
>> > usually used to specify existing timestamps/offsets, for business
>> > scenarios such as backfilling and refreshing data. At present, it's hard
>> > to imagine a user scenario that specifies a future timestamp to filter
>> > data in the current topic of a message queue system. Are we overthinking
>> > it by considering future TIMESTAMP and SPECIFIC-OFFSET?
>> >
>> >
>> > Best,
>> > Leonard
>>
>
