Hi Congxian,

Thank you for your reply. As I shared in my previous mail, in my case the last successful checkpoint is missing details for some of the shards. I am not scaling the Kinesis shards up or down; I always run with a fixed number of shards, so there is no possibility of new shard discovery that could cause such a problem.
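For reference, the state-update path Steven mentioned below: the per-shard state only advances when a record is actually emitted. A simplified sketch of KinesisDataFetcher#emitRecordAndUpdateState (details trimmed, so exact code may differ between versions):

    // simplified from KinesisDataFetcher#emitRecordAndUpdateState;
    // called by a ShardConsumer for each deserialized record
    protected void emitRecordAndUpdateState(T record, long recordTimestamp,
            int shardStateIndex, SequenceNumber lastSequenceNumber) {
        synchronized (checkpointLock) {
            sourceContext.collectWithTimestamp(record, recordTimestamp);
            // the per-shard sequence number only advances here, i.e. only
            // when a record has actually been read from the shard
            updateState(shardStateIndex, lastSequenceNumber);
        }
    }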
Thanks,
Ravi

On Fri 22 Nov, 2019, 02:53 Congxian Qiu, <qcx978132...@gmail.com> wrote:

> Hi
>
> For idle shards, I think restoring from a previously unconsumed position is
> OK, because Flink did not consume any data from them before, but for
> non-idle shards this is problematic. From my experience with other
> connectors, could you check whether the "error" shards are newly split?
> Maybe the newly split shards were not contained in the checkpoint.
>
> Best,
> Congxian
>
> On Thu, Oct 17, 2019 at 2:19 AM Steven Nelson <snel...@sourceallies.com>
> wrote:
>
>> In my situation I believe it's because we have idle shards (it's a
>> testing environment). I dug into the connector code and it looks like it
>> only updates the shard state when a record is processed or when the shard
>> hits shard_end. So, an idle shard would never get a checkpointed state. I
>> guess this is okay since in production we won't have idle shards, but it
>> might be better to send through an empty record that doesn't get emitted
>> but does trigger a state update.
>>
>> -Steve
>>
>> On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Do you know a step-by-step process to reproduce this problem?
>>>
>>> -Ravi
>>>
>>> On Wed 16 Oct, 2019, 17:40 Steven Nelson, <snel...@sourceallies.com>
>>> wrote:
>>>
>>>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>>>
>>>> About half my shards start over at trim horizon. Why would some shard
>>>> states appear to be missing from a savepoint? This seems like a big
>>>> problem.
>>>>
>>>> -Steve
>>>>
>>>> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
>>>> ravibhushanratna...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am also facing the same problem. I am using Flink 1.9.0 and
>>>>> consuming from a Kinesis source with a retention of 1 day. I am
>>>>> observing that when the job is submitted with the "latest" initial
>>>>> stream position, it starts well and keeps processing data from all the
>>>>> shards for a very long period of time without any lag. When the job
>>>>> fails, it usually also recovers well from the last successful
>>>>> checkpointed state. But very rarely, when the job fails and recovers
>>>>> from the last successful checkpointed state, I notice a huge lag
>>>>> (1 day, i.e. the full retention) on one of the streams. A step-by-step
>>>>> process to reproduce this issue is still unknown to me.
>>>>>
>>>>> So far, I have gathered some more information by customizing the
>>>>> FlinkKinesisConsumer to add extra log messages. I noticed that the
>>>>> number of shards loaded from the checkpoint data during recovery is
>>>>> less than the actual number of shards in the stream. I have a fixed
>>>>> number of shards in the Kinesis stream.
>>>>>
>>>>> I added one debug log line at line 408 to print the size of the
>>>>> variable "sequenceNumsToRestore", which is populated with shard
>>>>> details from the checkpoint data:
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
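>>>>>
>>>>> The added line is essentially the following (sketched here, so the
>>>>> exact message text may differ; only the LOG field, getRuntimeContext()
>>>>> and the "sequenceNumsToRestore" map come from the connector source):
>>>>>
>>>>>     // added inside FlinkKinesisConsumer#initializeState(), right
>>>>>     // after sequenceNumsToRestore is rebuilt from the restored state
>>>>>     LOG.info("Subtask {} restored sequence numbers for {} shards: {}",
>>>>>         getRuntimeContext().getIndexOfThisSubtask(),
>>>>>         sequenceNumsToRestore.size(),
>>>>>         sequenceNumsToRestore);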
>>>>>
>>>>> In this consumer class, when the "run" method is called, it does the
>>>>> following (see the sketch after this list):
>>>>>
>>>>> - it discovers the shards of the Kinesis stream and selects all those
>>>>> shards which this subtask should consume
>>>>> - then it iterates over the discovered shards one by one and checks
>>>>> whether each shard's state is available in the recovered state
>>>>> "sequenceNumsToRestore":
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>>>>> - if it is available, it schedules that shard with the recovered state
>>>>> - if it is not available in the state, it schedules that shard with
>>>>> "EARLIEST_SEQUENCE_NUMBER":
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
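>>>>>
>>>>> Roughly, that logic looks like this (a simplified sketch of the code
>>>>> at the links above; logging, the fresh-start branch and error handling
>>>>> are omitted, and names may differ slightly between Flink versions):
>>>>>
>>>>>     for (StreamShardHandle shard : fetcher.discoverNewShardsToSubscribe()) {
>>>>>         StreamShardMetadata.EquivalenceWrapper kinesisStreamShard =
>>>>>             new StreamShardMetadata.EquivalenceWrapper(
>>>>>                 KinesisDataFetcher.convertToStreamShardMetadata(shard));
>>>>>
>>>>>         if (sequenceNumsToRestore != null) {
>>>>>             if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
>>>>>                 // shard found in the restored state: resume from the
>>>>>                 // stored sequence number
>>>>>                 fetcher.registerNewSubscribedShardState(new KinesisStreamShardState(
>>>>>                     kinesisStreamShard.getShardMetadata(), shard,
>>>>>                     sequenceNumsToRestore.get(kinesisStreamShard)));
>>>>>             } else {
>>>>>                 // shard missing from the restored state: treated as newly
>>>>>                 // discovered, so it is read from EARLIEST (trim horizon)
>>>>>                 fetcher.registerNewSubscribedShardState(new KinesisStreamShardState(
>>>>>                     kinesisStreamShard.getShardMetadata(), shard,
>>>>>                     SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()));
>>>>>             }
>>>>>         }
>>>>>     }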
>>>>>
>>>>> As in my case the number of shard details recovered from the
>>>>> checkpoint data is less than the actual number of shards, this results
>>>>> in scheduling those missing shards with the earliest stream position.
>>>>> I suspect that somehow the checkpoint is missing state for some of the
>>>>> shards. But if that were the case, the checkpoint should have failed.
>>>>>
>>>>> Any further information to resolve this issue would be highly
>>>>> appreciated...
>>>>>
>>>>> Regards,
>>>>> Ravi
>>>>>
>>>>> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>>>>>
>>>>>> Hi Steven
>>>>>>
>>>>>> If you restored the savepoint/checkpoint successfully, I think this
>>>>>> might be because the shard wasn't discovered in the previous run, and
>>>>>> therefore it would be consumed from the beginning. Please refer to
>>>>>> the implementation here: [1]
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>>>>>
>>>>>> Best
>>>>>> Yun Tang
>>>>>> ------------------------------
>>>>>> *From:* Steven Nelson <snel...@sourceallies.com>
>>>>>> *Sent:* Wednesday, October 16, 2019 4:31
>>>>>> *To:* user <user@flink.apache.org>
>>>>>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>>>>>
>>>>>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>>>>>
>>>>>> We have extended data retention on the Kinesis stream, which gives us
>>>>>> 7 days of data.
>>>>>>
>>>>>> We have found that when a savepoint/checkpoint is restored, the
>>>>>> Kinesis consumer appears to restart from the start of the stream. The
>>>>>> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
>>>>>> property reports to Prometheus that it is behind by 7 days when the
>>>>>> process starts back up from a savepoint.
>>>>>>
>>>>>> We have some logs that say:
>>>>>>
>>>>>> Subtask 3 will start consuming seeded shard
>>>>>> StreamShardHandle{streamName='TheStream',
>>>>>> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
>>>>>> 220651847300296034902031972006537199616,EndingHashKey:
>>>>>> 223310303291865866647839586127097888767},SequenceNumberRange:
>>>>>> {StartingSequenceNumber:
>>>>>> 49597946220601502339755334362523522663986150244033234226,}}'} from
>>>>>> sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>>>>>
>>>>>> This seems to indicate that this shard is starting from the beginning
>>>>>> of the stream,
>>>>>>
>>>>>> and some logs that say:
>>>>>>
>>>>>> Subtask 3 will start consuming seeded shard
>>>>>> StreamShardHandle{streamName='TheStream',
>>>>>> shard='{ShardId: shardId-000000000087,HashKeyRange: {StartingHashKey:
>>>>>> 231285671266575361885262428488779956224,EndingHashKey:
>>>>>> 233944127258145193631070042609340645375},SequenceNumberRange:
>>>>>> {StartingSequenceNumber:
>>>>>> 49597946220690705320549456855089665537076743690057155954,}}'} from
>>>>>> sequence number 49599841594208637293623823226010128300928335129272649074
>>>>>> with ShardConsumer 21
>>>>>>
>>>>>> This shard seems to be resuming from a specific point.
>>>>>>
>>>>>> I am assuming that this might be caused by no data being available on
>>>>>> the shard for the entire retention period (possible at this stage of
>>>>>> the application). Is this the expected behavior? I had thought it
>>>>>> would checkpoint with the most recent sequence number, regardless of
>>>>>> whether it got data or not.
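>>>>>>
>>>>>> From a quick read, FlinkKinesisConsumer#snapshotState seems to just
>>>>>> write out whatever per-shard sequence numbers the fetcher currently
>>>>>> holds (a simplified sketch, with logging, error handling and the
>>>>>> not-running case omitted, so treat it as illustrative):
>>>>>>
>>>>>>     @Override
>>>>>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>>>>         sequenceNumsStateForCheckpoint.clear();
>>>>>>         if (fetcher == null) {
>>>>>>             // not running yet: re-write whatever state was restored
>>>>>>             for (Map.Entry<StreamShardMetadata.EquivalenceWrapper, SequenceNumber> entry
>>>>>>                     : sequenceNumsToRestore.entrySet()) {
>>>>>>                 sequenceNumsStateForCheckpoint.add(
>>>>>>                     Tuple2.of(entry.getKey().getShardMetadata(), entry.getValue()));
>>>>>>             }
>>>>>>         } else {
>>>>>>             // snapshot the per-shard sequence numbers the fetcher holds now
>>>>>>             for (Map.Entry<StreamShardMetadata, SequenceNumber> entry
>>>>>>                     : fetcher.snapshotState().entrySet()) {
>>>>>>                 sequenceNumsStateForCheckpoint.add(
>>>>>>                     Tuple2.of(entry.getKey(), entry.getValue()));
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> -Steve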