Hi Gordon,

ThreadPoolSize defaults to 10. I have a parallelism of 80 spread across 32 nodes. Could it be that the 80 threads get bottlenecked on a common thread pool of 10, or is it spawning 80 * 10 threads in total? The Flink TaskManagers run in separate slots/vCPUs and, in my case, are spread across 32 nodes while occupying 80 slots/vCPUs. Is my understanding correct, and would this be the reason the KPL gets flooded with too many pending requests at regular intervals?
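A rough worked version of the two scenarios in the question, as a sketch rather than the connector's code. The class and method names are hypothetical. It assumes that in the per-subtask case each of the 80 parallel sink subtasks owns its own KPL pool of `ThreadPoolSize` threads, and that subtasks are spread as evenly as possible across the 32 nodes (actual slot placement may differ).

```java
// Hypothetical helper comparing the two scenarios raised in the question.
// Assumption: in the per-subtask case, every parallel sink subtask owns a
// separate KPL instance with its own pool of ThreadPoolSize threads.
public class KplThreadEstimate {

    // Scenario A: all subtasks share one pool, so total threads == pool size.
    static int sharedPoolThreads(int threadPoolSize) {
        return threadPoolSize;
    }

    // Scenario B: one pool per subtask, so total threads == parallelism * pool size.
    static int perSubtaskThreads(int parallelism, int threadPoolSize) {
        return parallelism * threadPoolSize;
    }

    // Threads on the busiest node under scenario B, assuming subtasks are
    // spread as evenly as possible over the nodes (an assumption).
    static int maxThreadsPerNode(int parallelism, int nodes, int threadPoolSize) {
        int maxSubtasksPerNode = (parallelism + nodes - 1) / nodes; // ceiling division
        return maxSubtasksPerNode * threadPoolSize;
    }

    public static void main(String[] args) {
        int parallelism = 80, nodes = 32, threadPoolSize = 10;
        System.out.println("shared pool: " + sharedPoolThreads(threadPoolSize));
        System.out.println("per subtask: " + perSubtaskThreads(parallelism, threadPoolSize));
        System.out.println("max per node: " + maxThreadsPerNode(parallelism, nodes, threadPoolSize));
    }
}
```

With 80 subtasks on 32 nodes, the busiest nodes would host 3 subtasks each, so under the per-subtask assumption they would run about 30 pool threads apiece, against 800 in total.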
TIA,

On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bvija...@gmail.com> wrote:
> Thanks, Gordon, for your reply.
>
> I do not set a queueLimit, so the default unbounded queue size is
> 2147483647.
> So it should just be dropping records produced by the
> 80 (parallelism) * 10 (ThreadPoolSize) = 800 threads based on RecordTtl. I
> do not want backpressure because, as you said, it effectively blocks all
> upstream operators.
>
> But from what you are saying, it will apply backpressure when the number
> of outstanding records accumulated exceeds the default queue limit of
> 2147483647, *or does it also do so if it is rate-limited by Kinesis to
> 1 MB per second per shard*? The second case, rate limiting by Kinesis,
> seems more probable.
>
> So, calculating the queue limit:
> Based on this, my record size = 1600 bytes, and I have 96 shards.
> Assumption: with the default RecordMaxBufferedTime of 100 ms, a queue size
> of 100 kB per shard should be sufficient. So, queue size per shard = 100 kB.
> Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
> Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
>
> According to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
> expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>> It does, however, apply backpressure (therefore effectively blocking all
>> upstream operators) when the number of outstanding records accumulated
>> exceeds a set limit, configured using the
>> FlinkKinesisProducer#setQueueLimit
>> method.
>>
>> For starters, you can maybe check if that was set appropriately.
>>
>> Cheers,
>> Gordon
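The queue-limit arithmetic from the quoted message can be reproduced with a small helper. This is a sketch: the class and method names are hypothetical, and it assumes, as the quoted calculation does, that 100 kB per shard means 10^5 bytes and that the limit is (shards * queue size per shard) / record size. It also reproduces the docs example of 200 bytes per record on 1 shard.

```java
// Hypothetical helper reproducing the queue-limit arithmetic from the thread:
// queueLimit = (shards * queueBytesPerShard) / recordBytes
public class QueueLimitEstimate {

    // 100 kB per shard, taken as 10^5 bytes as in the quoted calculation.
    static final long QUEUE_BYTES_PER_SHARD = 100_000L;

    static long queueLimit(int shards, long queueBytesPerShard, long recordBytes) {
        if (shards <= 0 || queueBytesPerShard <= 0 || recordBytes <= 0) {
            throw new IllegalArgumentException("all inputs must be positive");
        }
        // Integer division rounds down to a whole number of records.
        return (shards * queueBytesPerShard) / recordBytes;
    }

    public static void main(String[] args) {
        // Docs example: 200 bytes per record, 1 shard.
        System.out.println(queueLimit(1, QUEUE_BYTES_PER_SHARD, 200));
        // 1600-byte records on 96 shards and on 4 shards.
        System.out.println(queueLimit(96, QUEUE_BYTES_PER_SHARD, 1600));
        System.out.println(queueLimit(4, QUEUE_BYTES_PER_SHARD, 1600));
    }
}
```

Under these assumptions the three calls give 500, 6000 and 250 records; a value like this is what would be passed to a `kinesis.setQueueLimit(...)` call in the style of the docs snippet.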