I also tested serializing an instance of `OurSource` with `org.apache.commons.lang3.SerializationUtils.clone` and it worked fine.
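
For reference, the clone-based check mentioned here can be written as a small standalone test. The sketch below uses a placeholder `ExampleSource` case class rather than the real `OurSource`, since the latter depends on project-specific Kafka and SSL configuration:

```
import org.apache.commons.lang3.SerializationUtils

// Placeholder standing in for OurSource; Scala case classes are
// java.io.Serializable by default, so the clone round-trip succeeds here.
case class ExampleSource(name: String, topic: String)

object SerializabilityCheck extends App {
  val source = ExampleSource("orders-source", "orders")

  // SerializationUtils.clone round-trips the object through Java
  // serialization and throws a SerializationException (wrapping
  // java.io.NotSerializableException) if any reachable field cannot be
  // serialized -- the same failure mode Flink reports further down.
  val copy = SerializationUtils.clone(source)
  assert(copy == source, "clone should yield an equal copy")
}
```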

On Thu, Aug 26, 2021 at 3:27 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi Arvid,
>
> Got it, we don't use an Avro Schema inside of
> DebeziumAvroRegistryDeserializationSchema, but I tried to test it with a
> unit test and `org.apache.commons.lang3.SerializationUtils.clone` runs
> successfully.
>
> I'm curious as to why things work (are serializable) when we use dynamic
> classloading, but do not work when we put this in lib/ and bypass the
> dynamic loading--any ideas there?
>
> On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> The consumer needs to be serializable. Apparently you are also
>> serializing the Avro schema (probably as part of your
>> DebeziumAvroRegistryDeserializationSchema) and that fails. You may want to
>> copy our SerializableAvroSchema [1].
>>
>> Make sure that everything is serializable. You can check that in a unit
>> test by using org.apache.commons.lang3.SerializationUtils.clone.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
>>
>> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi!
>>>
>>> We're using 1.13.1. We have a class in our user code that extends
>>> FlinkKafkaConsumer and is built for reading Avro records from Kafka.
>>> However, it doesn't hold any Schema objects as fields, so I'm a little
>>> confused.
>>>
>>> Something like this:
>>>
>>> ```
>>> class OurSource[T <: ClassTag: TypeInformation: Decoder](
>>>   val name: String,
>>>   val topic: String,
>>>   val clusterConfig: KafkaClusterConfig,
>>>   sslConfig: KafkaSSLConfig,
>>>   offsetReset: Option[OffsetReset.Value] = None,
>>>   offsetProvider: Option[OffsetProvider] = None
>>> ) extends FlinkKafkaConsumer[Option[T]](
>>>   topic,
>>>   new DebeziumAvroRegistryDeserializationSchema[T](
>>>     clusterConfig,
>>>     KafkaConfiguration.sslProperties(sslConfig)
>>>   ),
>>>   CollectionUtil.mapToProperties(
>>>     KafkaConfiguration.consumerProperties(clusterConfig, name,
>>>       isolationLevel = "read_committed") ++
>>>     KafkaConfiguration.sslProperties(sslConfig)
>>>   )
>>> )
>>> ```
>>>
>>> I understand how classloading may change, but why would that change
>>> whether we hit this serialization issue or not?
>>>
>>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> What Flink version are you using? In the current Flink code base,
>>>> FlinkKafkaConsumer does not contain fields related to Avro.
>>>>
>>>> Jars in usrlib have a higher priority to be loaded than jars in lib. So
>>>> if there is another FlinkKafkaConsumer class in your user jar, it might
>>>> affect class loading and thus affect this issue.
>>>>
>>>> On Wed, Aug 25, 2021 at 11:18 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to avoid dynamic class loading of my user code [0] due to a
>>>>> suspected classloading leak, but when I put my application jar into /lib
>>>>> instead of /usrlib, I run into the following error:
>>>>>
>>>>> ```
>>>>> The main method caused an error: The implementation of the
>>>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>>>> references non serializable fields.
>>>>> ```
>>>>>
>>>>> which specifically seems to be caused by
>>>>>
>>>>> ```
>>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>>> ```
>>>>>
>>>>> What's curious to me about this is that the error does not occur when we
>>>>> use dynamic classloading and put our application jar into /usrlib.
>>>>>
>>>>> Any ideas what's going on? It would seem to us that the method of
>>>>> loading the classes shouldn't impact whether or not something is
>>>>> serialized.
>>>>>
>>>>> Appreciate any help, thanks!
>>>>>
>>>>> [0]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
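
For context on the SerializableAvroSchema approach suggested in [1] above: the NotSerializableException in the original report comes from an org.apache.avro.Schema instance being caught up in Java serialization, and Flink's wrapper sidesteps that by keeping the schema in a transient field and serializing its JSON string instead. The sketch below only illustrates that pattern; it is not the Flink class itself, and `SerializableSchemaHolder` is a made-up name:

```
import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.avro.Schema

// Illustrative sketch: keep the Avro Schema in a transient field so default
// Java serialization skips it, and write/read its JSON form instead.
class SerializableSchemaHolder(initial: Schema) extends java.io.Serializable {

  @transient private var schema: Schema = initial

  def get: Schema = schema

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    // Serialize the schema as its JSON string rather than its object graph.
    out.writeObject(if (schema == null) null else schema.toString(false))
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    // Re-parse the JSON string back into an Avro Schema on deserialization.
    val json = in.readObject().asInstanceOf[String]
    schema = if (json == null) null else new Schema.Parser().parse(json)
  }
}
```

A deserialization schema that holds its schema this way (or derives it lazily from the schema registry) passes the SerializationUtils.clone check, whereas a plain Schema field triggers exactly the NotSerializableException quoted above.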