I have verified this behavior in Flink 1.9.0, 1.8.1, and 1.7.2.

About half of my shards start over at the trim horizon. Why would the state for
some shards appear to be missing from a savepoint? This seems like a big problem.

-Steve

On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi,
>
> I am also facing the same problem. I am using Flink 1.9.0 and consuming
> from a Kinesis source with a retention of 1 day. When the job is submitted
> with the "latest" initial stream position, it starts well and keeps
> processing data from all of the shards for a very long period of time
> without any lag. When the job fails, it also usually recovers well from the
> last successful checkpointed state. But very rarely, after the job fails
> and recovers from the last successful checkpoint, I notice a huge lag
> (1 day, i.e. the full retention) on one of the streams. I have not yet
> found a step-by-step way to reproduce this issue.
>
> So far, I have gathered some more information by customizing the
> FlinkKinesisConsumer to add extra log messages. I noticed that the number
> of shard states loaded from the checkpoint data during recovery is smaller
> than the actual number of shards in the stream, even though the Kinesis
> stream has a fixed number of shards.
>
> I added one debug log line at line 408 to print the size of the variable
> "sequenceNumsToRestore", which is populated with the shard details from the
> checkpoint data.
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
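>
> The added line was roughly the following (my own addition, not part of the
> Flink source; the exact wording may differ):
>
>     LOG.debug("Restored {} shard sequence numbers from checkpoint state",
>             sequenceNumsToRestore.size());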
>
> In this consumer class, when the "run" method is called, it does the
> following (see the sketch after this list):
>
>    - it discovers the shards of the Kinesis stream and selects those
>    shards that should be scheduled to this subtask
>    - it then iterates over the discovered shards one by one and checks
>    whether each shard's state is available in the recovered state
>    "sequenceNumsToRestore"
>    
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>    - if it is available, it schedules that shard with the recovered state
>    - if it is not available in the state, it schedules that shard with
>    "EARLIEST_SEQUENCE_NUMBER"
>    
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
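>
> To make the flow concrete, here is a minimal, self-contained sketch of that
> restore logic (simplified types and hypothetical helper names of my own,
> not the actual Flink source):
>
>     import java.util.HashMap;
>     import java.util.List;
>     import java.util.Map;
>
>     class RestoreLogicSketch {
>         // shardId -> last checkpointed sequence number, rebuilt from the
>         // restored checkpoint/savepoint state
>         private final Map<String, String> sequenceNumsToRestore;
>
>         RestoreLogicSketch(Map<String, String> restoredState) {
>             this.sequenceNumsToRestore = new HashMap<>(restoredState);
>         }
>
>         void seedShards(List<String> discoveredShardIds) {
>             for (String shardId : discoveredShardIds) {
>                 String restoredSeq = sequenceNumsToRestore.get(shardId);
>                 if (restoredSeq != null) {
>                     // shard was present in the restored state: resume where it left off
>                     startShardConsumer(shardId, restoredSeq);
>                 } else {
>                     // shard missing from the restored state: treated as newly
>                     // discovered and read from the earliest position (trim horizon)
>                     startShardConsumer(shardId, "EARLIEST_SEQUENCE_NUM");
>                 }
>             }
>         }
>
>         private void startShardConsumer(String shardId, String startingSeq) {
>             System.out.println("Consuming " + shardId + " from " + startingSeq);
>         }
>     }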
>
> In my case, the number of shard states recovered from the checkpoint data
> is smaller than the actual number of shards, which results in the missing
> shards being scheduled from the earliest stream position.
> I suspect that the checkpoint is somehow missing state for some of the
> shards. But if that were the case, the checkpoint should have failed.
>
> Any further information that could help resolve this issue would be highly
> appreciated.
>
> Regards,
> Ravi
>
> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>
>> Hi Steven
>>
>> If the savepoint/checkpoint was restored successfully, I think this might
>> be because the shard was not discovered in the previous run; a shard with
>> no restored state is consumed from the beginning. Please refer to the
>> implementation here: [1]
>>
>> [1]
>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
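>>
>> A minimal sketch of the effect (illustrative only, with made-up shard ids
>> and sequence numbers, not the Flink source):
>>
>>     import java.util.Map;
>>
>>     class MissingShardStateDemo {
>>         public static void main(String[] args) {
>>             // suppose the restored state only contains shards 0 and 1,
>>             // e.g. because shard 2 was never discovered before the snapshot
>>             Map<String, String> restored = Map.of(
>>                     "shardId-000000000000", "seq-A",
>>                     "shardId-000000000001", "seq-B");
>>
>>             String[] discovered = {
>>                     "shardId-000000000000",
>>                     "shardId-000000000001",
>>                     "shardId-000000000002"};
>>
>>             for (String shard : discovered) {
>>                 // no restored entry -> the consumer falls back to the earliest position
>>                 String start = restored.getOrDefault(shard, "EARLIEST_SEQUENCE_NUM");
>>                 System.out.println(shard + " starts from " + start);
>>             }
>>         }
>>     }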
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Steven Nelson <snel...@sourceallies.com>
>> *Sent:* Wednesday, October 16, 2019 4:31
>> *To:* user <user@flink.apache.org>
>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>
>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>
>> We have extended data retention on the Kinesis stream, which gives us 7
>> days of data.
>>
>> We have found that when a savepoint/checkpoint is restored, the Kinesis
>> consumer appears to restart from the start of the stream. The
>> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
>> metric reports to Prometheus that we are 7 days behind when the job starts
>> back up from a savepoint.
>>
>> We have some logs that say:
>>
>> Subtask 3 will start consuming seeded shard 
>> StreamShardHandle{streamName='TheStream',
>> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
>> 220651847300296034902031972006537199616,EndingHashKey:
>> 223310303291865866647839586127097888767},SequenceNumberRange:
>> {StartingSequenceNumber:
>> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
>> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>
>> This seems to indicate that this shard is starting from the beginning of
>> the stream.
>>
>> We also have some logs that say:
>> Subtask 3 will start consuming seeded shard StreamShardHandle
>> {streamName=' TheStream ', shard='{ShardId:
>> shardId-000000000087,HashKeyRange: {StartingHashKey:
>> 231285671266575361885262428488779956224,EndingHashKey:
>> 233944127258145193631070042609340645375},SequenceNumberRange:
>> {StartingSequenceNumber:
>> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
>> number 49599841594208637293623823226010128300928335129272649074 with
>> ShardConsumer 21
>>
>> This shard seems to be resuming from a specific point.
>>
>> I am assuming this might be caused by no data being available on that
>> shard for the entire stream (which is possible at this stage of the
>> application). Is this the expected behavior? I had thought it would
>> checkpoint the most recent sequence number regardless of whether it
>> received data or not.
>>
>> -Steve
>>
>>
>>
