Hi Kevin,

The consumer needs to be serializable, and apparently you are also serializing the Avro schema (probably as part of your DebeziumAvroRegistryDeserializationSchema). That fails because org.apache.avro.Schema is not Serializable. You may want to copy our SerializableAvroSchema [1].

Make sure that everything is serializable. You can check that in a unit test by using org.apache.commons.lang3.SerializationUtils.clone.
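For illustration, here is a minimal Scala sketch of both ideas: a wrapper in the spirit of SerializableAvroSchema [1] that keeps the Schema transient and ships only its JSON form, plus a clone-based round-trip check. SerializableSchema and SerializationCheck are hypothetical names for this sketch, not classes from Flink, and unlike the real Flink class it skips null handling and very large schemas.

```
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.avro.Schema
import org.apache.commons.lang3.SerializationUtils

// Hypothetical wrapper modeled on Flink's SerializableAvroSchema: the Schema
// field is transient, and only its JSON text goes through Java serialization.
class SerializableSchema(@transient private var schema: Schema) extends Serializable {

  def get: Schema = schema

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    // org.apache.avro.Schema is not Serializable, so write its JSON text instead.
    out.writeObject(schema.toString)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    // Re-parse the schema on the deserializing side.
    schema = new Schema.Parser().parse(in.readObject().asInstanceOf[String])
  }
}

// Round-trip check: SerializationUtils.clone serializes and deserializes the
// whole object graph, throwing if anything in it is not serializable.
object SerializationCheck extends App {
  val wrapped = new SerializableSchema(Schema.create(Schema.Type.LONG))
  val copy = SerializationUtils.clone(wrapped)
  assert(copy.get == wrapped.get)
}
```

The same call is a quick end-to-end test for your source as well: SerializationUtils.clone(new OurSource(...)) will surface the offending non-serializable class (here org.apache.avro.Schema$LongSchema) in its cause chain.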
[1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34

On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi!
>
> We're using 1.13.1. We have a class in our user code that extends
> FlinkKafkaConsumer and is built for reading Avro records from Kafka.
> However, it doesn't hold any Schema objects as fields, so I'm a little
> confused.
>
> Something like this:
>
> ```
> class OurSource[T <: ClassTag: TypeInformation: Decoder](
>   val name: String,
>   val topic: String,
>   val clusterConfig: KafkaClusterConfig,
>   sslConfig: KafkaSSLConfig,
>   offsetReset: Option[OffsetReset.Value] = None,
>   offsetProvider: Option[OffsetProvider] = None
> ) extends FlinkKafkaConsumer[Option[T]](
>   topic,
>   new DebeziumAvroRegistryDeserializationSchema[T](
>     clusterConfig,
>     KafkaConfiguration.sslProperties(sslConfig)
>   ),
>   CollectionUtil.mapToProperties(
>     KafkaConfiguration.consumerProperties(clusterConfig, name, isolationLevel = "read_committed") ++
>       KafkaConfiguration.sslProperties(sslConfig)
>   )
> )
> ```
>
> I understand how classloading may change, but why would that change
> whether we hit this serialization issue or not?
>
> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> What Flink version are you using? In the current Flink code base,
>> FlinkKafkaConsumer does not contain any fields related to Avro.
>>
>> Jars in usrlib have a higher priority to be loaded than jars in lib, so if
>> there is another FlinkKafkaConsumer class in your user jar, it might
>> affect class loading and thus affect this issue.
>>
>> On Wed, Aug 25, 2021 at 11:18 PM, Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to avoid dynamic classloading of my user code [0] due to a
>>> suspected classloading leak, but when I put my application jar into /lib
>>> instead of /usrlib, I run into the following error:
>>>
>>> ```
>>> The main method caused an error: The implementation of the
>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>> references non serializable fields.
>>> ```
>>>
>>> which specifically seems to be caused by
>>>
>>> ```
>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>> ```
>>>
>>> What's curious to me about this is that the error does not occur when we
>>> use dynamic classloading and put our application jar into /usrlib.
>>>
>>> Any ideas what's going on? It seems to us that the method of loading the
>>> classes shouldn't affect whether or not something can be serialized.
>>>
>>> Appreciate any help, thanks!
>>>
>>> [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code