Hi Vijay,

I'm not entirely sure of the semantics between ThreadPoolSize and MaxConnections, since they are both KPL configurations (this specific question would probably be better directed to AWS), but my guess would be that the number of concurrent requests to the Kinesis backend is capped by MaxConnections, and that this cap applies per parallel FlinkKinesisProducer subtask.
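To make that fan-out concrete (a rough sketch only; the exact request scheduling inside the KPL native process may differ), each parallel subtask runs its own KPL instance, so an upper bound on concurrent connections from the whole job would be:

```python
# Rough upper bound on concurrent Kinesis connections opened by the job.
# Each FlinkKinesisProducer subtask runs its own KPL native process, and
# each KPL process caps its concurrent backend requests at MaxConnections.
sink_parallelism = 80   # parallel FlinkKinesisProducer subtasks (from this thread)
max_connections = 24    # KPL MaxConnections per subtask (the KPL default)

total_connection_cap = sink_parallelism * max_connections
print(total_connection_cap)  # prints 1920
```

So with the defaults and a sink parallelism of 80, the job as a whole could open on the order of 1920 concurrent connections, which is worth keeping in mind before raising MaxConnections further.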
As for ThreadPoolSize, do note that the default threading model of the KPL is PER_REQUEST, under which the KPL native process launches a thread for each request. Under heavy load this would of course be an issue. Since you didn't explicitly mention this config, make sure to set ThreadingModel to POOLED so that requests actually use a fixed thread pool.

Overall, my suggestion is to set a reasonable queue limit on the number of records buffered by the KPL native process (by default it is unbounded). Without that in place, under high load you can easily exhaust resources, and checkpointing times become more unpredictable, since the FlinkKinesisProducer needs to flush all pending records on each checkpoint (which ultimately also applies backpressure upstream).

BR,
Gordon

On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi,
> Trying to tune the KPL and FlinkKinesisProducer for a Kinesis Data
> Stream (KDS). Getting the following errors:
>
> 1. Throttling
>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>
> 2. ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>    - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error]
>    [shard_map.cc:150] Shard map update for stream "...._write" failed.
>    Code: LimitExceededException
>    Message: Rate exceeded for stream ..._write under account 753274046439.;
>    retrying in 1500 ms
>
> 3.
> [AWS Log: ERROR] (CurlHttpClient) Curl returned error code 28
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>
> These are the KPL property changes I am planning to make:
>
> RequestTimeout: 10000 // default 6000 ms
> AggregationEnabled: true // default is true
> ThreadPoolSize: 15 // default 10
> MaxConnections: 48 // default 24 - this might have been a bottleneck
>   when we flooded KPL with requests. Requests are sent in parallel over
>   multiple connections to the backend.
> RecordTtl: 10000 // default 30000 ms - drop record after 10 s
> FailIfThrottled: true // default false - so if throttled, don't retry
>
> We were using a parallelism of 80 for the sinks, so each corresponds to one
> FlinkKinesisProducer. So, 80 * 10 (ThreadPoolSize) = 800 threads.
> MaxConnections is 24 from KPL.
>
> I am not sure about the MaxConnections setting - what does 48 mean here?
> Is it 80 (sink parallelism) * 15 (ThreadPoolSize) * 48 calls to the KDS
> backend via KPL?
>
> Any thoughts on how not to overwhelm KPL while handling real-time
> streaming load to Kinesis via the FlinkKinesisProducer?
>
> TIA,
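P.S. Putting the suggestions in this thread together, a hedged sketch of what the KPL properties could look like (names follow the default_config.properties linked above; the values are illustrative, not recommendations):

```properties
# Sketch of KPL settings along the lines discussed in this thread.
# Values are examples only - tune against your own load tests.

# Use a fixed thread pool instead of one thread per request
# (the KPL default is PER_REQUEST)
ThreadingModel = POOLED
ThreadPoolSize = 15

# Cap on concurrent requests to the Kinesis backend, per KPL instance
MaxConnections = 48

# Request timeout and record time-to-live, both in milliseconds
RequestTimeout = 10000
RecordTtl = 10000
```

On top of these KPL properties, the queue limit itself is set on the Flink side via FlinkKinesisProducer#setQueueLimit, which bounds the records buffered in the KPL native process and is what keeps the checkpoint-time flush predictable.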