[ 
https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125252#comment-16125252
 ] 

ASF GitHub Bot commented on FLINK-7367:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4473
  
    hmmm... I was deploying our Flink job with this change. The Flink job 
failed to start, and log reports:
    
    ```
    The implementation of the RichSinkFunction is not serializable. The object 
probably contains or references non serializable fields.
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
        
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
        
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
        
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
    ```
    
    of which the implementation of the RichSinkFunction is 
FlinkKinesisProducer. The only field I add to FlinkKinesisProducer is 
KinesisProducerConfiguration. So I made KinesisProducerConfiguration 
`transient`, and ran the Flink job, the job still fails. 
    
    Thus I doubted if FlinkKinesisProducer is already not serializable 
currently. To verify that, I created a test for the current 
FlinkKinesisProducer in master which doesn't have my PR change. Unit test is:
    
    ```
    @Test
        public void testProducerSerializable() {
                Properties testConfig = new Properties();
                testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKey");
                
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
                testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
                
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
                KinesisConfigUtil.validateAwsConfiguration(testConfig);
    
                FlinkKinesisProducer producer = new FlinkKinesisProducer(new 
KinesisSerializationSchema() {
                        @Override
                        public ByteBuffer serialize(Object element) {
                                return null;
                        }
    
                        @Override
                        public String getTargetStream(Object element) {
                                return null;
                        }
                }, testConfig);
    
                ClosureCleaner.ensureSerializable(producer);
        }
    ```
    
    And it fails. Thus, I think something in FlinkKinesisProducer might not 
serializable, and already breaks FlinkKinesisProducer. 
    
    @tzulitai @aljoscha  Any insights on this?


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, 
> MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7367
>                 URL: https://issues.apache.org/jira/browse/FLINK-7367
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only expose two configs for the underlying 
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> Well, according to [AWS 
> doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] 
> and [their sample on 
> github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
>  developers can set more to make the max use of KinesisProducer, and make it 
> fault-tolerant (e.g. by increasing timeout).
> I select a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink writing too fast to 
> Kinesis, which fail Flink job too frequently. We need to parameterize 
> FlinkKinesisProducer to pass in the above params, in order to slowing down 
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to