Hi, I was experimenting with different starting offset strategies for my Flink job, especially in cases where jobs are canceled and scheduled again and I would like to start with the last committed offset and if the same is not available then start from the latest.
So I decided to use this: .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)) Now when I start my job in the job master I get is: Assigning splits to readers {0=[[Partition: mytopic-1, StartingOffset: -3, StoppingOffset: -9223372036854775808], [Partition: mytopic-2, StartingOffset: -3, StoppingOffset: - 9223372036854775808]]} Looks like here both starting and stopping offsets are negative, I am not sure if this is correct or not. However what is happening is that no records are getting read from the Kafka source. Can anyone please tell me what is the right starting offset strategy to follow, where a new job is started from last committed offsets or latest. Also please note that if I just keep the starting offset strategy as: .setStartingOffsets(OffsetsInitializer.committedOffsets()) Now say I have cancelled the job and started again at a much later date, then the committed offset will not longer be available in Kafka topic, as the data would have been discarded based on topic retention policy. Hence just using the committed offsets strategy does not always work. Thanks Sachin