ThreadPoolSize is per KPL instance, so yes, that is per subtask. As I previously mentioned, the maximum number of concurrent requests going to KDS would be capped by MaxConnections.
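For what it's worth, below is a minimal sketch of how those settings could be wired through the producer config, assuming the connector forwards the native KPL keys as documented. The region, stream name, and concrete values are only placeholders taken from your mail, so please double-check them against the KPL default_config.properties you linked; the important parts are ThreadingModel=POOLED and the bounded queue.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisSinkConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties producerConfig = new Properties();
        producerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2"); // placeholder region

        // Native KPL settings, passed through to the KPL process by the connector.
        // The values below are just the ones you proposed in your mail.
        producerConfig.put("ThreadingModel", "POOLED");  // fixed thread pool instead of thread-per-request
        producerConfig.put("ThreadPoolSize", "15");
        producerConfig.put("MaxConnections", "48");      // caps concurrent requests per subtask
        producerConfig.put("RequestTimeout", "10000");
        producerConfig.put("RecordTtl", "10000");
        producerConfig.put("FailIfThrottled", "true");

        FlinkKinesisProducer<String> kinesis =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesis.setDefaultStream("your_stream_write"); // placeholder stream name
        kinesis.setDefaultPartition("0");
        // Bound the KPL buffer so the sink backpressures instead of relying on RecordTtl to drop data.
        kinesis.setQueueLimit(6000); // from your 96-shard estimate

        DataStream<String> records = env.fromElements("example-record"); // stand-in for your pipeline
        records.addSink(kinesis);
        env.execute("kinesis-producer-config-sketch");
    }
}

Once the queue is bounded, the sink blocks when the limit is reached and backpressures the rest of the job, rather than buffering unboundedly and letting records expire against RecordTtl.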

On Thu, Jul 23, 2020 at 6:25 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi Gordon,
> Thx for your reply.
> FlinkKinesisProducer default is ThreadPool, which is what I am using. So, does that mean only 10 threads are making calls to KDS by default ??
> I see from the number of records coming to the KDS that I need only 1-2 shards. So, the bottleneck is on the KPL side.
> Does this mean I have to set a QueueLimit of 500 as shown in the example below ??
> From what you said, total MaxConnections would then be by default: 24 * number of subtasks = 24 * 80 = 1920 connections to KDS.
> KPL ThreadPoolSize would be 10 threads by default - is this per subtask?
> So, would it be 10 * number of subtasks = 10 * 80 = 800 threads ??
>
> I am trying to reconcile the difference above. Somewhere I am flooding KPL with too many requests & it gives the curl 28 error.
>
> So, calculating the queue limit:
> Based on this, my record size = 1600 bytes. I have 96 shards.
> Assuming that, with the default RecordMaxBufferedTime of 100ms, a queue size of 100kB per shard should be sufficient. So, queue size per shard = 100KB.
> Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
> Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
>
> According to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records that cannot be sent because of the rate restriction of 1 MB per second per shard are buffered in an unbounded queue and dropped when their RecordTtl expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Tue, Jul 21, 2020 at 8:00 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Vijay,
>>
>> I'm not entirely sure of the semantics between ThreadPoolSize and MaxConnections since they are all KPL configurations (this specific question would probably be better directed to AWS), but my guess would be that the number of concurrent requests to the KPL backend is capped by MaxConnections. This is per parallel FlinkKinesisProducer subtask.
>>
>> As for ThreadPoolSize, do note that the default threading model by KPL is PER_REQUEST, for which the KPL native process will launch a thread for each request. Under heavy load, this would of course be an issue. Since you didn't explicitly mention this config, make sure to set this to POOLED to actually make use of a fixed thread pool for requests.
>>
>> Overall, my suggestion is to set a reasonable queue limit for the number of records buffered by KPL's native process (by default it is unbounded). Without that in place, under high load you would easily be resource exhausted, and it can cause more unpredictable checkpointing times since the FlinkKinesisProducer would need to flush pending records on checkpoints (which ultimately also applies backpressure upstream).
>>
>> BR,
>> Gordon
>>
>> On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:
>>
>>> Hi,
>>> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data Stream (KDS).
>>> Getting the following errors:
>>>
>>> 1. Throttling
>>>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>>>
>>> 2. ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error] [shard_map.cc:150] Shard map update for stream "...._write" failed. Code: *LimitExceededException Message: Rate exceeded for stream *..._write under account 753274046439.; retrying in 1500 ms
>>>
>>> 3. [AWS Log: ERROR](CurlHttpClient) *Curl returned error code 28*
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>>> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>>> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>>>
>>> These are the KPL property changes I am planning to make:
>>>
>>> *RequestTimeout*: 10000 //default 6000 ms
>>> *AggregationEnabled*: true //default is true
>>> *ThreadPoolSize*: *15* //default 10
>>> *MaxConnections*: *48* //default 24 - this might have been a bottleneck when we flooded KPL with requests. Requests are sent in parallel over multiple connections to the backend.
>>> *RecordTtl*: *10000* //default 30000 ms - drop the record after 10s.
>>> *FailIfThrottled*: *true* //default false - so if throttled, don't retry.
>>>
>>> We were using a parallelism of 80 for the sinks, so each corresponds to 1 FlinkKinesisProducer. So, 80 * 10 (ThreadPoolSize) = 800 threads. MaxConnections is 24 from KPL.
>>>
>>> I am not sure about the MaxConnections setting - what does 48 mean here - is it 40 (sink parallelism) * 15 (ThreadPoolSize) * 48 calls to the KDS backend via KPL?
>>>
>>> Any thoughts on how not to overwhelm KPL while handling real-time streaming load to Kinesis via the FlinkKinesisProducer?
>>>
>>> TIA,
>>
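P.S. To make the queue-limit arithmetic quoted above concrete, here is a small worked sketch using the numbers from your mail (1600-byte records, ~100 kB buffered per shard under the default RecordMaxBufferedTime of 100 ms); the class and variable names are just for illustration:

public class QueueLimitEstimate {
    public static void main(String[] args) {
        long recordSizeBytes = 1_600L;        // observed average record size
        long bufferPerShardBytes = 100_000L;  // ~100 kB per shard with default RecordMaxBufferedTime of 100 ms

        long limitFor96Shards = 96L * bufferPerShardBytes / recordSizeBytes; // = 6000 records
        long limitFor4Shards  =  4L * bufferPerShardBytes / recordSizeBytes; // = 250 records

        System.out.println(limitFor96Shards + " records (96 shards), "
                + limitFor4Shards + " records (4 shards)");
    }
}

So a queue limit of around 6000 records lines up with your 96-shard estimate, and a stream with only 4 shards would come out at around 250 records.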