[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121161#comment-16121161
 ] 

ASF GitHub Bot commented on FLINK-7367:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4473#discussion_r132376112
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
    @@ -165,17 +167,10 @@ public void 
setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
     
    -           KinesisProducerConfiguration producerConfig = new 
KinesisProducerConfiguration();
    -
    -           
producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
                
producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    -           if 
(configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
    -                   
producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
    -                                   
ProducerConfigConstants.COLLECTION_MAX_COUNT, 
producerConfig.getCollectionMaxCount(), LOG));
    -           }
    -           if 
(configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
    -                   
producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
    -                                   
ProducerConfigConstants.AGGREGATION_MAX_COUNT, 
producerConfig.getAggregationMaxCount(), LOG));
    +           // Override KPL default value if it's not specified by user
    +           if 
(!configProps.containsKey(ProducerConfigConstants.RATE_LIMIT)) {
    +                   
producerConfig.setRateLimit(ProducerConfigConstants.DEFAULT_RATE_LIMIT);
    --- End diff --
    
    Yeah, either way. This is not really a validation, but a replacement.


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7367
>                 URL: https://issues.apache.org/jira/browse/FLINK-7367
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.3.3
>
>
> Right now, FlinkKinesisProducer only expose two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> Well, according to [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more to make the max use of KinesisProducer, and make it 
> fault-tolerant (e.g. by increasing timeout).
> I select a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> We need to parameterize FlinkKinesisProducer to pass in the above params, in 
> order to cater to our need



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to