Hi all, I think Steve's occurrence of the warning was from the consumer side.
For the Flink Kinesis Consumer, this most likely occurs due to excessive ListShards API calls on the target Kinesis stream. The consumer uses this API to discover shards at a fixed interval. The problem with the current design is that every subtask of the consumer tries to discover shards, so AWS's service rate limit may be hit during discovery. The community is well aware of this shortcoming and, AFAIK, we have some plans to address it for Flink 1.8 / 1.9.

@Rafi, as for the producer side, you may want to take a look at providing a FlinkKinesisPartitioner. By default, this is a round-robin partitioning of the records, i.e. records received by a subtask of the Kinesis sink can end up in any of the Kinesis shards.

Cheers,
Gordon

On Mon, Nov 12, 2018 at 8:54 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Steve,
>
> We've encountered this as well. We have way more than enough shards, but
> were still getting exceptions.
> We think we know the reason, and we would love for someone to confirm or
> reject it.
>
> What we suspect is happening is as follows:
>
> The KPL's RateLimit parameter tracks the number of bytes/records written
> into a specific shard.
> If the parallelism of your Sink is >1 (which is probably the case),
> multiple tasks == multiple KPL instances which may be writing to the same
> shard.
> So for each individual KPL the RateLimit is not breached, but if multiple
> parallel tasks are writing to the same shard the RateLimit gets breached
> and a ProvisionedThroughputExceededException is thrown.
>
> What we've tried:
>
>    - Using a random partition key to spread the load evenly between the
>    shards. This did not work for us...
>    - Having records for the same shard written by the same KPL instance,
>    so the RateLimit would be enforced. We did a keyBy before the Sink to
>    ensure the same records go to the same task, using the same keyBy
>    logic as the Kinesis partitionKey. This did not work for us...
>
> What solved it eventually:
>
> Reducing the parallelism of the FlinkKinesisProducer to 1. We also set a
> queueSize so that we get back-pressured under high load (without getting
> a ProvisionedThroughputExceededException). This solved the problem and
> is currently not a bottleneck for us, but it could become one soon. So
> this is not a real solution.
>
> Can anyone suggest a better solution? Confirm or reject our assumption?
>
> Thanks
> Rafi
>
>
> On Sat, Nov 10, 2018, 03:02 shkob1 <shahar.kobrin...@gmail.com> wrote:
>
>> If it's running in parallel, aren't you just adding readers, which maxes
>> out your provisioned throughput? This probably doesn't belong here but is
>> rather a Kinesis thing, but I suggest increasing your number of shards?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
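
To put rough numbers on the two effects discussed in this thread, here is a small back-of-the-envelope sketch. It is plain arithmetic, not Flink, KPL, or AWS SDK code, and the function names and example values (64 consumer subtasks, a 10 second discovery interval, a RateLimit of 150 percent) are assumptions made purely for illustration. The only service figure used is the Kinesis per-shard write limit of 1000 records per second. The second function models Rafi's hypothesis, which has not been confirmed in this thread.

```python
# Rough arithmetic only; this is not Flink, KPL, or AWS SDK code.
# All names and example values below are hypothetical.

SHARD_WRITE_LIMIT_RECORDS_PER_SEC = 1000  # Kinesis per-shard write limit


def avg_discovery_calls_per_sec(consumer_parallelism: int,
                                discovery_interval_sec: float) -> float:
    """Average shard-discovery calls per second against one stream when
    every consumer subtask polls once per discovery interval."""
    return consumer_parallelism / discovery_interval_sec


def worst_case_shard_write_rate(sink_parallelism: int,
                                rate_limit_percent: float) -> float:
    """Upper bound on records per second arriving at a single shard if
    each sink subtask has its own limiter and all of them happen to
    write to that shard at their local cap (the suspected scenario)."""
    per_instance_cap = SHARD_WRITE_LIMIT_RECORDS_PER_SEC * rate_limit_percent / 100
    return sink_parallelism * per_instance_cap


if __name__ == "__main__":
    # Consumer side: the average looks small, but all subtasks may fire
    # at nearly the same moment, so the burst is up to 64 calls at once.
    print(avg_discovery_calls_per_sec(64, 10))

    # Producer side: four independent limiters, each allowing 150% of the
    # shard limit, can together far exceed what one shard accepts.
    rate = worst_case_shard_write_rate(4, 150)
    print(rate, rate > SHARD_WRITE_LIMIT_RECORDS_PER_SEC)
```

Under this simplified model, a per-instance limit only bounds the aggregate when the parallelism is 1, which is consistent with the workaround described in the quoted message. The same reasoning would apply to the per-shard byte limit.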