In my situation I believe it's because we have idle shards (it's a testing environment). I dug into the connector code, and it looks like it only updates the shard state when a record is processed or when the shard hits SHARD_END. So an idle shard never gets a checkpointed state. I guess this is okay, since in production we won't have idle shards, but it might be better to send through an empty record that doesn't get emitted but does trigger a state update.
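
A minimal sketch of that behavior, with made-up names rather than the actual ShardConsumer code: the sequence number only advances on the emit path, so an idle shard never contributes a position to the checkpoint.

import java.util.Collections;
import java.util.List;

// Simplified sketch of the per-shard consumption loop; every name here is made up
// for illustration and is not the actual ShardConsumer/KinesisDataFetcher code.
class IdleShardSketch {
    private String lastSequenceNumber;       // the value that would end up in a checkpoint
    private volatile boolean running = true;

    void consumeShard() {
        while (running) {
            List<String> records = fetchRecords();   // stays empty forever on an idle shard
            for (String record : records) {
                emit(record);
                // The sequence number is only advanced here, on the emit path (or when the
                // shard reaches SHARD_END), so an idle shard never updates it and never
                // shows up with a position in the checkpointed state.
                lastSequenceNumber = sequenceNumberOf(record);
            }
        }
    }

    // Stubs standing in for the real Kinesis calls.
    private List<String> fetchRecords() { return Collections.emptyList(); }
    private void emit(String record) { }
    private String sequenceNumberOf(String record) { return record; }
}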
-Steve

On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:

> Do you know a step-by-step process to reproduce this problem?
>
> -Ravi
>
> On Wed 16 Oct, 2019, 17:40 Steven Nelson, <snel...@sourceallies.com> wrote:
>
>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>
>> About half my shards start over at trim horizon. Why would some shard statuses appear not to exist in a savepoint? This seems like a big problem.
>>
>> -Steve
>>
>> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am also facing the same problem. I am using Flink 1.9.0 and consuming from a Kinesis source with a retention of 1 day. I am observing that when the job is submitted with the "latest" initial stream position, the job starts well and keeps processing data from all the shards for a very long period of time without any lag. When the job fails, it also recovers well from the last successful checkpointed state. But I have also experienced, very rarely, that when the job fails and recovers from the last successful checkpointed state, there is a huge lag (1 day, as per the retention) on one of the streams. I have not yet found a step-by-step process to reproduce this issue.
>>>
>>> So far I have gathered some more information by customizing the FlinkKinesisConsumer to add an extra log message: I noticed that the number of shard entries loaded from the checkpoint data during recovery is less than the actual number of shards in the stream. I have a fixed number of shards in the Kinesis stream.
>>>
>>> I added one line of debug log at line 408 to print the size of the variable "sequenceNumsToRestore", which is populated with the shard details from the checkpoint data.
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
>>>
>>> In this consumer class, when the "run" method is called, it does the following (see the sketch further below):
>>>
>>>    - It discovers shards from the Kinesis stream and selects all those shards which this subtask should schedule.
>>>    - Then it iterates over the discovered shards one by one and checks whether that shard's state is available in the recovered state "sequenceNumsToRestore": https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>>>    - If it is available, then it schedules that shard with the recovered state.
>>>    - If it is not available in the state, then it schedules that shard with "EARLIEST_SEQUENCE_NUMBER": https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
>>>
>>> In my case, the number of shard entries recovered from the checkpoint data is less than the actual number of shards, which results in those shards being scheduled with the earliest stream position. I suspect that somehow the checkpoint is missing state for some of the shards. But if that were the case, then that checkpoint should have failed.
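>>>
>>> In other words, the decision at restore time is roughly the following (a simplified, self-contained sketch; the class and variable names are illustrative and keyed by shard id for brevity, not the actual FlinkKinesisConsumer code):
>>>
>>> import java.util.List;
>>> import java.util.Map;
>>>
>>> // Sketch of the restore-time scheduling described in the list above.
>>> class RestoreSchedulingSketch {
>>>     static final String EARLIEST_SEQUENCE_NUMBER = "EARLIEST_SEQUENCE_NUM";
>>>
>>>     // sequenceNumsToRestore: shard id -> last checkpointed sequence number
>>>     static void scheduleShards(List<String> discoveredShardIds,
>>>                                Map<String, String> sequenceNumsToRestore) {
>>>         // The extra debug line added around line 408: how many shard entries the checkpoint held.
>>>         System.out.println("Restored shard entries: " + sequenceNumsToRestore.size());
>>>
>>>         for (String shardId : discoveredShardIds) {
>>>             if (sequenceNumsToRestore.containsKey(shardId)) {
>>>                 // Shard found in the checkpoint: resume from its stored sequence number.
>>>                 schedule(shardId, sequenceNumsToRestore.get(shardId));
>>>             } else {
>>>                 // Shard missing from the checkpoint: fall back to the earliest position,
>>>                 // i.e. the full retention window is re-read (the lag described above).
>>>                 schedule(shardId, EARLIEST_SEQUENCE_NUMBER);
>>>             }
>>>         }
>>>     }
>>>
>>>     static void schedule(String shardId, String startingSequenceNumber) {
>>>         System.out.println("Scheduling shard " + shardId + " from " + startingSequenceNumber);
>>>     }
>>> }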
>>>
>>> Any further information to help resolve this issue would be highly appreciated...
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi Steven,
>>>>
>>>> If you restored the savepoint/checkpoint successfully, I think this might be because the shard wasn't discovered in the previous run; it would therefore be consumed from the beginning. Please refer to the implementation here: [1]
>>>>
>>>> [1] https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Steven Nelson <snel...@sourceallies.com>
>>>> *Sent:* Wednesday, October 16, 2019 4:31
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>>>
>>>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>>>
>>>> We have extended data retention on the Kinesis stream, which gives us 7 days of data.
>>>>
>>>> We have found that when a savepoint/checkpoint is restored, the Kinesis consumer appears to restart from the start of the stream. The flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest property reports to Prometheus that it is behind by 7 days when the process starts back up from a savepoint.
>>>>
>>>> We have some logs that say:
>>>>
>>>> Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey: 220651847300296034902031972006537199616,EndingHashKey: 223310303291865866647839586127097888767},SequenceNumberRange: {StartingSequenceNumber: 49597946220601502339755334362523522663986150244033234226,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>>>
>>>> This seems to indicate that this shard is starting from the beginning of the stream.
>>>>
>>>> And some logs that say:
>>>>
>>>> Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='TheStream', shard='{ShardId: shardId-000000000087,HashKeyRange: {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49597946220690705320549456855089665537076743690057155954,}}'} from sequence number 49599841594208637293623823226010128300928335129272649074 with ShardConsumer 21
>>>>
>>>> This shard seems to be resuming from a specific point.
>>>>
>>>> I am assuming that this might be caused by no data being available on the shard for the entire stream (which is possible for this stage of the application). Is this the expected behavior? I had thought it would checkpoint with the most recent sequence number, regardless of whether it got data or not.
>>>>
>>>> -Steve
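
The two kinds of startup log lines quoted above can be told apart mechanically; a throwaway helper along these lines (not part of Flink, and it only assumes the log format shown above) separates shards that fell back to the earliest position from shards that resumed from restored state:

public class SeededShardLogCheck {
    public static String classify(String logLine) {
        String marker = " from sequence number ";
        int idx = logLine.indexOf(marker);
        if (idx < 0) {
            return "not a seeded-shard line";
        }
        // The token right after the marker is either EARLIEST_SEQUENCE_NUM or a concrete sequence number.
        String start = logLine.substring(idx + marker.length()).split("\\s+")[0];
        return start.startsWith("EARLIEST")
                ? "restarted from the earliest position (no state restored for this shard)"
                : "resumed from sequence number " + start;
    }

    public static void main(String[] args) {
        // Example with the first log line quoted above (shard details truncated here).
        System.out.println(classify(
                "Subtask 3 will start consuming seeded shard StreamShardHandle{...}"
                        + " from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20"));
    }
}

Run over the startup logs, this gives a quick count of how many shards landed in each bucket; in the situation described at the top of the thread, roughly half the shards would fall into the first one.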