In my situation I believe it's because we have idle shards (it's a testing
environment). I dug into the connector code and it looks like it only
updates the shard state when a record is processed or when the shard hits
SHARD_END. So an idle shard would never get a checkpointed state. I guess
this is okay since in production we won't have idle shards, but it might
be better to send through an empty record that doesn't get emitted
downstream but still triggers a state update.
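
To illustrate the idea, here is a rough, self-contained sketch (not the
connector's actual code or API; the class, method, and field names below
are made up for illustration):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Toy model of a per-shard poll loop, only to illustrate the suggestion above. */
    public class IdleShardSketch {

        // Last checkpointable position per shard id (stands in for the connector's shard state).
        private final Map<String, String> shardPositions = new ConcurrentHashMap<>();

        /**
         * Called after each poll of a shard. Even when the poll returned no
         * records, the latest known position is still recorded, so a
         * checkpoint/savepoint taken now would not leave this shard without state.
         */
        void onPollResult(String shardId, List<String> records, String latestKnownPosition) {
            if (records.isEmpty()) {
                // "Empty record" case: update state only, emit nothing downstream.
                shardPositions.put(shardId, latestKnownPosition);
            } else {
                for (String record : records) {
                    emitDownstream(record);                       // normal path
                }
                shardPositions.put(shardId, latestKnownPosition); // state update as usual
            }
        }

        private void emitDownstream(String record) {
            System.out.println(record);
        }
    }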

-Steve


On Wed, Oct 16, 2019 at 12:54 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Do you know a step-by-step process to reproduce this problem?
>
> -Ravi
>
>
> On Wed 16 Oct, 2019, 17:40 Steven Nelson, <snel...@sourceallies.com>
> wrote:
>
>> I have verified this behavior in 1.9.0, 1.8.1 and 1.7.2.
>>
>> About half of my shards start over at TRIM_HORIZON. Why would some shard
>> states appear to be missing from a savepoint? This seems like a big problem.
>>
>> -Steve
>>
>> On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am also facing the same problem. I am using Flink 1.9.0 and consuming
>>> from a Kinesis source with a retention of 1 day. I am observing that when
>>> the job is submitted with the "latest" initial stream position, it starts
>>> well and keeps processing data from all the shards for a very long period
>>> of time without any lag. When the job fails, it usually also recovers well
>>> from the last successful checkpointed state. But very rarely, when the job
>>> fails and recovers from the last successful checkpointed state, I notice a
>>> huge lag (1 day, i.e. the full retention) on one of the streams. I have not
>>> yet found a step-by-step process to reproduce this issue.
>>>
>>> So far, I have gathered some more information by customizing the
>>> FlinkKinesisConsumer to add an extra log message. I noticed that the number
>>> of shard entries loaded from the checkpoint data during recovery is less
>>> than the actual number of shards in the stream, even though the stream has
>>> a fixed number of shards.
>>>
>>> I added one debug log line at line 408 to print the size of the variable
>>> "sequenceNumsToRestore", which is populated with the shard details from the
>>> checkpoint data.
>>>
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408
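>>>
>>> Roughly, the extra line is along these lines (paraphrased from my local
>>> change, not upstream code; it relies on the slf4j LOG and the
>>> sequenceNumsToRestore field that already exist in FlinkKinesisConsumer):
>>>
>>>     if (LOG.isDebugEnabled()) {
>>>         LOG.debug("Restored checkpoint state contains {} shard entries: {}",
>>>                 sequenceNumsToRestore.size(), sequenceNumsToRestore);
>>>     }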
>>>
>>> In this consumer class, when the "run" method is called, it does the
>>> following (a rough sketch of this branching is included after the list):
>>>
>>>    - it discovers the shards of the Kinesis stream and selects all the
>>>    shards that this subtask should handle
>>>    - it then iterates over the discovered shards one by one and checks
>>>    whether that shard's state is available in the recovered state
>>>    "sequenceNumsToRestore"
>>>    
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
>>>    - if it is available, it schedules that shard with the recovered
>>>    state
>>>    - if it is not available in the restored state, it schedules that
>>>    shard with "EARLIEST_SEQUENCE_NUMBER"
>>>    
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
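>>>
>>> As a rough sketch of that branching (a simplified, self-contained
>>> paraphrase for illustration only, with shard state modeled as a plain map
>>> keyed by shard id; it is not the connector's actual code or API):
>>>
>>>     import java.util.List;
>>>     import java.util.Map;
>>>
>>>     /** Illustration of the two scheduling branches described in the list above. */
>>>     public class RestoreBranchSketch {
>>>
>>>         static final String EARLIEST = "EARLIEST_SEQUENCE_NUM";
>>>
>>>         /** Decides, per discovered shard, which position the consumer starts from. */
>>>         static void scheduleShards(List<String> discoveredShardIds,
>>>                                    Map<String, String> sequenceNumsToRestore,
>>>                                    Map<String, String> startingPositions) {
>>>             for (String shardId : discoveredShardIds) {
>>>                 String restored = sequenceNumsToRestore.get(shardId);
>>>                 if (restored != null) {
>>>                     // Shard found in the restored state: resume from its
>>>                     // checkpointed sequence number.
>>>                     startingPositions.put(shardId, restored);
>>>                 } else {
>>>                     // Shard missing from the restored state: treated like a
>>>                     // newly discovered shard and started from the earliest
>>>                     // position (TRIM_HORIZON).
>>>                     startingPositions.put(shardId, EARLIEST);
>>>                 }
>>>             }
>>>         }
>>>     }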
>>>
>>> In my case, the number of shard entries recovered from the checkpoint data
>>> is less than the actual number of shards, which results in the missing
>>> shards being scheduled from the earliest stream position.
>>> I suspect that the checkpoint is somehow missing the state for some of the
>>> shards, but if that were the case, the checkpoint should have failed.
>>>
>>> Any further information to resolve this issue would be highly
>>> appreciated...
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:
>>>
>>>> Hi Steven
>>>>
>>>> If the savepoint/checkpoint was restored successfully, I think this might
>>>> be because the shard was not discovered in the previous run and is
>>>> therefore consumed from the beginning. Please refer to the implementation here: 
>>>> [1]
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>>>>
>>>> Best
>>>> Yun Tang
>>>> ------------------------------
>>>> *From:* Steven Nelson <snel...@sourceallies.com>
>>>> *Sent:* Wednesday, October 16, 2019 4:31
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>>>>
>>>> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>>>>
>>>> We have extended data retention on the Kinesis stream, which gives us 7
>>>> days of data.
>>>>
>>>> We have found that when a savepoint/checkpoint is restored, the Kinesis
>>>> consumer appears to restart from the start of the stream. The
>>>> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
>>>> metric reports to Prometheus that the consumer is behind by 7 days when the
>>>> process starts back up from a savepoint.
>>>>
>>>> We have some logs that say:
>>>>
>>>> Subtask 3 will start consuming seeded shard 
>>>> StreamShardHandle{streamName='TheStream',
>>>> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
>>>> 220651847300296034902031972006537199616,EndingHashKey:
>>>> 223310303291865866647839586127097888767},SequenceNumberRange:
>>>> {StartingSequenceNumber:
>>>> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
>>>> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>>>>
>>>> This seems to indicate that this shard is starting from the beginning of
>>>> the stream.
>>>>
>>>> and some logs that say:
>>>> Subtask 3 will start consuming seeded shard StreamShardHandle
>>>> {streamName=' TheStream ', shard='{ShardId:
>>>> shardId-000000000087,HashKeyRange: {StartingHashKey:
>>>> 231285671266575361885262428488779956224,EndingHashKey:
>>>> 233944127258145193631070042609340645375},SequenceNumberRange:
>>>> {StartingSequenceNumber:
>>>> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
>>>> number 49599841594208637293623823226010128300928335129272649074 with
>>>> ShardConsumer 21
>>>>
>>>> This shard seems to be resuming from a specific point.
>>>>
>>>> I am assuming this might be caused by no data ever being available on
>>>> that shard (which is possible at this stage of the application). Is this
>>>> the expected behavior? I had thought it would checkpoint the most recent
>>>> sequence number regardless of whether any data arrived.
>>>>
>>>> -Steve
>>>>
>>>>
>>>>
