Hi everyone,

Thanks for your participation. @Gordon, I looked into the questions you raised:
1. Should we use the firstDiscovery flag or two separate OffsetsInitializers?
Actually, I have reconsidered this. If we followed my initial idea, we could provide a default EARLIEST OffsetsInitializer for new partitions. However, following @Shammon's suggestion, each of Flink's built-in startup OffsetsInitializers maps to its own post-startup OffsetsInitializer.

2. "Future-time" TIMESTAMP OffsetsInitializer.
I looked at the code again, and it seems that neither @Shammon nor I had noticed this detail: TimestampOffsetsInitializer#getPartitionOffsets has a comment saying "First get the current end offsets of the partitions. This is going to be used in case we cannot find a suitable offset based on the timestamp, i.e., the message meeting the requirement of the timestamp has not been produced to Kafka yet. *In this case, we just use the latest offset*." Therefore, the TimestampOffsetsInitializer always resolves to an offset at startup, even for a future timestamp: it falls back to the latest offset.

3. Clarification on coupling SPECIFIC-OFFSET startup with SPECIFIC-OFFSET post-startup.
The SPECIFIC-OFFSET strategy already uses the "auto.offset.reset" position for partitions that have no specified offset. @Gordon, @Shammon, @Leonard, the core question is whether the offsets specified at startup may include partitions that do not exist yet. The previous design allowed a SPECIFIC-OFFSET startup to refer to future partitions. However, since we now use different strategies for partitions discovered at startup and partitions discovered later, I think the offsets specified at startup should only cover partitions that are confirmed to exist; otherwise, an error will be thrown. Partitions that do not exist yet should be handled by the post-startup OffsetsInitializer (default EARLIEST).

Best
Hongshun

On Thu, Mar 30, 2023 at 1:43 PM Shammon FY <zjur...@gmail.com> wrote:

> Thanks Gordon and Leonard
>
> I'm sorry that I don't have a specific case on my side, but I see the
> issue as follows:
>
> 1. Users may set a TIMESTAMP offset later than the current time, because
> Flink does not limit it.
> 2. If we always use EARLIEST for newly discovered partitions, regardless of
> the startup OFFSET strategy, the behavior may differ from the previous
> strategy. I think it's best to keep the same strategy as before if it does
> not cause data loss.
> 3. I think supporting different OFFSETs in the FLIP will not make the
> implementation much more complex.
>
> Of course, if we confirm that such a Timestamp OFFSET is illegal and Flink
> validates it, then we can apply the same strategy to newly discovered
> partitions. I think this would be nice too.
>
> Best,
> Shammon FY
>
>
> On Thu, Mar 30, 2023 at 12:29 PM Leonard Xu <xbjt...@gmail.com> wrote:
>
> > Thanks Hongshun and Shammon for driving the FLIP!
> >
> > > *2. Clarification on "future-time" TIMESTAMP OffsetsInitializer*
> > > *3. Clarification on coupling SPECIFIC-OFFSET startup with
> > > SPECIFIC-OFFSET post-startup*
> >
> > Gordon raised a good point about the future TIMESTAMP and SPECIFIC-OFFSET:
> > the timestamps/offsets of a newly added partition are undetermined when
> > the job starts (the partition has not been created yet), and they are
> > timestamps/offsets in the future.
> >
> > I have used many message queue systems like Kafka, Pulsar, xxMQ. In my
> > past experience, the TIMESTAMP and SPECIFIC-OFFSET startup modes are
> > usually used to specify existing timestamps/offsets, for business
> > scenarios such as backfilling and re-refreshing data. At present, it's
> > hard to imagine a user scenario that specifies a future timestamp to
> > filter data in the current topic of a message queue system. Is it
> > overthinking to consider future TIMESTAMP and SPECIFIC-OFFSET?
> >
> >
> > Best,
> > Leonard
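To make points 2 and 3 of my reply concrete, here is a minimal plain-Java sketch of the rules I am proposing. It deliberately does not use the real Flink or Kafka APIs: the class `PartitionOffsetRules`, its methods, the string partition IDs and the `EARLIEST` sentinel are all hypothetical, and each partition's log is modeled as an array of record timestamps starting at offset 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: names and types are illustrative, not Flink's OffsetsInitializer API.
class PartitionOffsetRules {
    // Hypothetical sentinel meaning "start from the earliest available offset".
    static final long EARLIEST = -2L;

    // Point 3: offsets specified at startup may only refer to partitions that already exist.
    // Existing partitions without a specified offset fall back to a reset position
    // (EARLIEST in this sketch, standing in for "auto.offset.reset").
    static Map<String, Long> startupOffsets(Map<String, Long> specified, Set<String> existing) {
        for (String partition : specified.keySet()) {
            if (!existing.contains(partition)) {
                throw new IllegalArgumentException(
                        "Offset specified for non-existent partition: " + partition);
            }
        }
        Map<String, Long> offsets = new HashMap<>();
        for (String partition : existing) {
            offsets.put(partition, specified.getOrDefault(partition, EARLIEST));
        }
        return offsets;
    }

    // Partitions discovered after startup ignore the startup offsets and use the
    // post-startup initializer, which defaults to EARLIEST.
    static long postStartupOffset() {
        return EARLIEST;
    }

    // Point 2: return the first offset whose record timestamp is >= the target.
    // If no such record has been produced yet (e.g. a future timestamp),
    // fall back to the end offset, i.e. the latest offset.
    static long offsetForTimestamp(long[] recordTimestamps, long targetTimestamp) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= targetTimestamp) {
                return offset;
            }
        }
        return recordTimestamps.length; // end offset
    }
}
```

The design choice is that failing at startup is preferable to silently accepting an offset for a partition that may never appear; partitions created later are always resolved by the post-startup initializer instead.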