Hi! We're using Flink 1.13.1. We have a class in our user code that extends FlinkKafkaConsumer and is built for reading Avro records from Kafka. However, it doesn't hold any Schema objects as fields, so I'm a little confused.
Something like this:

```
class OurSource[T <: ClassTag: TypeInformation: Decoder](
    val name: String,
    val topic: String,
    val clusterConfig: KafkaClusterConfig,
    sslConfig: KafkaSSLConfig,
    offsetReset: Option[OffsetReset.Value] = None,
    offsetProvider: Option[OffsetProvider] = None
) extends FlinkKafkaConsumer[Option[T]](
      topic,
      new DebeziumAvroRegistryDeserializationSchema[T](
        clusterConfig,
        KafkaConfiguration.sslProperties(sslConfig)
      ),
      CollectionUtil.mapToProperties(
        KafkaConfiguration.consumerProperties(clusterConfig, name, isolationLevel = "read_committed") ++
          KafkaConfiguration.sslProperties(sslConfig)
      )
    )
```

I understand how classloading may change, but why would that change whether we hit this serialization issue or not?

On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> What Flink version are you using? In current Flink code base
> FlinkKafkaConsumer does not contain fields related to Avro.
>
> Jars in usrlib has a higher priority to be loaded than jars in lib. So if
> there is another FlinkKafkaConsumer class in your user jar then it might
> affect class loading and thus affect this issue.
>
> Kevin Lam <kevin....@shopify.com> wrote on Wed, Aug 25, 2021 at 11:18 PM:
>
>> Hi all,
>>
>> I'm trying to avoid dynamic class loading my user code [0] due to a
>> suspected classloading leak, but when I put my application jar into /lib
>> instead of /usrlib, I run into the following error:
>>
>> ```
>> The main method caused an error: The implementation of the
>> FlinkKafkaConsumer is not serializable. The object probably contains or
>> references non serializable fields.
>> ```
>>
>> which specifically seems to be caused by
>> ```
>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>> ```
>>
>> What's curious to me about this is the error does not occur when we use
>> dynamic classloading and put our application jar into /usrlib.
>>
>> Any ideas what's going on? It would seem to us that the method of loading
>> the classes shouldn't impact whether or not something is serialized.
>>
>> Appreciate any help, thanks!
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>
>