Hi Hongshun,

Thanks for updating the FLIP; it all sounds good to me.
I just have one comment: how do SQL jobs set the new-discovery offsets initializer? I found that `DataStream` jobs can set a different offsets initializer for newly discovered partitions via `KafkaSourceBuilder.setNewDiscoveryOffsets`. Do SQL jobs need to support this feature as well?

Best,
Shammon FY

On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com> wrote:

> Hi everyone,
>
> I have already modified FLIP-288 to provide a newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and KafkaSourceEnumerator. Users can use KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new partitions.
>
> Enabling partition discovery by default and changing the offset strategy for new partitions should certainly be brought to users' attention, so this will be explained in the 1.18 release notes.
>
> WDYT? CC: Ruan, Shammon, Gordon and Leonard.
>
> Best,
> Hongshun
>
> On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> > Hi everyone,
> > Thanks for your participation.
> >
> > @Gordon, I looked at the several questions you raised:
> >
> > 1. Should we use the firstDiscovery flag or two separate OffsetsInitializers? Actually, I have considered the latter. If we follow my initial idea, we can provide a default EARLIEST OffsetsInitializer for new partitions. However, following @Shammon's suggestion, different startup OffsetsInitializers correspond to different post-startup OffsetsInitializers for Flink's built-in offset strategies.
> > 2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code again, and it seems that neither @Shammon nor I had figured this out: TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First get the current end offsets of the partitions.
> >    This is going to be used in case we cannot find a suitable offset based on the timestamp, i.e., the message meeting the requirement of the timestamp has not been produced to Kafka yet. *In this case, we just use the latest offset*." Therefore, the TimestampOffsetsInitializer always yields an offset at startup.
> > 3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already uses the "auto.offset.reset" position for partitions not covered by the specified offsets.
> >
> > @Gordon, @Shammon, @Leonard, the core issue we are concerned about is whether the offsets specified at startup may include non-existent partitions. The previous design may have allowed a SPECIFIC-OFFSET startup with future partitions. However, since different strategies are now used for the first-discovered partitions and the later-discovered partitions, I think the offsets specified at startup should only cover partitions that are confirmed to exist; otherwise, an error will be thrown. Partitions that do not exist yet should be handled by the post-startup OffsetsInitializer (default EARLIEST).
> >
> > Best,
> > Hongshun
> >
> > On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> > >> Thanks Gordon and Leonard,
> > >>
> > >> I'm sorry that I don't have a specific case, but I see the issue as follows:
> > >>
> > >> 1. Users may set an offset later than the current time, because Flink does not limit it.
> > >> 2. If we always use EARLIEST for newly discovered partitions regardless of the startup OFFSET, the behavior may differ from the previous strategy. I think it's best to keep the same strategy as before if it does not cause data loss.
> > >> 3. I think supporting different OFFSETs in the FLIP will not make the implementation much more complex.
> > >>
> > >> Of course, it may be confirmed that such a future Timestamp OFFSET is illegal and that Flink validates it.
> > >> In that case, we can apply the same strategy to the newly discovered partitions, and I think that would be nice too.
> > >>
> > >> Best,
> > >> Shammon FY
> > >>
> > >> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > >>
> > >> > Thanks Hongshun and Shammon for driving the FLIP!
> > >> >
> > >> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > >> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET post-startup*
> > >> >
> > >> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET: the timestamps/offsets of a newly added partition are undetermined when the job starts (the partition has not been created yet), and they are timestamps/offsets in the future.
> > >> >
> > >> > I have used many message queue systems, like Kafka, Pulsar, and xxMQ. In my past experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are usually used to specify existing timestamps/offsets, for business scenarios such as backfilling data and refreshing data. At present, it's hard to imagine a user scenario that specifies a future timestamp to filter data in the current topic of a message queue system. Is it overthinking to consider future TIMESTAMP and SPECIFIC-OFFSET?
> > >> >
> > >> > Best,
> > >> > Leonard
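To make the strategies debated in this thread concrete, here is a minimal, dependency-free Java sketch. It assumes a simplified model in which a partition is just a list of record timestamps (the record at index i has offset i), and it models three behaviors mentioned above: the TIMESTAMP fallback to the latest offset when no matching record exists yet, the SPECIFIC-OFFSET fallback to an "auto.offset.reset"-style position for partitions not covered by the map, and the proposal that first-discovered partitions use the startup strategy (with specified offsets required to name existing partitions) while later-discovered ones use a separate post-startup strategy. All names (`OffsetSketch`, `Strategy`, `Kind`, `resolve`, `validateStartup`, `assign`) are hypothetical and are not Flink or Kafka APIs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the offset selection discussed for FLIP-288 (not Flink/Kafka code). */
final class OffsetSketch {

    enum Kind { EARLIEST, LATEST, TIMESTAMP, SPECIFIC }

    /** Hypothetical stand-in for an OffsetsInitializer. */
    static final class Strategy {
        final Kind kind;
        final long timestamp;            // used by TIMESTAMP only
        final Map<String, Long> offsets; // used by SPECIFIC only
        final Kind reset;                // models "auto.offset.reset" (EARLIEST or LATEST)

        private Strategy(Kind kind, long timestamp, Map<String, Long> offsets, Kind reset) {
            this.kind = kind;
            this.timestamp = timestamp;
            this.offsets = offsets;
            this.reset = reset;
        }

        static Strategy earliest() { return new Strategy(Kind.EARLIEST, 0L, Map.of(), Kind.EARLIEST); }
        static Strategy latest() { return new Strategy(Kind.LATEST, 0L, Map.of(), Kind.EARLIEST); }
        static Strategy timestamp(long ts) { return new Strategy(Kind.TIMESTAMP, ts, Map.of(), Kind.EARLIEST); }
        static Strategy specific(Map<String, Long> offsets, Kind reset) {
            return new Strategy(Kind.SPECIFIC, 0L, new HashMap<>(offsets), reset);
        }
    }

    /** Starting offset for one partition; the end offset equals the number of records. */
    static long resolve(Strategy s, String partition, List<Long> recordTimestamps) {
        long endOffset = recordTimestamps.size();
        switch (s.kind) {
            case EARLIEST:
                return 0L;
            case LATEST:
                return endOffset;
            case TIMESTAMP:
                // First record at or after the timestamp; if none has been produced yet,
                // fall back to the end (latest) offset.
                for (int i = 0; i < recordTimestamps.size(); i++) {
                    if (recordTimestamps.get(i) >= s.timestamp) {
                        return i;
                    }
                }
                return endOffset;
            case SPECIFIC:
                Long specified = s.offsets.get(partition);
                if (specified != null) {
                    return specified;
                }
                // Partition not covered by the map: use the reset position.
                return s.reset == Kind.LATEST ? endOffset : 0L;
            default:
                throw new IllegalStateException("unknown kind " + s.kind);
        }
    }

    /** Proposed rule: offsets specified at startup must refer to partitions that already exist. */
    static void validateStartup(Strategy startup, Set<String> existingPartitions) {
        if (startup.kind != Kind.SPECIFIC) {
            return;
        }
        for (String p : startup.offsets.keySet()) {
            if (!existingPartitions.contains(p)) {
                throw new IllegalArgumentException("offset specified for non-existent partition " + p);
            }
        }
    }

    /** First discovery uses the startup strategy; later discoveries use the post-startup one. */
    static Map<String, Long> assign(Map<String, List<Long>> discovered, boolean firstDiscovery,
                                    Strategy startup, Strategy newDiscovery) {
        if (firstDiscovery) {
            validateStartup(startup, discovered.keySet());
        }
        Strategy s = firstDiscovery ? startup : newDiscovery;
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, List<Long>> e : discovered.entrySet()) {
            result.put(e.getKey(), resolve(s, e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Strategy startup = Strategy.timestamp(250L);
        Strategy newDiscovery = Strategy.earliest(); // the proposed post-startup default
        Map<String, List<Long>> initial = Map.of("t-0", List.of(100L, 200L, 300L));
        System.out.println(assign(initial, true, startup, newDiscovery));  // {t-0=2}
        Map<String, List<Long>> later = Map.of("t-1", List.of(400L, 500L));
        System.out.println(assign(later, false, startup, newDiscovery));   // {t-1=0}
    }
}
```

Keeping the startup and post-startup strategies as two separate parameters corresponds to the "two separate OffsetsInitializers" option from point 1. Shammon's alternative (carrying the startup TIMESTAMP/OFFSET over to new partitions) would amount to passing the same `Strategy` for both arguments.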