Hi Arvid,

Got it. We don't use Avro's `Schema` inside DebeziumAvroRegistryDeserializationSchema, but I tried to verify this with a unit test, and `org.apache.commons.lang3.SerializationUtils.clone` runs successfully.
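For reference, the check was roughly the following (simplified; `MyRecord` is a stand-in for one of our actual record types, with the required implicits in scope):

```
import org.apache.commons.lang3.SerializationUtils

// SerializationUtils.clone round-trips the object through Java
// serialization and throws a SerializationException (wrapping the
// underlying NotSerializableException) if anything in the object
// graph is not serializable.
val source = new OurSource[MyRecord](name, topic, clusterConfig, sslConfig)
val cloned = SerializationUtils.clone(source)
assert(cloned != null)
```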
I'm curious as to why things work (are serializable) when we use dynamic classloading, but not when we put this in lib/ and bypass dynamic loading. Any ideas there?

On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> the consumer needs to be serializable. Apparently you are also serializing
> the Avro schema (probably as part of your
> DebeziumAvroRegistryDeserializationSchema) and that fails. You may want to
> copy our SerializableAvroSchema [1].
>
> Make sure that everything is serializable. You can check that in a unit
> test by using org.apache.commons.lang3.SerializationUtils.clone.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
>
> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi!
>>
>> We're using 1.13.1. We have a class in our user code that extends
>> FlinkKafkaConsumer and is built for reading Avro records from Kafka.
>> However, it doesn't hold any Schema objects as fields, so I'm a little
>> confused.
>>
>> Something like this:
>>
>> ```
>> class OurSource[T: ClassTag: TypeInformation: Decoder](
>>     val name: String,
>>     val topic: String,
>>     val clusterConfig: KafkaClusterConfig,
>>     sslConfig: KafkaSSLConfig,
>>     offsetReset: Option[OffsetReset.Value] = None,
>>     offsetProvider: Option[OffsetProvider] = None
>> ) extends FlinkKafkaConsumer[Option[T]](
>>       topic,
>>       new DebeziumAvroRegistryDeserializationSchema[T](
>>         clusterConfig,
>>         KafkaConfiguration.sslProperties(sslConfig)
>>       ),
>>       CollectionUtil.mapToProperties(
>>         KafkaConfiguration.consumerProperties(clusterConfig, name, isolationLevel = "read_committed") ++
>>           KafkaConfiguration.sslProperties(sslConfig)
>>       )
>>     )
>> ```
>>
>> I understand how classloading may change, but why would that change
>> whether we hit this serialization issue or not?
>>
>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> What Flink version are you using? In the current Flink code base,
>>> FlinkKafkaConsumer does not contain fields related to Avro.
>>>
>>> Jars in usrlib have a higher priority to be loaded than jars in lib, so
>>> if there is another FlinkKafkaConsumer class in your user jar it might
>>> affect class loading and thus this issue.
>>>
>>> Kevin Lam <kevin....@shopify.com> wrote on Wed, Aug 25, 2021 at 11:18 PM:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to avoid dynamic classloading of my user code [0] due to a
>>>> suspected classloading leak, but when I put my application jar into /lib
>>>> instead of /usrlib, I run into the following error:
>>>>
>>>> ```
>>>> The main method caused an error: The implementation of the
>>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>>> references non serializable fields.
>>>> ```
>>>>
>>>> which specifically seems to be caused by:
>>>>
>>>> ```
>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>> ```
>>>>
>>>> What's curious to me is that the error does not occur when we use
>>>> dynamic classloading and put our application jar into /usrlib.
>>>>
>>>> Any ideas what's going on? It seems to us that how the classes are
>>>> loaded shouldn't impact whether or not something is serializable.
>>>>
>>>> Appreciate any help, thanks!
>>>>
>>>> [0]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
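P.S. For anyone following along, the SerializableAvroSchema pattern Arvid pointed to boils down to something like the sketch below (simplified, not the actual Flink class, which also handles a null schema): keep the non-serializable Schema in a transient field and custom-serialize it as its JSON string.

```
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.avro.Schema

// Simplified sketch of the pattern: the non-serializable Schema lives
// in a transient field and is written/read as its JSON representation.
// (Assumes a non-null schema.)
class SerializableSchema(@transient private var schema: Schema) extends Serializable {

  def get: Schema = schema

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeUTF(schema.toString) // Avro schemas round-trip losslessly via JSON
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    schema = new Schema.Parser().parse(in.readUTF())
  }
}
```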