Hi Gordon,

I’ve created a PR [1] with my proposed code changes. Let me know if
anything is missing.

I think I signed a CLA many years ago, so that should be ok as well.

[1] https://github.com/apache/flink/pull/5337

HTH,

-Phil

On Mon, Jan 22, 2018 at 7:17 PM, Philip Luppens <philip.lupp...@gmail.com>
wrote:

> Hi Gordon,
>
> Yeah, I’d need to confirm with our devops guys that this is the case. By
> default, the Kinesis monitoring doesn’t show how many or which shards were
> re-ingested; all I remember is seeing the iterator age shoot up again to
> the retention horizon, with no clue whether this was because of one shard
> or more. I do remember we were having issues regardless whenever there
> were closed shards, but I could be wrong.
>
> I’ve created a ticket [1] to track the issue, and I’ll see if I can
> provide a small patch against the 1.3 branch.
>
> [1] https://issues.apache.org/jira/browse/FLINK-8484
>
> HTH,
>
> -Phil
>
> On Mon, Jan 22, 2018 at 6:26 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Philip,
>>
>> Thanks a lot for reporting this, and looking into this in detail.
>>
>> Your observation sounds accurate to me. The `endingSequenceNumber` would
>> no longer be null once a shard is closed, so on restore that would mislead
>> the consumer into treating it as a new shard and consuming it from the
>> earliest possible sequence number (i.e., as if it were a new shard that
>> was created while the job wasn’t running).
>>
>> I think we haven’t seen other reports of this yet because the issue you
>> observed seems to happen only in a corner case where you rescaled the
>> Kinesis stream while the job was down.
>> Could you confirm that assumption? My guess is that Flink users who use
>> Kinesis have so far only rescaled their Kinesis streams while the job was
>> running.
>>
>> Your workaround is also a valid fix for this bug. Could you file a JIRA
>> for this? Would be happy to also review a PR for the fix, if you would like
>> to contribute it.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 22 January 2018 at 5:08:36 PM, Philip Luppens (
>> philip.lupp...@gmail.com) wrote:
>>
>> Hi everyone,
>>
>> For the past few weeks, we’ve been struggling with Kinesis ingestion using
>> the Flink Kinesis connector, but the seemingly complete lack of similar
>> reports makes us wonder if perhaps we misconfigured or misused the
>> connector.
>>
>> We’re using the connector to subscribe to streams ranging from 1 to 100
>> shards, and have used the kinesis-scaling-utils to dynamically scale the
>> Kinesis stream up and down during peak times. What we’ve noticed is that,
>> whenever there were closed shards, any Flink job restart from a checkpoint
>> or savepoint would result in shards being re-read from the event horizon,
>> duplicating our events.
>>
>> We started checking the checkpoint state, and found that the shards were
>> stored correctly with the proper sequence number (including for closed
>> shards), but that upon restarts, the older closed shards would be read from
>> the event horizon, as if their restored state were being ignored.
>>
>> In the end, we believe that we found the problem: in the
>> FlinkKinesisConsumer’s run() method, we try to match each shard returned
>> by the KinesisDataFetcher against the shards’ metadata from the restored
>> state, but we do this via a containsKey() call, which means we’ll use the
>> StreamShardMetadata’s equals() method. However, this compares all
>> properties, including the endingSequenceNumber, which might have changed
>> between the restored state’s checkpoint and our data fetch. The equality
>> check then fails, so the containsKey() check fails, and the shard is
>> re-read from the event horizon even though it was present in the restored
>> state.
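
The lookup failure described above can be reproduced in isolation. The following is a minimal sketch, not the connector's code: `ShardMeta` is a hypothetical stand-in for StreamShardMetadata that keeps only three fields, and the stream name, shard ID, and sequence numbers are made-up values. It assumes, as the message describes, that equals() and hashCode() cover every field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for StreamShardMetadata; not the connector's real class.
final class ShardMeta {
    final String streamName;
    final String shardId;
    final String endingSequenceNumber; // null while the shard is still open

    ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
        this.streamName = streamName;
        this.shardId = shardId;
        this.endingSequenceNumber = endingSequenceNumber;
    }

    // Equality over ALL fields, which is the behaviour the message describes.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ShardMeta)) return false;
        ShardMeta other = (ShardMeta) o;
        return Objects.equals(streamName, other.streamName)
                && Objects.equals(shardId, other.shardId)
                && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId, endingSequenceNumber);
    }
}

class ShardLookupDemo {
    // True if the restored state has an entry equal to the discovered shard.
    static boolean isRestored(Map<ShardMeta, String> restoredState, ShardMeta discovered) {
        return restoredState.containsKey(discovered);
    }

    public static void main(String[] args) {
        // State as checkpointed while the shard was still open.
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("events", "shardId-000000000000", null), "seq-42");

        // The same shard as discovered after it was closed in the meantime.
        ShardMeta nowClosed = new ShardMeta("events", "shardId-000000000000", "seq-99");

        System.out.println(isRestored(restored, nowClosed)); // prints "false"
    }
}
```

Because the closed shard no longer equals its checkpointed entry, the lookup misses and the shard looks new.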
>>
>> We’ve created a workaround where we only check for the shardId and stream
>> name to restore the state of the shards we’ve already seen, and this seems
>> to work correctly. However, as pointed out above, the lack of similar
>> reports makes us worried that we’ve misunderstood something, so we’d
>> appreciate any feedback on whether our report makes sense before we
>> file a bug in the issue tracker.
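
One way to picture the workaround is to key the restored state on the shard's identity only. This is a rough sketch under assumptions: `ShardKey` and `restoredSequenceNumber` are hypothetical names introduced for illustration, the values are made up, and the actual workaround or patch may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical identity key: only the fields that never change for a shard.
final class ShardKey {
    final String streamName;
    final String shardId;

    ShardKey(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ShardKey)) return false;
        ShardKey other = (ShardKey) o;
        return Objects.equals(streamName, other.streamName)
                && Objects.equals(shardId, other.shardId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId);
    }
}

class ShardRestoreDemo {
    // Returns the checkpointed sequence number, or null if the shard was
    // not part of the restored state (i.e. it is genuinely new).
    static String restoredSequenceNumber(
            Map<ShardKey, String> restoredState, String streamName, String shardId) {
        return restoredState.get(new ShardKey(streamName, shardId));
    }

    public static void main(String[] args) {
        Map<ShardKey, String> restored = new HashMap<>();
        restored.put(new ShardKey("events", "shardId-000000000000"), "seq-42");

        // Found regardless of whether the shard has been closed since the checkpoint.
        System.out.println(restoredSequenceNumber(restored, "events", "shardId-000000000000"));
        // A shard that was never checkpointed is still treated as new.
        System.out.println(restoredSequenceNumber(restored, "events", "shardId-000000000001"));
    }
}
```

Mutable properties such as the ending sequence number stay out of the key, so closing a shard between checkpoint and restore no longer changes the lookup result.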
>>
>> Much appreciated,
>>
>> -Phil
>>
>> --
>> "We cannot change the cards we are dealt, just how we play the hand." -
>> Randy Pausch
>>
>>
>
>
>



