The exception makes it clear that the SourceFunction must be serializable, and Password is not serializable. Try setting the Kafka consumer property with a plain String value instead, like this:
props.put(SaslConfigs.SASL_JAAS_CONFIG, "LoginModule required subject=\"test\" secret=\"test\";");

The String value will be parsed into a Password object (see the method org.apache.kafka.common.config.ConfigDef.parseType).

Regards,
Dian

> On Dec 25, 2018, at 11:04 PM, tao xiao <xiaotao...@gmail.com> wrote:
>
> Hi team,
>
> I am passing a security enabled kafka consumer properties to
> FlinkKafkaConsumer but keep getting this error
> java.io.NotSerializableException? what is the best way to handle this?
>
> I use Flink 1.7.1 and here is the consumer property that produces the
> exception
>
> props.put(SaslConfigs.SASL_JAAS_CONFIG, new Password("LoginModule required subject=\"test\" secret=\"test\";"));
>
> stacktrace
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: The implementation of
> the FlinkKafkaConsumerBase is not serializable. The object probably contains
> or references non serializable fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
>     at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
> Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at java.util.Hashtable.writeObject(Hashtable.java:1157)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>     ... 5 more
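
For anyone who wants to reproduce the difference without a Kafka or Flink dependency, here is a minimal sketch using only the JDK. It assumes that what matters is whether every value stored in the Properties implements java.io.Serializable, as the "Caused by" part of the stack trace suggests. FakePassword and SerializationCheck are hypothetical names invented for this illustration; FakePassword merely stands in for org.apache.kafka.common.config.types.Password and is not the real class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.Properties;

class SerializationCheck {

    // Hypothetical stand-in for Kafka's Password type: a plain wrapper
    // class that does NOT implement java.io.Serializable.
    static final class FakePassword {
        private final String value;

        FakePassword(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "[hidden]";
        }
    }

    // Returns true if Java serialization accepts the Properties object,
    // false if one of its entries is not serializable.
    static boolean isSerializable(Properties props) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(props);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String jaas = "LoginModule required subject=\"test\" secret=\"test\";";

        // "sasl.jaas.config" is the key behind SaslConfigs.SASL_JAAS_CONFIG.
        Properties withWrapper = new Properties();
        withWrapper.put("sasl.jaas.config", new FakePassword(jaas));

        Properties withString = new Properties();
        withString.put("sasl.jaas.config", jaas);

        System.out.println("wrapper value serializable: " + isSerializable(withWrapper));
        System.out.println("String value serializable:  " + isSerializable(withString));
    }
}
```

Running main should report false for the wrapper value and true for the String value, which mirrors why passing the JAAS config as a plain String avoids the exception.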