+1 on separating out the Kinesalite-specific logic. Kinesalite is likely used a lot in testing environments.
On Thu, Sep 13, 2018 at 1:52 AM, Евгений Юшин <evgenij.us...@gmail.com> wrote:
> Hi there,
>
> The Flink Kinesis consumer checks shard IDs against a particular pattern:
> "^shardId-\\d{12}"
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132
>
> While this is in line with the current Kinesis Streams server implementation
> (all streams follow this pattern), it conflicts with the AWS docs:
>
> *ShardId*
> The unique identifier of the shard within the stream.
> Type: String
> Length Constraints: Minimum length of 1. Maximum length of 128.
> *Pattern: [a-zA-Z0-9_.-]+*
> Required: Yes
>
> https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html
>
> *Intention:*
> We have no guarantees and can't rely on any pattern other than the one given
> in the AWS API reference. Any custom Kinesis mock implementation should rely
> on that reference, which only requires a ShardId to consist of alphanumerics
> plus '_', '.' and '-'. The stricter check prevents anyone from using Flink
> with such mocks.
>
> The reason for using the particular pattern "^shardId-\\d{12}" is to build
> Flink's custom shard comparator, filter out already-seen shards, and pass only
> the latest seen shard to client.listShards, limiting the scope of the RPC call
> to AWS.
>
> That said, I think we can get rid of this logic entirely. Its current usages
> in the project are:
> - Working around a Kinesalite bug (I've already opened an issue for this:
> https://github.com/mhart/kinesalite/issues/76). We can move this logic to the
> test code base to keep the production code clean for now:
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464
>
> - Adjusting the last seen shard ID. We can simply omit this, because the AWS
> client won't return already-seen shards, so we will get only new IDs or
> nothing:
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406
>
> What do you think?
>
> Regards,
> Eugen
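To make the mismatch concrete, here is a minimal Java sketch. The class ShardIdCheck and its methods are hypothetical illustrations, not Flink's API; only the two regular expressions and the 1 to 128 length limit come from the thread and the AWS API reference. It assumes whole-string matching, and the suffix-based comparator only approximates what the thread describes Flink's shard comparator doing, which shows why such a comparator cannot handle IDs outside the shardId-<12 digits> form.

```java
import java.util.regex.Pattern;

// Hypothetical helper contrasting the two shard-ID rules discussed in the thread.
final class ShardIdCheck {

    // Pattern quoted from Flink's StreamShardHandle.
    private static final Pattern FLINK_PATTERN = Pattern.compile("^shardId-\\d{12}");

    // Pattern the AWS API reference documents for Shard.ShardId (length 1 to 128).
    private static final Pattern AWS_PATTERN = Pattern.compile("[a-zA-Z0-9_.-]+");

    // Length of the "shardId-" prefix that precedes the 12 digits.
    private static final int PREFIX_LENGTH = "shardId-".length();

    private ShardIdCheck() {}

    // Whole-string match against Flink's pattern (assumption: Flink applies it the same way).
    static boolean matchesFlinkPattern(String shardId) {
        return shardId != null && FLINK_PATTERN.matcher(shardId).matches();
    }

    // Whole-string match against the documented AWS pattern plus its length limits.
    static boolean matchesAwsPattern(String shardId) {
        return shardId != null
                && shardId.length() >= 1
                && shardId.length() <= 128
                && AWS_PATTERN.matcher(shardId).matches();
    }

    // Orders IDs by their numeric suffix; only defined for IDs matching Flink's pattern,
    // which is why a suffix-based comparator ties the consumer to that pattern.
    static int compareByNumericSuffix(String first, String second) {
        if (!matchesFlinkPattern(first) || !matchesFlinkPattern(second)) {
            throw new IllegalArgumentException(
                    "Expected shardId-<12 digits>, got: " + first + ", " + second);
        }
        long a = Long.parseLong(first.substring(PREFIX_LENGTH));
        long b = Long.parseLong(second.substring(PREFIX_LENGTH));
        return Long.compare(a, b);
    }

    public static void main(String[] args) {
        // One real-looking ID and two IDs a custom mock might legitimately return.
        String[] ids = {"shardId-000000000001", "shard-a", "mock_shard.7"};
        for (String id : ids) {
            System.out.printf("%s: flink=%b, aws=%b%n",
                    id, matchesFlinkPattern(id), matchesAwsPattern(id));
        }
    }
}
```

Running main reports that all three IDs satisfy the documented AWS pattern, while only shardId-000000000001 satisfies Flink's stricter one, which is the case that blocks custom mocks.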