Hi Shammon, I agree with you. Since only EARLIEST is used, it's better not to mislead users through the interface.
Yours
Hongshun

On Tue, Apr 18, 2023 at 7:12 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Hongshun
>
> Thanks for your explanation, I have got your point. I reviewed the FLIP again
> and only have one minor comment which won't block this FLIP: do we still need
> the `OffsetsInitializer newDiscoveryOffsetsInitializer` parameter in the
> constructor of `KafkaSourceEnumerator`? I think we can remove it if we always
> use EARLIEST for newly discovered partitions.
>
> Best,
> Shammon FY
>
> On Tue, Apr 18, 2023 at 4:59 PM Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
> > Hi Shammon,
> >
> > Thank you for your advice. I have carefully considered whether to expose
> > this in SQL DDL, and recently studied whether it is feasible.
> >
> > However, after reading the corresponding code more thoroughly, it appears
> > that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> > work as we initially thought. In the end, I have decided to only use
> > "EARLIEST" instead of allowing the user to choose freely.
> >
> > Now, let me explain my new understanding.
> >
> > What SpecifiedOffsetsInitializer and TimestampOffsetsInitializer
> > actually do:
> >
> >    - *SpecifiedOffsetsInitializer*: Uses the *specified offset* for
> >    specified partitions and *EARLIEST* for unspecified partitions. The
> >    offset of a specified partition should be less than the latest offset;
> >    otherwise it will start from *EARLIEST*.
> >    - *TimestampOffsetsInitializer*: Initializes the offsets based on a
> >    timestamp. If no message meeting the timestamp requirement has been
> >    produced to Kafka yet, it just uses the *LATEST* offset.
> >
> > So, some problems will occur when a new partition uses
> > SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> > more information in the "Rejected Alternatives" section of FLIP-288, which
> > includes details of the code and the reasoning process.
> > All these problems can be reproduced in the current version. The reason
> > why they haven't been exposed is probably that users usually set an
> > existing offset or timestamp, so it behaves like earliest in production.
> >
> > WDYT?
> > CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
> >
> > Yours
> >
> > Hongshun
> >
> > On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> > > Hi Hongshun
> > >
> > > Thanks for updating the FLIP, it totally sounds good to me.
> > >
> > > I just have one comment: how does a SQL job set the new discovery
> > > offsets initializer?
> > > I found `DataStream` jobs can set different offsets initializers for
> > > newly discovered partitions in
> > > `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support
> > > this feature?
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I have already modified FLIP-288 to provide a
> > > > newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> > > > KafkaSourceEnumerator. Users can use
> > > > KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for
> > > > new partitions.
> > > >
> > > > Surely, enabling the partition discovery strategy by default and
> > > > modifying the offset strategy for new partitions should be brought to
> > > > the user's attention. Therefore, it will be explained in the 1.18
> > > > release notes.
> > > >
> > > > WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> > > >
> > > > Best,
> > > >
> > > > Hongshun
> > > >
> > > > On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > > Thanks for your participation.
> > > > >
> > > > > @Gordon, I looked at the several questions you raised:
> > > > >
> > > > >    1. Should we use the firstDiscovery flag or two separate
> > > > >    OffsetsInitializers? Actually, I have considered the latter. If we
> > > > >    follow my initial idea, we can provide a default earliest
> > > > >    OffsetsInitializer for a new partition. However, according to
> > > > >    @Shammon's suggestion, different startup OffsetsInitializers
> > > > >    correspond to different post-startup OffsetsInitializers for
> > > > >    Flink's built-in offset strategies.
> > > > >    2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> > > > >    again, and it seems that neither @Shammon nor I had figured this
> > > > >    out. TimestampOffsetsInitializer#getPartitionOffsets has a comment:
> > > > >    "First get the current end offsets of the partitions. This is going
> > > > >    to be used in case we cannot find a suitable offset based on the
> > > > >    timestamp, i.e., the message meeting the requirement of the
> > > > >    timestamp has not been produced to Kafka yet. *In this case, we
> > > > >    just use the latest offset*." Therefore, using the
> > > > >    TimestampOffsetsInitializer will always yield an offset at startup.
> > > > >    3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > > >    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already
> > > > >    uses the "auto.offset.reset" position for partitions that are not
> > > > >    hit.
> > > > >
> > > > > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> > > > > whether the offsets specified at startup may include non-existent
> > > > > partitions. The previous design allowed a SPECIFIC-OFFSET startup to
> > > > > name future partitions. However, since different strategies are now
> > > > > used for partitions discovered first and partitions discovered later,
> > > > > I think the offsets specified at startup should only cover partitions
> > > > > that have been confirmed to exist; if not, an error will be thrown.
> > > > > If partitions do not exist yet, their offsets should be determined by
> > > > > the post-startup OffsetsInitializer (default EARLIEST).
> > > > >
> > > > > Best
> > > > > Hongshun
> > > > >
> > > > > On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> > > > >
> > > > >> Thanks Gordon and Leonard
> > > > >>
> > > > >> I'm sorry that there is no specific case from my side, but I consider
> > > > >> the issue as follows
> > > > >>
> > > > >> 1. Users may set an offset later than the current time because Flink
> > > > >> does not limit it
> > > > >> 2. If we use EARLIEST for newly discovered partitions under every
> > > > >> OFFSET mode, the behavior may differ from the previous strategy. I
> > > > >> think it's best to keep the same strategy as before if it does not
> > > > >> cause data loss
> > > > >> 3. I think supporting different OFFSETs in the FLIP will not make the
> > > > >> implementation more complex
> > > > >>
> > > > >> Of course, if it is confirmed that this is an illegal Timestamp
> > > > >> OFFSET and Flink validates it, then we can apply the same strategy to
> > > > >> newly discovered partitions; I think this will be nice too
> > > > >>
> > > > >> Best,
> > > > >> Shammon FY
> > > > >>
> > > > >> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > >>
> > > > >> > Thanks Hongshun and Shammon for driving the FLIP!
> > > > >> >
> > > > >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > > >> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > > >> > > SPECIFIC-OFFSET post-startup*
> > > > >> >
> > > > >> > Gordon raised a good point about the future TIMESTAMP and
> > > > >> > SPECIFIC-OFFSET: the timestamps/offsets of a newly added partition
> > > > >> > are undetermined when the job starts (the partition has not been
> > > > >> > created yet), so they are timestamps/offsets in the future.
> > > > >> >
> > > > >> > I have used many message queue systems like Kafka, Pulsar, xxMQ. In
> > > > >> > my past experience, TIMESTAMP and SPECIFIC-OFFSET startup modes are
> > > > >> > usually used to specify existing timestamps/offsets, for business
> > > > >> > scenarios such as backfilling data and re-refreshing data. At
> > > > >> > present, it's hard to imagine a user scenario specifying a future
> > > > >> > timestamp to filter data in the current topic of a message queue
> > > > >> > system. Is it overthinking to consider future TIMESTAMP and
> > > > >> > SPECIFIC-OFFSET?
> > > > >> >
> > > > >> > Best,
> > > > >> > Leonard
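
For readers following the thread, the fallback rules described above for the two initializers can be modelled roughly as below. This is only a sketch under stated assumptions, not Flink code: the class `OffsetFallbackSketch` and the methods `resolveSpecified` and `resolveTimestamp` are hypothetical names, and the rules are taken from the descriptions in this thread rather than from the connector source.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical model of the fallback rules described in this thread; not Flink code. */
final class OffsetFallbackSketch {

    /**
     * Specified-offsets rule as described in the thread: use the configured
     * offset when the partition was specified and the offset is below the
     * latest offset; otherwise fall back to EARLIEST.
     */
    static long resolveSpecified(
            Map<Integer, Long> specified, int partition, long earliest, long latest) {
        Long offset = specified.get(partition);
        if (offset == null || offset >= latest) {
            return earliest;
        }
        return offset;
    }

    /**
     * Timestamp rule as described in the thread: use the offset found for the
     * timestamp, or LATEST when no record meets the timestamp yet.
     */
    static long resolveTimestamp(OptionalLong offsetForTimestamp, long latest) {
        return offsetForTimestamp.orElse(latest);
    }

    public static void main(String[] args) {
        // Assumed example: only partition 0 was specified at startup.
        Map<Integer, Long> specified = Map.of(0, 42L);

        // Partition 0 was specified and 42 is below the latest offset 100.
        System.out.println(resolveSpecified(specified, 0, 0L, 100L));

        // Partition 3 was discovered later and never specified: EARLIEST.
        System.out.println(resolveSpecified(specified, 3, 0L, 100L));

        // No record at or after the timestamp yet: LATEST.
        System.out.println(resolveTimestamp(OptionalLong.empty(), 100L));
    }
}
```

Under these rules a newly discovered partition starts from EARLIEST with specified offsets but from LATEST with a not-yet-reached timestamp, which is the inconsistency that led to always using EARLIEST for new partitions.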