+1 on separating out the logic relevant to Kinesalite. Kinesalite is likely
used a lot in testing environments.


On Thu, Sep 13, 2018 at 1:52 AM, Евгений Юшин <evgenij.us...@gmail.com>
wrote:

> Hi there
>
> The Flink Kinesis consumer checks shard IDs against a particular pattern:
> "^shardId-\\d{12}"
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132
>
> While this is in line with the current Kinesis Streams server implementation
> (all streams follow this pattern), it conflicts with the AWS docs:
> *ShardId*
> The unique identifier of the shard within the stream.
> Type: String
> Length Constraints: Minimum length of 1. Maximum length of 128.
>
> *Pattern: [a-zA-Z0-9_.-]+*
> Required: Yes
>
> https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html
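
To make the mismatch concrete, here is a minimal sketch that checks a shard ID against both patterns quoted above. The class and method names are hypothetical, and it assumes the Flink check is a full-string match; it is not the connector's actual code.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink.
class ShardIdPatterns {
    // Pattern quoted in this thread as the one the Flink consumer checks.
    static final Pattern FLINK_PATTERN = Pattern.compile("^shardId-\\d{12}");
    // Pattern from the AWS API reference for ShardId.
    static final Pattern AWS_PATTERN = Pattern.compile("[a-zA-Z0-9_.-]+");

    // Assumption: the whole string must match the pattern.
    static boolean matchesFlink(String shardId) {
        return FLINK_PATTERN.matcher(shardId).matches();
    }

    // Applies the documented length constraints (1 to 128) and the documented pattern.
    static boolean matchesAwsDocs(String shardId) {
        return shardId.length() >= 1
                && shardId.length() <= 128
                && AWS_PATTERN.matcher(shardId).matches();
    }

    public static void main(String[] args) {
        String real = "shardId-000000000000"; // shape produced by Kinesis today
        String mock = "shard_A.1";            // made-up ID a mock could legally return
        System.out.println(real + ": flink=" + matchesFlink(real) + ", aws=" + matchesAwsDocs(real));
        System.out.println(mock + ": flink=" + matchesFlink(mock) + ", aws=" + matchesAwsDocs(mock));
    }
}
```

An ID such as "shard_A.1" satisfies the documented constraints but fails the stricter check, which is the situation a custom mock can run into.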
>
> *Intention:*
> We have no guarantees and can't rely on patterns other than the one given in
> the AWS API reference.
> Any custom implementation of a Kinesis mock should rely on the AWS API
> reference, which only requires ShardId to be alphanumeric (plus '_', '.' and
> '-'). The stricter check prevents anyone from using Flink with such mocks.
>
> The reason for using the particular pattern "^shardId-\\d{12}" is
> to build Flink's custom shard comparator, filter out already seen shards, and
> pass the latest shard to client.listShards, which limits the scope of the RPC
> call to AWS.
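
As a rough illustration of why such a comparator depends on the pattern, the sketch below orders shard IDs by their numeric suffix and keeps only those newer than the last seen one. All names are hypothetical and this is not the connector's implementation; it only shows that the approach breaks for any ID outside the "shardId-" plus 12 digits form.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch for illustration only; not Flink's actual comparator.
class ShardIdOrdering {
    static final String PREFIX = "shardId-";

    // Parses the numeric suffix; rejects IDs that do not follow the expected form.
    static long suffixOf(String shardId) {
        if (!shardId.matches("^shardId-\\d{12}")) {
            throw new IllegalArgumentException("Unexpected shard id: " + shardId);
        }
        return Long.parseLong(shardId.substring(PREFIX.length()));
    }

    // Orders shard IDs by their numeric suffix.
    static final Comparator<String> BY_SUFFIX =
            Comparator.comparingLong(ShardIdOrdering::suffixOf);

    // Keeps only the shards that sort after the last seen shard, in ascending order.
    static List<String> newerThan(String lastSeen, List<String> shardIds) {
        return shardIds.stream()
                .filter(id -> BY_SUFFIX.compare(id, lastSeen) > 0)
                .sorted(BY_SUFFIX)
                .collect(Collectors.toList());
    }
}
```

With this ordering, an ID that is valid per the AWS docs but has no numeric suffix cannot be compared at all, so the filter throws instead of returning a result.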
>
> That said, I think we can get rid of this logic altogether. Its current
> usages in the project are:
> - Work around a Kinesalite bug (I've already opened an issue to cover this:
> https://github.com/mhart/kinesalite/issues/76). We can move this logic to
> the test code base to keep production code clean for now.
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464
>
> - Adjust the last seen shard ID. We can simply omit this because the AWS
> client won't return already seen shards, so we will get only new IDs or nothing.
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475
> https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406
>
>
> What do you think?
>
> Regards,
> Eugen
>
