[ 
https://issues.apache.org/jira/browse/FLINK-34023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850841#comment-17850841
 ] 

Brad Atcheson commented on FLINK-34023:
---------------------------------------

[Here's](https://github.com/brada/flink-connector-aws/commit/aa4fce309e94f979aa1f3ef48d5d27fa17f55759) a possible solution that adds 3 new config properties:
- aws.dynamodb.client.retry-policy.num-retries
- aws.firehose.client.retry-policy.num-retries
- aws.kinesis.client.retry-policy.num-retries
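
To illustrate how a user might set the Kinesis key, here is a minimal sketch that uses only `java.util.Properties`. The key name is the one proposed in the linked commit. The `numRetries` helper and the fallback value of 3 are hypothetical and only there to make the example self-contained; they are not the connector's actual code or default.

```java
import java.util.Properties;

final class RetryConfigSketch {
    // Key proposed in the linked commit; not part of a released connector.
    static final String KINESIS_NUM_RETRIES = "aws.kinesis.client.retry-policy.num-retries";

    // Hypothetical fallback used only in this sketch.
    static final int ASSUMED_DEFAULT_NUM_RETRIES = 3;

    // Hypothetical helper: reads the retry count, falling back when the key is absent.
    static int numRetries(Properties props) {
        String raw = props.getProperty(KINESIS_NUM_RETRIES);
        if (raw == null) {
            return ASSUMED_DEFAULT_NUM_RETRIES;
        }
        int value = Integer.parseInt(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(KINESIS_NUM_RETRIES + " must be >= 0, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties sinkProperties = new Properties();
        sinkProperties.setProperty("aws.region", "us-west-2");
        sinkProperties.setProperty(KINESIS_NUM_RETRIES, "10");
        System.out.println("num-retries = " + numRetries(sinkProperties));
    }
}
```

The same `Properties` object would then be passed to the sink as its client properties, alongside the region and credential settings.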

That seemed like the simplest possible approach. Exposing the complete AWS SDK 
retry policy would require many more parameters, including 
[backoff strategy](https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/core/retry/backoff/BackoffStrategy.html), 
base delay, max backoff time, etc. Since num-retries is the most important 
parameter, would it be acceptable to expose just that one for now?
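
To show how the other parameters interact with num-retries, here is a rough sketch of a capped exponential backoff in plain Java. It is not the AWS SDK's implementation; the class name, method name, and the 100 ms / 2 s values are made up for illustration.

```java
import java.time.Duration;

final class BackoffSketch {
    // Wait before a retry: baseDelay * 2^retriesAttempted, capped at maxBackoff.
    // retriesAttempted is 0 for the first retry.
    static Duration delayBeforeRetry(Duration baseDelay, Duration maxBackoff, int retriesAttempted) {
        // Cap the exponent so the shift stays well inside the range of a long.
        int exponent = Math.min(retriesAttempted, 30);
        long delayMillis = baseDelay.toMillis() * (1L << exponent);
        return Duration.ofMillis(Math.min(delayMillis, maxBackoff.toMillis()));
    }

    public static void main(String[] args) {
        Duration baseDelay = Duration.ofMillis(100); // illustrative value
        Duration maxBackoff = Duration.ofSeconds(2); // illustrative value
        int numRetries = 6;                          // illustrative value
        for (int i = 0; i < numRetries; i++) {
            long waitMillis = delayBeforeRetry(baseDelay, maxBackoff, i).toMillis();
            System.out.println("retry " + (i + 1) + ": wait " + waitMillis + " ms");
        }
    }
}
```

With a fixed backoff scheme like this, num-retries alone determines how many attempts are made and therefore roughly how long the client keeps trying before giving up, which is why it seemed like the most useful single knob.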

The `mvn clean verify` and `mvn clean package` commands fail on that branch, 
but they also fail on the origin main branch, for what appear to be reasons 
unrelated to my commit:
```
[ERROR] Failures:
[ERROR]   RowDataToAttributeValueConverterTest.testFloat:208
Expecting map:
  {"key"=AttributeValue(N=1.2345679E17)}
to contain entries:
  ["key"=AttributeValue(N=1.23456791E17)]
but the following map entries had different values:
  ["key"=AttributeValue(N=1.2345679E17) (expected: AttributeValue(N=1.23456791E17))]
``` 

> Expose Kinesis client retry config in sink
> ------------------------------------------
>
>                 Key: FLINK-34023
>                 URL: https://issues.apache.org/jira/browse/FLINK-34023
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kinesis
>            Reporter: Brad Atcheson
>            Priority: Major
>
> The consumer side exposes client retry configuration like 
> [flink.shard.getrecords.maxretries|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_GETRECORDS_RETRIES]
>  but the producer side lacks similar config for PutRecords.
> The KinesisStreamsSinkWriter constructor calls 
> {code}
> this.httpClient = 
> AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
> this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
> {code}
> But those methods only refer to these values (aside from 
> endpoint/region/creds) in the kinesisClientProperties:
> * aws.http-client.max-concurrency
> * aws.http-client.read-timeout
> * aws.trust.all.certificates
> * aws.http.protocol.version
> Without control over retry, users can observe exceptions like {code}Request 
> attempt 2 failure: Unable to execute HTTP request: connection timed out after 
> 2000 ms: kinesis.us-west-2.amazonaws.com{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
