[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125252#comment-16125252 ]
ASF GitHub Bot commented on FLINK-7367:
---------------------------------------

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4473

Hmm... I deployed our Flink job with this change. The job failed to start, and the log reports:

```
The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
    org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
    org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
    org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```

The RichSinkFunction implementation in question is FlinkKinesisProducer.

The only field I added to FlinkKinesisProducer is KinesisProducerConfiguration. So I made KinesisProducerConfiguration `transient` and ran the Flink job again, but the job still fails.

That made me suspect that FlinkKinesisProducer is already not serializable. To verify, I wrote a test against the current FlinkKinesisProducer in master, which doesn't have my PR change. The unit test is:

```
@Test
public void testProducerSerializable() {
    Properties testConfig = new Properties();
    testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
    testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
    testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
    testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");

    KinesisConfigUtil.validateAwsConfiguration(testConfig);

    FlinkKinesisProducer producer = new FlinkKinesisProducer(new KinesisSerializationSchema() {
        @Override
        public ByteBuffer serialize(Object element) {
            return null;
        }

        @Override
        public String getTargetStream(Object element) {
            return null;
        }
    }, testConfig);

    ClosureCleaner.ensureSerializable(producer);
}
```

And it fails.
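
A failure like this only says that the object as a whole cannot be serialized, not which member is responsible. One way to narrow it down is to try Java serialization on each field value separately. The sketch below is a minimal, hypothetical helper (`SerializabilityProbe` and `findNonSerializableFields` are illustrative names, not part of Flink) that uses only the JDK; the nested `Demo` class exists purely to show the behaviour.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper (not a Flink API): finds fields whose values fail Java serialization. */
final class SerializabilityProbe {

    /** Returns the names of non-static, non-transient fields of obj whose current values cannot be serialized. */
    static List<String> findNonSerializableFields(Object obj) {
        List<String> offenders = new ArrayList<>();
        // Walk up the class hierarchy so inherited fields are checked as well.
        for (Class<?> c = obj.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                // Java serialization ignores static and transient fields, so skip them here too.
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                    continue;
                }
                try {
                    field.setAccessible(true);
                    Object value = field.get(obj);
                    if (value != null && !canSerialize(value)) {
                        offenders.add(field.getName());
                    }
                } catch (IllegalAccessException | RuntimeException e) {
                    // The field could not be read reflectively (e.g. a JDK-internal field); report it as unknown.
                    offenders.add(field.getName() + " (unreadable)");
                }
            }
        }
        return offenders;
    }

    /** Attempts to write the value with an ObjectOutputStream; any IOException counts as a failure. */
    private static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Illustrative class only: one serializable field, one non-serializable field, one transient field. */
    static class Demo implements Serializable {
        String name = "ok";
        Object handle = new Object();            // java.lang.Object is not Serializable
        transient Object skipped = new Object(); // transient, so ignored by serialization
    }
}
```

Calling `SerializabilityProbe.findNonSerializableFields(producer)` inside the test would list the suspicious fields, and the same call can be repeated on a flagged field's value to dig one level deeper. One thing worth checking with it: an anonymous inner class created inside an instance method, like the `KinesisSerializationSchema` above, may hold an implicit reference to its enclosing test class instance, which would itself need to be serializable.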
So I think something in FlinkKinesisProducer might not be serializable, which already breaks FlinkKinesisProducer in master.

@tzulitai @aljoscha Any insights on this?


> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7367
>                 URL: https://issues.apache.org/jira/browse/FLINK-7367
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.4.0, 1.3.3
>
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying
> KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html]
> and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties],
> developers can set more configs to make the most of KinesisProducer and make it
> fault-tolerant (e.g. by increasing the timeout).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink write to Kinesis too fast,
> which fails the Flink job too frequently. We need to parameterize
> FlinkKinesisProducer to pass in the above params, in order to slow down
> Flink's write rate to Kinesis.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)