Hi everyone,

I'm running into some exceptions when trying to consume data from kinesis
(no EFO) with  flink-connector-aws-kinesis-streams 4.3.0 with flink 1.18 in
managed flink by AWS. I'm running out of attempts because this stream has
different consumers. The exception in question is:

 Suppressed: software.amazon.awssdk.core.exception.SdkClientException:
> Request attempt 2 failure: Rate exceeded for Shard -
> 999999999/kinesis.stream.1/shardId-000000000246 (Service: Kinesis, Status
> Code: 400, Request ID: cd4baa45-65e0-a6d5-acd5-1f93972f97eb, Extended
> Request ID:
> qIbItqSIhG4SlJsD61gqroOI+1DzPjyNt7poglC44YmkSQSlaH0AF/aJo2OmKGmVOft5+K9xp2Gm5rQ9G5qDoeJWkZRax0nEQdpNPP8G6+U=)
> Suppressed: software.amazon.awssdk.core.exception.SdkClientException:
> Request attempt 3 failure: Rate exceeded for Shard -
> 999999999/kinesis.stream.1/shardId-000000000246 (Service: Kinesis, Status
> Code: 400, Request ID: e2a0f9c4-ca02-e8cc-833e-4c13730688ea, Extended
> Request ID:
> wgwLtyW04V71dsr0lTHMAdHf91UwAuIgpJVOHjjPC31Dzhm33iP03cIl79euVf+uMdc+hRX+LP8/F/FcLBGJ4Prl4ZhBM4c42ZfmY84dD2s=)



I've been trying to modify the parameters of the consumer with no luck, so
far these are the settings I tried.

consumerConfig.put("flink.kinesis.max.retry.attempts", 50);
> consumerConfig.put("aws.max-retries", "10");
> consumerConfig.put("aws.kinesis.client.retry.max-attempts", "15");
> consumerConfig.put("aws.kinesis.retry.maxAttempts", 20);
> consumerConfig.put("aws.maxAttempts", "25");
>  consumerConfig.put("aws.max-attempts", "30");

consumerConfig.put("flink.kinesis.shard.getrecords.maxretries", "30");


And then creating the client like:

      Configuration sourceConfig = new Configuration();
>             if (configProps != null) {
>                 for (String key : configProps.stringPropertyNames()) {
>                     String value = configProps.getProperty(key);
>                     sourceConfig.setString(key, value);
>                 }
>             }
>                 source = KinesisStreamsSource.<T>builder()
>                         .setStreamArn(streamArn)
>                         .setDeserializationSchema(deserializer)
>                         .setSourceConfig(sourceConfig)
>                         .build();


However, none of those settings has the desired effect of increasing the
number of attempts to get records. Do you happen to know what's the right
parameter for increasing it? I've been checking docs but for some reason
None of them are working for me.

Thanks in advance,
Cristian Rojas

Reply via email to