Hi,
I was experimenting with different starting-offset strategies for my Flink
job, especially for cases where a job is canceled and scheduled again:
I would like to start from the last committed offset and, if that is not
available, fall back to the latest offset.

So I decided to use this:

.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
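
For reference, my source is built roughly like the sketch below (bootstrap
servers, topic, and group id are placeholders, not my real values):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaOffsetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("mytopic")                    // placeholder
                .setGroupId("my-consumer-group")         // placeholder
                // start from committed offsets; if none exist,
                // fall back to the LATEST offset
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(
                                OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();
        env.execute("kafka-offset-test");
    }
}
```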


Now, when I start my job, what I see in the job master log is:

Assigning splits to readers {0=[[Partition: mytopic-1, StartingOffset: -3,
StoppingOffset: -9223372036854775808], [Partition: mytopic-2,
StartingOffset: -3, StoppingOffset: -9223372036854775808]]}

Both the starting and stopping offsets here are negative, and I am not
sure whether that is correct. What I observe, however, is that no records
are read from the Kafka source at all.

Can anyone please tell me the right starting-offset strategy for this
case, where a new job starts from the last committed offsets, or from the
latest if none are available?

Also, please note the problem if I keep the starting-offset strategy as
just:

.setStartingOffsets(OffsetsInitializer.committedOffsets())

Say I cancel the job and start it again at a much later date. By then the
committed offset may no longer be available in the Kafka topic, as the
data would have been discarded by the topic's retention policy.

Hence using only the committed-offsets strategy does not always work.


Thanks
Sachin
