Hi Gordon,

Thanks for the reply.

So is it correct to say that the KPL RateLimit cannot effectively protect a
shard when the sink parallelism is >1? If multiple subtasks are writing to the
same shard and each has its own RateLimit, the shard's aggregate limit can
still be exceeded. If that's the case, can you suggest a way to overcome this?

Thanks,
Rafi

On Tue, Nov 13, 2018 at 6:27 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi all,
>
> I think Steve's occurrence of the warning was from the consumer side.
>
> For the Flink Kinesis Consumer, this most likely occurs due to excessive
> ListShards API calls on the target Kinesis stream. The consumer uses this
> API to discover shards at a fixed interval.
> The problem with the current design is that all subtasks of the consumer
> try to discover shards, so during discovery it is possible to hit AWS's
> service rate limit.
> The community is well aware of this shortcoming, and AFAIK we have plans
> to address it for Flink 1.8 / 1.9.
>
> @Rafi, as for the producer side, you may want to look at providing a
> FlinkKinesisPartitioner. By default, records are partitioned round-robin,
> i.e. records received by a subtask of the Kinesis sink can end up in any
> of the Kinesis shards.
>
> Cheers,
> Gordon
>
> On Mon, Nov 12, 2018 at 8:54 PM Rafi Aroch <rafi.ar...@gmail.com> wrote:
>
>> Hi Steve,
>>
>> We've encountered this as well. We have far more than enough shards, but
>> we were still getting exceptions.
>> We think we know the reason and would love for someone to confirm or
>> reject it.
>>
>> What we suspect is happening is as follows:
>>
>> The KPL's RateLimit parameter tracks the amount of bytes/records written
>> to a specific shard.
>> If the parallelism of your sink is >1 (which is probably the case),
>> multiple tasks mean multiple KPL instances, which may all be writing to
>> the same shard.
>> So no individual KPL breaches its RateLimit, but when multiple parallel
>> tasks write to the same shard the shard's limit is exceeded and a
>> ProvisionedThroughputExceededException is thrown.
>>
>> What we've tried:
>>
>> - Using a random partition key to spread the load evenly between the
>> shards. This did not work for us.
>> - Making sure records for the same shard are written by the same KPL
>> instance, so the RateLimit would be enforced: we did a keyBy before the
>> sink so that the same records go to the same task, using the same keyBy
>> logic as the Kinesis partitionKey. This did not work for us either.
>>
>> What solved it eventually:
>>
>> Reducing the parallelism of the FlinkKinesisProducer to 1. We also set a
>> queue size so that we get back-pressured under high load instead of
>> getting ProvisionedThroughputExceededException. This solved the problem
>> and is currently not a bottleneck for us, but it could become one soon,
>> so it is not a real solution.
>>
>> Can anyone suggest a better solution, or confirm/reject our assumption?
>>
>> Thanks,
>> Rafi
>>
>>
>> On Sat, Nov 10, 2018, 03:02 shkob1 <shahar.kobrin...@gmail.com> wrote:
>>
>>> If it's running in parallel, aren't you just adding readers that max out
>>> your provisioned throughput? This probably belongs on a Kinesis list
>>> rather than here, but I'd suggest increasing your number of shards.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
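P.S. To make sure I understand the suggestion, below is a rough Java sketch of
what I have in mind: the custom FlinkKinesisPartitioner you mentioned, on top
of the parallelism-1 / queue-limit workaround we currently run. The stream
name, region, RateLimit and queue-limit values, and the key-extraction logic
are placeholders, not our real configuration.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.KinesisPartitioner;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisSinkSketch {

    public static void attachSink(DataStream<String> events) {
        Properties producerConfig = new Properties();
        // Credentials come from the default AWS provider chain unless set explicitly.
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
        // Native KPL settings are passed through as-is. RateLimit is tracked per
        // KPL instance, which is exactly why parallel sinks writing to the same
        // shard can still exceed the shard's provisioned throughput.
        producerConfig.put("RateLimit", "100");

        FlinkKinesisProducer<String> producer =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        producer.setDefaultStream("my-output-stream");   // placeholder stream name
        producer.setFailOnError(false);
        // Bound the internal KPL queue so the sink back-pressures under load
        // instead of buffering without limit (the "queue size" mentioned above).
        producer.setQueueLimit(1000);
        // Gordon's suggestion: replace the default round-robin partitioning with
        // an explicit partition key so records are not sprayed across all shards.
        producer.setCustomPartitioner(new KinesisPartitioner<String>() {
            @Override
            public String getPartitionKey(String element) {
                return extractKey(element);   // placeholder key logic
            }
        });

        // Our current workaround: a single sink subtask, i.e. a single KPL
        // instance, so its RateLimit accounting actually matches the shard load.
        events.addSink(producer).setParallelism(1);
    }

    // Placeholder: derive the Kinesis partition key from the record.
    private static String extractKey(String element) {
        return element.split(",")[0];
    }
}

With parallelism 1 the custom partitioner mainly controls how records spread
across shards; it is the single KPL instance that keeps the RateLimit
accounting accurate.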