tzulitai commented on PR #52: URL: https://github.com/apache/flink-connector-kafka/pull/52#issuecomment-1745879187
I thought about this more closely, and while the current fix will work, I'm not sure it's how we want to do things.

My main issue with this PR is the following behavioral change: we now eagerly query Kafka for the latest offsets on the JM, instead of lazily resolving the actual offsets on the TMs (which was the whole point of the `ReaderHandledOffsetsInitializer` class).

So I'm wondering if the correct fix is instead to handle this at `snapshotState` time on the TMs: if and only if the `currentOffset` of a snapshotted `KafkaPartitionSplitState` is still the `LATEST_OFFSET` marker (i.e. `-1`), we replace it with the `EARLIEST_OFFSET` marker (i.e. `-2`) on the spot and store that in the snapshot.

Semantically we get the same result: at restore time, partitions that were previously empty are read from the beginning, instead of being seeked to the end and potentially skipping records. But we also retain the nice property of lazily resolving offsets only on the TMs.
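
To make the proposal concrete, here is a rough sketch of what the snapshot-time substitution might look like. `SplitStateSketch` and its methods are hypothetical stand-ins written for illustration, not the connector's actual `KafkaPartitionSplitState` API; only the two marker values (`-1` and `-2`) are taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a per-partition split state; not the connector's real class. */
final class SplitStateSketch {
    // Marker values as quoted in the comment above.
    static final long LATEST_OFFSET = -1L;
    static final long EARLIEST_OFFSET = -2L;

    final String topicPartition;
    final long currentOffset;

    SplitStateSketch(String topicPartition, long currentOffset) {
        this.topicPartition = topicPartition;
        this.currentOffset = currentOffset;
    }

    /**
     * Offset to store in the checkpoint. A split whose offset is still the
     * LATEST marker is stored with the EARLIEST marker instead, so that a
     * restore reads the partition from the beginning rather than seeking to
     * the end. Every other offset, concrete or marker, is stored unchanged.
     */
    long offsetForSnapshot() {
        return currentOffset == LATEST_OFFSET ? EARLIEST_OFFSET : currentOffset;
    }

    /** Builds the snapshot copies without mutating the live split states. */
    static List<SplitStateSketch> snapshot(List<SplitStateSketch> liveStates) {
        List<SplitStateSketch> snapshotted = new ArrayList<>(liveStates.size());
        for (SplitStateSketch state : liveStates) {
            snapshotted.add(
                    new SplitStateSketch(state.topicPartition, state.offsetForSnapshot()));
        }
        return snapshotted;
    }
}
```

The substitution happens only on the copy written to the snapshot, so the running reader would keep its lazily resolved behavior and no extra Kafka query is needed on the JM.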