> I have already modified FLIP-288 to provide a newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and KafkaSourceEnumerator. Users can use KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new partitions.
Thanks for addressing my comment, Hongshun.

> Considering these reasons and facts, I’m +1 to only use EARLIEST for newly discovered partitions.

Sounds good to me.

Overall, +1 to this proposal in principle (I'll formally vote on the vote thread as well).

Thanks,
Gordon

On Tue, Apr 18, 2023 at 9:12 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Hongshun for the deeper analysis of the existing KafkaSource
> implementation details. Cool!
>
> - There’s no specific use case for a future TIMESTAMP or SPECIFIC-OFFSET
>   for newly discovered partitions.
> - The existing SpecifiedOffsetsInitializer uses the EARLIEST offset for
>   unspecified partitions as well as newly discovered partitions.
> - The existing TimestampOffsetsInitializer uses the LATEST offset for a
>   future timestamp. For newly discovered partitions, the LATEST offset is
>   similar to the EARLIEST offset in this case, and EARLIEST is safer as it
>   covers all records.
>
> Considering these reasons and facts, I’m +1 to only use EARLIEST for newly
> discovered partitions.
>
> The updated FLIP looks good to me; we can start a vote thread soon if
> there are no new divergences.
>
> Best,
> Leonard
>
> On Apr 18, 2023, at 4:58 PM, Hongshun Wang <loserwang1...@gmail.com> wrote:
>
> > Hi Shammon,
> >
> > Thank you for your advice. I have carefully considered whether to expose
> > this in SQL DDL, and recently studied whether it is feasible.
> >
> > However, after reading the corresponding code more thoroughly, it appears
> > that SpecifiedOffsetsInitializer and TimestampOffsetsInitializer do not
> > work as we initially thought. In the end, I have decided to only use
> > "EARLIEST" instead of allowing the user to make a free choice.
> >
> > Now, let me show my new understanding.
> >
> > What SpecifiedOffsetsInitializer and TimestampOffsetsInitializer actually do:
> >
> > - *SpecifiedOffsetsInitializer*: uses the *specified offset* for specified
> >   partitions and *EARLIEST* for unspecified partitions.
> >   The specified partition offset should be less than the latest offset;
> >   otherwise, reading starts from *EARLIEST*.
> > - *TimestampOffsetsInitializer*: initializes the offsets based on a
> >   timestamp. If no message meeting the timestamp requirement has been
> >   produced to Kafka yet, it just uses the *LATEST* offset.
> >
> > So, some problems will occur when a new partition uses
> > SpecifiedOffsetsInitializer or TimestampOffsetsInitializer. You can find
> > more information in the "Rejected Alternatives" section of FLIP-288, which
> > includes the relevant code and the step-by-step reasoning.
> > All these problems can be reproduced in the current version. They have
> > probably not surfaced because users usually specify an offset or timestamp
> > that already exists, so newly discovered partitions effectively start from
> > EARLIEST in production.
> >
> > WDYT?
> > CC: Ruan, Shammon, Gordon, Leonard and Qingsheng.
> >
> > Yours,
> >
> > Hongshun
> >
> > On Fri, Apr 14, 2023 at 5:48 PM Shammon FY <zjur...@gmail.com> wrote:
> >
> >> Hi Hongshun,
> >>
> >> Thanks for updating the FLIP, it totally sounds good to me.
> >>
> >> I just have one comment: how does a SQL job set the new discovery offsets
> >> initializer?
> >> I found that `DataStream` jobs can set a different offsets initializer for
> >> newly discovered partitions via `KafkaSourceBuilder.setNewDiscoveryOffsets`.
> >> Do SQL jobs need to support this feature?
> >>
> >> Best,
> >> Shammon FY
> >>
> >> On Wed, Apr 12, 2023 at 2:27 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> I have already modified FLIP-288 to provide a
> >>> newDiscoveryOffsetsInitializer in the KafkaSourceBuilder and
> >>> KafkaSourceEnumerator. Users can use
> >>> KafkaSourceBuilder#setNewDiscoveryOffsets to change the strategy for new
> >>> partitions.
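The two-initializer design described in the April 12 message (one OffsetsInitializer for the first discovery, a separate one for partitions discovered later) can be sketched as a small self-contained model. This is not Flink code: `SimpleOffsetsInitializer`, `Enumerator`, and the `{earliest, latest}` bounds map are hypothetical stand-ins for Flink's `OffsetsInitializer` and `KafkaSourceEnumerator`. Note that the thread ultimately settled on always using EARLIEST for newly discovered partitions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DiscoverySketch {
    // Hypothetical, simplified stand-in for Flink's OffsetsInitializer:
    // picks a starting offset from a partition's current [earliest, latest] bounds.
    interface SimpleOffsetsInitializer {
        long startingOffset(long earliest, long latest);
    }

    static final SimpleOffsetsInitializer EARLIEST = (earliest, latest) -> earliest;
    static final SimpleOffsetsInitializer LATEST = (earliest, latest) -> latest;

    // Hypothetical enumerator: the startup initializer handles partitions found in the
    // first discovery; the new-discovery initializer handles partitions found afterwards.
    static class Enumerator {
        private final SimpleOffsetsInitializer startupInitializer;
        private final SimpleOffsetsInitializer newDiscoveryInitializer;
        private final Map<String, Long> startingOffsets = new LinkedHashMap<>();
        private boolean firstDiscovery = true;

        Enumerator(SimpleOffsetsInitializer startup, SimpleOffsetsInitializer newDiscovery) {
            this.startupInitializer = startup;
            this.newDiscoveryInitializer = newDiscovery;
        }

        // bounds maps each currently existing partition to {earliestOffset, latestOffset}.
        void discover(Map<String, long[]> bounds) {
            SimpleOffsetsInitializer initializer =
                    firstDiscovery ? startupInitializer : newDiscoveryInitializer;
            for (Map.Entry<String, long[]> entry : bounds.entrySet()) {
                // Partitions that already have a starting offset are left untouched.
                if (!startingOffsets.containsKey(entry.getKey())) {
                    long[] b = entry.getValue();
                    startingOffsets.put(entry.getKey(), initializer.startingOffset(b[0], b[1]));
                }
            }
            firstDiscovery = false;
        }

        Map<String, Long> startingOffsets() {
            return startingOffsets;
        }
    }

    public static void main(String[] args) {
        Enumerator enumerator = new Enumerator(LATEST, EARLIEST);

        Map<String, long[]> firstRound = new LinkedHashMap<>();
        firstRound.put("orders-0", new long[] {0L, 100L});
        enumerator.discover(firstRound);

        // A partition created after startup already holds 7 records when it is discovered.
        Map<String, long[]> secondRound = new LinkedHashMap<>(firstRound);
        secondRound.put("orders-1", new long[] {0L, 7L});
        enumerator.discover(secondRound);

        System.out.println(enumerator.startingOffsets()); // {orders-0=100, orders-1=0}
    }
}
```

Starting the job with LATEST still lets the partition created later be read from its first record, because only the first discovery round consults the startup initializer.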
> >>>
> >>> Of course, enabling partition discovery by default and changing the
> >>> offset strategy for new partitions should be brought to users'
> >>> attention. Therefore, both will be explained in the 1.18 release notes.
> >>>
> >>> WDYT? CC: Ruan, Shammon, Gordon and Leonard.
> >>>
> >>> Best,
> >>>
> >>> Hongshun
> >>>
> >>> On Fri, Mar 31, 2023 at 2:56 PM Hongshun Wang <loserwang1...@gmail.com> wrote:
> >>>
> >>>> Hi everyone,
> >>>> Thanks for your participation.
> >>>>
> >>>> @Gordon, I looked into the several questions you raised:
> >>>>
> >>>> 1. Should we use the firstDiscovery flag or two separate
> >>>>    OffsetsInitializers? Actually, I have considered the latter. If we
> >>>>    follow my initial idea, we can provide a default EARLIEST
> >>>>    OffsetsInitializer for new partitions. However, following @Shammon's
> >>>>    suggestion, different startup OffsetsInitializers correspond to
> >>>>    different post-startup OffsetsInitializers for Flink's built-in
> >>>>    offset strategies.
> >>>> 2. "Future-time" TIMESTAMP OffsetsInitializer. I looked at the code
> >>>>    again, and it seems that neither @Shammon nor I had figured this out:
> >>>>    TimestampOffsetsInitializer#getPartitionOffsets has a comment: "First
> >>>>    get the current end offsets of the partitions. This is going to be
> >>>>    used in case we cannot find a suitable offset based on the timestamp,
> >>>>    i.e., the message meeting the requirement of the timestamp has not
> >>>>    been produced to Kafka yet. *In this case, we just use the latest
> >>>>    offset*." Therefore, the TimestampOffsetsInitializer always yields an
> >>>>    offset at startup.
> >>>> 3. Clarification on coupling SPECIFIC-OFFSET startup with
> >>>>    SPECIFIC-OFFSET post-startup. The SPECIFIC-OFFSET strategy already
> >>>>    uses the "auto.offset.reset" position for partitions that are not
> >>>>    specified.
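The fallback rules discussed here (TIMESTAMP falls back to LATEST when no record meets the timestamp yet; SPECIFIC-OFFSET falls back to EARLIEST for partitions it does not name) can be modeled in a few lines to show why a partition created after startup is at risk. This is a minimal sketch that mirrors the thread's description, not Flink's or Kafka's source: a partition is assumed to be a list of record timestamps with offset `i` at index `i`, EARLIEST is offset 0 (no retention), and `InitializerSemanticsSketch` with its methods is hypothetical.

```java
import java.util.List;
import java.util.Map;

public class InitializerSemanticsSketch {
    // Simplified model: a partition is a list of record timestamps, the record at index i has
    // offset i, EARLIEST is offset 0 (no retention) and LATEST is the list size.

    // TIMESTAMP startup as described in the thread: the first offset whose record timestamp is
    // >= the target, or the LATEST offset if no such record has been produced yet.
    static long timestampOffset(List<Long> timestamps, long targetTimestamp) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetTimestamp) {
                return offset;
            }
        }
        return timestamps.size(); // LATEST
    }

    // SPECIFIC-OFFSET startup as described in the thread: the specified offset for specified
    // partitions, EARLIEST for unspecified ones; an offset past LATEST also falls back to EARLIEST.
    static long specifiedOffset(Map<String, Long> specified, String partition, List<Long> timestamps) {
        Long offset = specified.get(partition);
        if (offset == null || offset > timestamps.size()) {
            return 0L; // EARLIEST
        }
        return offset;
    }

    public static void main(String[] args) {
        // A partition created after the job started; three records arrived before it was discovered.
        List<Long> newPartition = List.of(1_000L, 1_005L, 1_010L);

        // A startup timestamp later than every record falls back to LATEST: the 3 records are skipped.
        System.out.println(timestampOffset(newPartition, 5_000L)); // 3
        // The new partition is absent from the specified offsets, so it starts from EARLIEST.
        System.out.println(specifiedOffset(Map.of("orders-0", 42L), "orders-1", newPartition)); // 0
    }
}
```

The first call illustrates the "future-time" TIMESTAMP case: records written to the new partition before its discovery are skipped, which is why EARLIEST, which covers all records, is the safer choice for new partitions.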
> >>>>
> >>>> @Gordon, @Shammon, @Leonard, the core issue we are concerned about is
> >>>> whether the offsets specified at startup may include partitions that do
> >>>> not exist yet. The previous design allowed a SPECIFIC-OFFSET startup for
> >>>> future partitions. However, since different strategies are now used for
> >>>> partitions found in the first discovery and partitions discovered later,
> >>>> I think the offsets specified at startup should only cover partitions
> >>>> that have been confirmed to exist; otherwise, an error will be thrown.
> >>>> Partitions that still do not exist should be handled by the post-startup
> >>>> OffsetsInitializer (default EARLIEST).
> >>>>
> >>>> Best,
> >>>> Hongshun
> >>>>
> >>>> On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:
> >>>>
> >>>>> Thanks Gordon and Leonard,
> >>>>>
> >>>>> I'm sorry that I have no specific case on my side, but I see the
> >>>>> issue as follows:
> >>>>>
> >>>>> 1. Users may set an offset later than the current time, because Flink
> >>>>>    does not prevent it.
> >>>>> 2. Using EARLIEST for a newly discovered partition regardless of the
> >>>>>    startup OFFSET type may differ from the previous strategy. I think
> >>>>>    it's best to keep the same strategy as before if it does not cause
> >>>>>    data loss.
> >>>>> 3. I think supporting different OFFSETs in the FLIP will not make the
> >>>>>    implementation more complex.
> >>>>>
> >>>>> Of course, if we confirm that such a Timestamp OFFSET is illegal and
> >>>>> Flink validates it, then we can apply the same strategy to newly
> >>>>> discovered partitions. I think that would be nice too.
> >>>>>
> >>>>> Best,
> >>>>> Shammon FY
> >>>>>
> >>>>> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks Hongshun and Shammon for driving the FLIP!
> >>>>>>
> >>>>>>> *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> >>>>>>> *3. Clarification on coupling SPECIFIC-OFFSET startup with
> >>>>>>> SPECIFIC-OFFSET post-startup*
> >>>>>>
> >>>>>> Gordon raised a good point about the future TIMESTAMP and
> >>>>>> SPECIFIC-OFFSET: the timestamps/offsets of a newly added partition are
> >>>>>> undetermined when the job starts (the partition has not been created
> >>>>>> yet), so they are timestamps/offsets in the future.
> >>>>>>
> >>>>>> I have used many message queue systems like Kafka, Pulsar, and xxMQ.
> >>>>>> In my past experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes
> >>>>>> are usually used to specify existing timestamps/offsets, for business
> >>>>>> scenarios such as backfilling data and refreshing data. At present,
> >>>>>> it's hard to imagine a user scenario that specifies a future timestamp
> >>>>>> to filter data in the current topic of a message queue system. Is it
> >>>>>> overthinking to consider future TIMESTAMP and SPECIFIC-OFFSET?
> >>>>>>
> >>>>>> Best,
> >>>>>> Leonard
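Two startup validations come up in this thread: Shammon suggests Flink could reject an illegal (future) Timestamp OFFSET, and the March 31 message proposes throwing an error when the startup offsets name partitions that do not exist yet. A minimal sketch of both checks follows, assuming partitions are identified by plain strings and times are epoch milliseconds; `StartupValidationSketch` and its methods are hypothetical and not part of Flink's API.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class StartupValidationSketch {
    // Hypothetical check (not part of Flink): a TIMESTAMP startup position must not lie in the future.
    static void validateStartupTimestamp(long startupMillis, long nowMillis) {
        if (startupMillis > nowMillis) {
            throw new IllegalArgumentException("Startup timestamp " + Instant.ofEpochMilli(startupMillis)
                    + " is later than the current time " + Instant.ofEpochMilli(nowMillis));
        }
    }

    // Hypothetical check (not part of Flink): SPECIFIC-OFFSET startup may only name existing partitions.
    static void validateSpecifiedPartitions(Map<String, Long> specified, Set<String> existing) {
        // TreeSet keeps the reported partition names sorted for a stable error message.
        Set<String> missing = new TreeSet<>(specified.keySet());
        missing.removeAll(existing);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Specified offsets for non-existent partitions: " + missing);
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        validateStartupTimestamp(now - 60_000L, now); // one minute ago: accepted
        validateSpecifiedPartitions(Map.of("orders-0", 42L), Set.of("orders-0", "orders-1")); // accepted
        try {
            validateSpecifiedPartitions(Map.of("orders-2", 0L), Set.of("orders-0", "orders-1"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Specified offsets for non-existent partitions: [orders-2]
        }
    }
}
```

With both checks in place, every startup position refers to data that already exists, and anything discovered later is left to the post-startup initializer (EARLIEST), which is the split the thread converged on.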