Thanks, Gordon, for your reply.

I do not set a queue limit, so the default, effectively unbounded, queue
size is 2147483647. So it should just be dropping records produced by the
80 (parallelism) * 10 (ThreadPoolSize) = 800 threads once their RecordTtl
expires. I do not want backpressure since, as you said, it effectively
blocks all upstream operators.

But from what you are saying, it will apply backpressure when the number of
outstanding records accumulated exceeds the default queue limit of 2147483647.
Or does it also do so when it is rate-limited to 1MB per second per shard by
Kinesis? The second case, rate limiting by Kinesis, seems more probable.

So, calculating the queue limit:
Based on this, my record size = 1600 bytes, and I have 96 shards.
Assuming that, with the default RecordMaxBufferedTime of 100ms, a queue size
of 100kB per shard should be sufficient, queue size per shard = 100kB.
Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
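
To double-check the arithmetic above, here is a minimal sketch of the same
calculation. The class and method names are hypothetical and not part of the
Flink API; the only inputs are the 100kB-per-shard figure from the docs, the
shard count, and the record size.

```java
public class QueueLimitEstimate {
    // Assumption taken from the docs quoted in this thread: with the default
    // RecordMaxBufferedTime of 100ms, about 100kB of queue per shard suffices.
    static final int BYTES_PER_SHARD = 100_000;

    // Hypothetical helper: number of records that fit into the per-shard
    // byte budget across all shards (integer division rounds down).
    public static int estimateQueueLimit(int shards, int recordSizeBytes) {
        if (shards <= 0 || recordSizeBytes <= 0) {
            throw new IllegalArgumentException("shards and recordSizeBytes must be positive");
        }
        long totalBytes = (long) shards * BYTES_PER_SHARD;
        return (int) (totalBytes / recordSizeBytes);
    }

    public static void main(String[] args) {
        System.out.println(estimateQueueLimit(96, 1600)); // 96 shards, 1600-byte records
        System.out.println(estimateQueueLimit(4, 1600));  // 4 shards, 1600-byte records
        System.out.println(estimateQueueLimit(1, 200));   // the docs example: 1 shard, 200 bytes
    }
}
```

The result would then be the value passed to setQueueLimit, if backpressure
is wanted at all.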

According to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records
that cannot be sent because of the rate restriction of 1 MB per second per
shard are buffered in an unbounded queue and dropped when their RecordTtl
expires.

To avoid data loss, you can enable backpressuring by restricting the size
of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);


On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Vijay,
>
> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
> It does however apply backpressure (therefore effectively blocking all
> upstream operators) when the number of outstanding records accumulated
> exceeds a set limit, configured using the
> FlinkKinesisProducer#setQueueLimit
> method.
>
> For starters, you can maybe check if that was set appropriately.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
