I have verified this behavior in 1.9.0, 1.8.1, and 1.7.2. About half of my shards start over at trim horizon. Why would some shard states appear to be missing from a savepoint? This seems like a big problem.
-Steve

On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi,
>
> I am also facing the same problem. I am using Flink 1.9.0 and consuming
> from a Kinesis source with a retention of 1 day. I am observing that when
> the job is submitted with the "latest" initial stream position, the job
> starts well and keeps processing data from all the shards for a very long
> period of time without any lag. When the job fails, it also recovers well
> from the last successful checkpointed state. But I am also experiencing,
> very rarely, that when the job fails and recovers from the last successful
> checkpointed state, there is a huge lag (1 day, as per retention) on one
> of the streams. I have not yet found a step-by-step process to reproduce
> this issue.
>
> So far, I have gathered some more information by customizing the
> FlinkKinesisConsumer to add an extra log message: I noticed that the
> number of shards loaded from the checkpoint data during recovery is less
> than the actual number of shards in the stream. I have a fixed number of
> shards in the Kinesis stream.
>
> I added one line of debug logging at line 408 to print the size of the
> variable "sequenceNumsToRestore", which is populated with shard details
> from the checkpoint data.
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
>
> In this consumer class, when the "run" method is called, it does the
> following:
>
> - It discovers shards from the Kinesis stream and selects all those
> shards which the subtask can schedule.
> - Then, one by one, it iterates over the discovered shards and checks
> whether each shard's state is available in the recovered state
> "sequenceNumsToRestore".
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
> - If it is available, it schedules that shard with the recovered state.
> - If it is not available in the state, it schedules that shard with
> "EARLIEST_SEQUENCE_NUMBER".
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
>
> In my case, the number of shard details recovered from the checkpoint
> data is less than the actual number of shards, which results in those
> missing shards being scheduled with the earliest stream position.
> I suspect that somehow the checkpoint is missing state for some of the
> shards. But if that were the case, the checkpoint should have failed.
>
> Any further information to resolve this issue would be highly
> appreciated...
>
> Regards,
> Ravi
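[Editor's note: for readers following Ravi's analysis, here is a minimal, self-contained sketch (not the actual connector code) of the seeding decision he describes: every shard discovered at startup is looked up in the state restored from the checkpoint, and any shard missing from that state is seeded from the earliest position. The class, method, and String-based map below are simplified stand-ins; only the name "sequenceNumsToRestore" and the earliest-position sentinel come from the thread.]

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ShardSeedingSketch {

        // Stand-in for the connector's earliest-position sentinel.
        static final String EARLIEST_SEQUENCE_NUM = "EARLIEST_SEQUENCE_NUM";

        // Stand-in for the restored "sequenceNumsToRestore" map (shardId -> sequence number).
        private final Map<String, String> sequenceNumsToRestore;

        ShardSeedingSketch(Map<String, String> sequenceNumsToRestore) {
            this.sequenceNumsToRestore = sequenceNumsToRestore;
        }

        // For each discovered shard, resume from the restored sequence number if
        // present; otherwise seed from the earliest position, which is what
        // produces the retention-sized lag discussed above.
        Map<String, String> seedShards(List<String> discoveredShardIds) {
            Map<String, String> seeded = new HashMap<>();
            for (String shardId : discoveredShardIds) {
                String restored = sequenceNumsToRestore.get(shardId);
                seeded.put(shardId, restored != null ? restored : EARLIEST_SEQUENCE_NUM);
            }
            return seeded;
        }

        public static void main(String[] args) {
            Map<String, String> restored = new HashMap<>();
            restored.put("shardId-000000000087",
                    "49599841594208637293623823226010128300928335129272649074");
            // shardId-000000000083 is deliberately absent, mimicking the missing state.

            new ShardSeedingSketch(restored)
                    .seedShards(List.of("shardId-000000000083", "shardId-000000000087"))
                    .forEach((shard, seq) -> System.out.println(shard + " -> " + seq));
        }
    }

Running this prints shardId-000000000083 seeded with EARLIEST_SEQUENCE_NUM and shardId-000000000087 resuming from its restored sequence number, mirroring the two log lines quoted later in the thread.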
> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>
>> Hi Steven
>>
>> If you restored the savepoint/checkpoint successfully, I think this
>> might be because the shard wasn't discovered in the previous run;
>> therefore it would be consumed from the beginning. Please refer to the
>> implementation here: [1]
>>
>> [1]
>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Steven Nelson <snel...@sourceallies.com>
>> *Sent:* Wednesday, October 16, 2019 4:31
>> *To:* user <user@flink.apache.org>
>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>
>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>
>> We have extended data retention on the Kinesis stream, which gives us 7
>> days of data.
>>
>> We have found that when a savepoint/checkpoint is restored, it appears
>> to restart the Kinesis consumer from the start of the stream. The
>> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
>> metric reports to Prometheus that it is behind by 7 days when the process
>> starts back up from a savepoint.
>>
>> We have some logs that say:
>>
>> Subtask 3 will start consuming seeded shard
>> StreamShardHandle{streamName='TheStream',
>> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
>> 220651847300296034902031972006537199616,EndingHashKey:
>> 223310303291865866647839586127097888767},SequenceNumberRange:
>> {StartingSequenceNumber:
>> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
>> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>
>> This seems to indicate that this shard is starting from the beginning of
>> the stream,
>>
>> and some logs that say:
>>
>> Subtask 3 will start consuming seeded shard
>> StreamShardHandle{streamName='TheStream', shard='{ShardId:
>> shardId-000000000087,HashKeyRange: {StartingHashKey:
>> 231285671266575361885262428488779956224,EndingHashKey:
>> 233944127258145193631070042609340645375},SequenceNumberRange:
>> {StartingSequenceNumber:
>> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
>> number 49599841594208637293623823226010128300928335129272649074 with
>> ShardConsumer 21
>>
>> This shard seems to be resuming from a specific point.
>>
>> I am assuming that this might be caused by no data being available on
>> the shard for the entire stream retention period (possible with this
>> application stage). Is this the expected behavior? I had thought it would
>> checkpoint with the most recent sequence number, regardless of whether it
>> got data or not.
>>
>> -Steve
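[Editor's note: as a footnote for anyone trying to reproduce this, a minimal consumer setup looks roughly like the sketch below. The stream name is taken from the logs above; the region and job name are placeholder assumptions. On a fresh submission STREAM_INITIAL_POSITION governs where reading starts, while on a restore the connector resumes each shard from its restored sequence number and, as discussed in the thread, seeds any shard missing from the restored state from the earliest position.]

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class KinesisRestoreRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties consumerConfig = new Properties();
            // Placeholder region for illustration.
            consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
            // Applies to a fresh start; on restore, shards found in the recovered
            // state resume from their sequence numbers, and shards missing from it
            // are seeded from the earliest position (see the discussion above).
            consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

            env.addSource(new FlinkKinesisConsumer<>(
                            "TheStream", new SimpleStringSchema(), consumerConfig))
               .print();

            env.execute("kinesis-restore-repro");
        }
    }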