[ https://issues.apache.org/jira/browse/FLINK-10422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann reassigned FLINK-10422:
-------------------------------------

    Assignee: eugen yushin

> Follow AWS specs in Kinesis Consumer
> -------------------------------------
>
>                 Key: FLINK-10422
>                 URL: https://issues.apache.org/jira/browse/FLINK-10422
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.6.1
>            Reporter: eugen yushin
>            Assignee: eugen yushin
>            Priority: Major
>              Labels: pull-request-available
>
> *Related conversation in mailing list:*
> [https://lists.apache.org/thread.html/96de3bac9761564767cf283b58d664f5ae1b076e0c4431620552af5b@%3Cdev.flink.apache.org%3E]
> *Summary:*
> The Flink Kinesis consumer checks shard IDs against a particular pattern:
> {noformat}
> "^shardId-\\d{12}"
> {noformat}
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132]
> While this is in line with the current Kinesis streams server
> implementation (all streams follow this pattern), it conflicts with the
> AWS docs:
>
> {code:java}
> ShardId
> The unique identifier of the shard within the stream.
> Type: String
> Length Constraints: Minimum length of 1. Maximum length of 128.
> Pattern: [a-zA-Z0-9_.-]+
> Required: Yes
> {code}
>
> [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html]
> *Intention:*
> We have no guarantees and can't rely on patterns other than the one
> provided in the AWS manifest.
> Any custom implementation of a Kinesis mock should rely on the AWS
> manifest, which defines ShardId as a string of alphanumerics plus '_', '.'
> and '-'. The current check prevents anyone from using Flink with such
> mocks.
> The reason for using the particular pattern "^shardId-\\d{12}" is to
> create Flink's custom shard comparator, filter already seen shards, and
> pass the latest shard to client.listShards, solely to limit the scope of
> the RPC call to AWS.
> In the meantime, I think we can get rid of this logic entirely.
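To make the mismatch between the two patterns concrete, here is a minimal sketch that checks a shard ID against both the pattern quoted from the consumer and the constraints quoted from the AWS docs. The class name `ShardIdPatterns`, the method names, and the sample ID `mock-shard-1` are hypothetical and are not taken from the Flink code base.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink.
class ShardIdPatterns {
    // Pattern quoted in the issue as the one the Flink consumer checks.
    static final Pattern FLINK_PATTERN = Pattern.compile("^shardId-\\d{12}");

    // Pattern quoted in the issue from the AWS Shard API reference.
    static final Pattern AWS_PATTERN = Pattern.compile("[a-zA-Z0-9_.-]+");

    // True if the whole ID has the form "shardId-" followed by 12 digits.
    static boolean matchesFlinkPattern(String shardId) {
        return FLINK_PATTERN.matcher(shardId).matches();
    }

    // True if the ID satisfies the quoted AWS length limits (1..128) and pattern.
    static boolean matchesAwsSpec(String shardId) {
        return shardId.length() >= 1
                && shardId.length() <= 128
                && AWS_PATTERN.matcher(shardId).matches();
    }

    public static void main(String[] args) {
        // The second ID is an assumed example of what a custom mock might return.
        String[] samples = {"shardId-000000000001", "mock-shard-1"};
        for (String id : samples) {
            System.out.println(id
                    + " flink=" + matchesFlinkPattern(id)
                    + " aws=" + matchesAwsSpec(id));
        }
    }
}
```

An ID such as `mock-shard-1` is valid under the documented constraints but is rejected by the stricter check, which is the situation the issue describes for custom Kinesis mocks.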
> The current usages in the project are:
> - a fix for a Kinesalite bug (I've already opened an issue to cover this:
> [https://github.com/mhart/kinesalite/issues/76] and opened a PR:
> [https://github.com/mhart/kinesalite/pull/77]). We can move this logic to
> the test code base to keep production code clean for now.
>
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464]
> - adjusting the last seen shard ID. We can simply omit this because the
> AWS client won't return already seen shards, so we will get only new IDs
> or nothing.
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475]
>
> [https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)