Hi Steven,

If the savepoint/checkpoint was restored successfully, I think this might be because the shard was not discovered in the previous run, and it is therefore consumed from the beginning. Please refer to the implementation here: [1]
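To illustrate the idea, here is a minimal, simplified sketch of that restore decision (hypothetical names like RestoreSketch and startingPositions are mine, not Flink's; the real logic lives in FlinkKinesisConsumer at [1]): a shard found in the restored state resumes from its stored sequence number, while a shard absent from the state is treated like a newly created shard and read from the earliest available record.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch (NOT the actual Flink source) of how the Kinesis consumer
// picks each shard's starting position when restoring from a savepoint.
public class RestoreSketch {
    // Sentinel standing in for Flink's SENTINEL_EARLIEST_SEQUENCE_NUM.
    static final String EARLIEST = "EARLIEST_SEQUENCE_NUM";

    // Returns shardId -> starting sequence number for this run.
    static Map<String, String> startingPositions(Iterable<String> discoveredShards,
                                                 Map<String, String> restoredState) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String shardId : discoveredShards) {
            String restoredSeq = restoredState.get(shardId);
            // Shard present in the savepoint: resume from its stored sequence number.
            // Shard absent (never discovered in the previous run): treated like a
            // newly created shard, so it is read from the earliest available record.
            result.put(shardId, restoredSeq != null ? restoredSeq : EARLIEST);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> restored = new HashMap<>();
        restored.put("shardId-000000000087",
                "49599841594208637293623823226010128300928335129272649074");
        Map<String, String> starts = startingPositions(
                Arrays.asList("shardId-000000000083", "shardId-000000000087"), restored);
        System.out.println(starts.get("shardId-000000000083")); // EARLIEST_SEQUENCE_NUM
        System.out.println(starts.get("shardId-000000000087")); // restored sequence number
    }
}
```

So a shard that produced no discovery event in the previous run would match your "EARLIEST_SEQUENCE_NUM" log line on restore, while shard 87 resumes from its stored position.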
[1] https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307

Best
Yun Tang

________________________________
From: Steven Nelson <snel...@sourceallies.com>
Sent: Wednesday, October 16, 2019 4:31
To: user <user@flink.apache.org>
Subject: Kinesis Connector and Savepoint/Checkpoint restore.

Hello,

We currently use Flink 1.9.0 with Kinesis to process data. We have extended data retention on the Kinesis stream, which gives us 7 days of data.

We have found that when a savepoint/checkpoint is restored, the Kinesis consumer appears to restart from the start of the stream. The flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest metric reports to Prometheus that it is 7 days behind when the process starts back up from a savepoint.

We have some logs that say:

Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey: 220651847300296034902031972006537199616,EndingHashKey: 223310303291865866647839586127097888767},SequenceNumberRange: {StartingSequenceNumber: 49597946220601502339755334362523522663986150244033234226,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20

This seems to indicate that this shard is starting from the beginning of the stream. We also have some logs that say:

Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-000000000087,HashKeyRange: {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49597946220690705320549456855089665537076743690057155954,}}'} from sequence number 49599841594208637293623823226010128300928335129272649074 with ShardConsumer 21

This shard seems to be resuming from a specific point. I am assuming that this might be caused by no data being available on the shard for the entire stream (possible with this application stage). Is this the expected behavior? I had thought it would checkpoint with the most recent sequence number, regardless of whether it received data or not.

-Steve