Hi Gordon,

I’ve created a PR [1] with my proposed code changes. Let me know if anything is missing.
I think I signed a CLA many years ago, so that should be ok as well.

[1] https://github.com/apache/flink/pull/5337

HTH,
-Phil

On Mon, Jan 22, 2018 at 7:17 PM, Philip Luppens <philip.lupp...@gmail.com> wrote:

> Hi Gordon,
>
> Yeah, I’d need to confirm with our devops guys that this is the case (by
> default, the Kinesis monitoring doesn’t show how many/which shards were
> re-ingested; all I remember is seeing the iterator age shooting up again
> to the retention horizon, but no clue whether this was because of one
> shard or more). I do remember we were having issues regardless whenever
> there were closed shards, but I could be wrong.
>
> I’ve created a ticket [1] to track the issue, and I’ll see if I can
> provide a small patch against the 1.3 branch.
>
> [1] https://issues.apache.org/jira/browse/FLINK-8484
>
> HTH,
>
> -Phil
>
> On Mon, Jan 22, 2018 at 6:26 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Philip,
>>
>> Thanks a lot for reporting this, and for looking into it in detail.
>>
>> Your observation sounds accurate to me. The `endingSequenceNumber` would
>> no longer be null once a shard is closed, so on restore that would lead
>> the consumer to mistakenly think that it’s a new shard and start
>> consuming it from the earliest sequence number possible (i.e., treating
>> it as if it were a new shard that was created while the job wasn’t
>> running).
>>
>> I think we haven’t seen other reports on this yet because the issue you
>> observed seems to only happen in a corner case where you rescaled the
>> Kinesis stream while the job was down. Could you confirm that
>> assumption? My guess is that Flink users who use Kinesis have so far
>> only been rescaling Kinesis streams while the job was running.
>>
>> Your workaround is also a valid fix for this bug. Could you file a JIRA
>> for it? I would be happy to also review a PR for the fix, if you would
>> like to contribute it.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 22 January 2018 at 5:08:36 PM, Philip Luppens
>> (philip.lupp...@gmail.com) wrote:
>>
>> Hi everyone,
>>
>> For the past few weeks, we’ve been struggling with Kinesis ingestion
>> using the Flink Kinesis connector, but the seemingly complete lack of
>> similar reports makes us wonder whether we perhaps misconfigured or
>> misused the connector.
>>
>> We’re using the connector to subscribe to streams varying from 1 to 100
>> shards, and we used the kinesis-scaling-utils to dynamically scale the
>> Kinesis stream up and down during peak times. What we noticed is that,
>> whenever we had closed shards, any Flink job restart from a checkpoint
>> or savepoint would result in shards being re-read from the event
>> horizon, duplicating our events.
>>
>> We started checking the checkpoint state and found that the shards were
>> stored correctly with the proper sequence numbers (including the closed
>> shards), but that upon restart, the older closed shards would be read
>> from the event horizon again, as if their restored state were ignored.
>>
>> In the end, we believe we found the problem: in the
>> FlinkKinesisConsumer’s run() method, we try to match each shard returned
>> by the KinesisDataFetcher against the shards’ metadata from the
>> restoration point, but we do this via a containsKey() call, which means
>> we use the StreamShardMetadata’s equals() method.
>> However, this checks all properties, including the
>> endingSequenceNumber, which might have changed between the restored
>> state’s checkpoint and our data fetch, thus failing the equality check,
>> failing the containsKey() check, and resulting in the shard being
>> re-read from the event horizon, even though it was present in the
>> restored state.
>>
>> We’ve created a workaround where we only check the shardId and stream
>> name to restore the state of the shards we’ve already seen, and this
>> seems to work correctly. However, as pointed out above, the lack of
>> similar reports makes us worried that we’ve misunderstood something, so
>> we’d appreciate any feedback on whether our report makes sense before we
>> file a bug in the issue tracker.
>>
>> Much appreciated,
>>
>> -Phil
>>
>> --
>> "We cannot change the cards we are dealt, just how we play the hand." -
>> Randy Pausch
>>
>
>
> --
> "We cannot change the cards we are dealt, just how we play the hand." -
> Randy Pausch
>

--
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch
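
To make the workaround concrete: it amounts to matching restored shard state on the
immutable shard identity (stream name plus shard ID) rather than on full
StreamShardMetadata equality. Below is a minimal sketch of that idea, not the actual
change in PR [1]; it assumes StreamShardMetadata exposes getStreamName() and
getShardId() getters, and the helper name findRestoredSequenceNumber is hypothetical.

    // Sketch of the described workaround, NOT the actual patch in the PR.
    // Assumes StreamShardMetadata exposes getStreamName() / getShardId();
    // findRestoredSequenceNumber is a hypothetical helper name.
    import java.util.Map;
    import java.util.Objects;

    import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
    import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

    public final class ShardStateLookup {

        private ShardStateLookup() {}

        /**
         * Looks up the restored sequence number for a discovered shard by comparing
         * only the stream name and shard ID, ignoring mutable fields such as
         * endingSequenceNumber that may have changed since the checkpoint was taken.
         */
        public static SequenceNumber findRestoredSequenceNumber(
                Map<StreamShardMetadata, SequenceNumber> restoredState,
                StreamShardMetadata discoveredShard) {

            for (Map.Entry<StreamShardMetadata, SequenceNumber> entry : restoredState.entrySet()) {
                StreamShardMetadata restoredShard = entry.getKey();
                if (Objects.equals(restoredShard.getStreamName(), discoveredShard.getStreamName())
                        && Objects.equals(restoredShard.getShardId(), discoveredShard.getShardId())) {
                    return entry.getValue();
                }
            }
            // No match: treat it as a genuinely new shard and start from the configured position.
            return null;
        }
    }

The key design point is that mutable fields such as endingSequenceNumber do not take
part in the lookup, so a shard that was closed (or whose metadata otherwise changed)
after the checkpoint still matches its restored sequence number.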