Without a real stacktrace, everything is guesswork. So please provide it together with your Flink version.
It might be that some transition of DebeziumAvroRegistryDeserializationSchema (let's say open) causes an illegal state where it's no longer serializable.

On Thu, Aug 26, 2021 at 9:35 PM Kevin Lam <kevin....@shopify.com> wrote:

> I also tested serializing an instance of `OurSource` with
> `org.apache.commons.lang3.SerializationUtils.clone` and it worked fine.
>
> On Thu, Aug 26, 2021 at 3:27 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi Arvid,
>>
>> Got it, we don't use Avro.schema inside of
>> DebeziumAvroRegistryDeserializationSchema, but I tried to test it with a
>> unit test and `org.apache.commons.lang3.SerializationUtils.clone` runs
>> successfully.
>>
>> I'm curious as to why things work (are serializable) when we use dynamic
>> classloading, but do not work when we put this in lib/ and bypass
>> dynamic loading. Any ideas there?
>>
>> On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Kevin,
>>>
>>> the consumer needs to be serializable. Apparently you are also
>>> serializing the Avro schema (probably as part of your
>>> DebeziumAvroRegistryDeserializationSchema) and that fails. You may want
>>> to copy our SerializableAvroSchema [1].
>>>
>>> Make sure that everything is serializable. You can check that in a unit
>>> test by using org.apache.commons.lang3.SerializationUtils.clone.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
>>>
>>> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> We're using 1.13.1. We have a class in our user code that extends
>>>> FlinkKafkaConsumer, built for reading Avro records from Kafka.
>>>> However, it doesn't hold any Schema objects as fields, so I'm a little
>>>> confused.
>>>>
>>>> Something like this:
>>>>
>>>> ```
>>>> class OurSource[T <: ClassTag: TypeInformation: Decoder](
>>>>     val name: String,
>>>>     val topic: String,
>>>>     val clusterConfig: KafkaClusterConfig,
>>>>     sslConfig: KafkaSSLConfig,
>>>>     offsetReset: Option[OffsetReset.Value] = None,
>>>>     offsetProvider: Option[OffsetProvider] = None
>>>> ) extends FlinkKafkaConsumer[Option[T]](
>>>>       topic,
>>>>       new DebeziumAvroRegistryDeserializationSchema[T](
>>>>         clusterConfig,
>>>>         KafkaConfiguration.sslProperties(sslConfig)
>>>>       ),
>>>>       CollectionUtil.mapToProperties(
>>>>         KafkaConfiguration.consumerProperties(clusterConfig, name,
>>>>           isolationLevel = "read_committed") ++
>>>>           KafkaConfiguration.sslProperties(sslConfig)
>>>>       )
>>>>     )
>>>> ```
>>>>
>>>> I understand how classloading may change, but why would that change
>>>> whether we hit this serialization issue or not?
>>>>
>>>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> What Flink version are you using? In the current Flink code base,
>>>>> FlinkKafkaConsumer does not contain fields related to Avro.
>>>>>
>>>>> Jars in usrlib have a higher priority to be loaded than jars in lib.
>>>>> So if there is another FlinkKafkaConsumer class in your user jar, it
>>>>> might affect class loading and thus affect this issue.
>>>>>
>>>>> Kevin Lam <kevin....@shopify.com> wrote on Wed, Aug 25, 2021 at
>>>>> 11:18 PM:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm trying to avoid dynamic class loading for my user code [0] due
>>>>>> to a suspected classloading leak, but when I put my application jar
>>>>>> into /lib instead of /usrlib, I run into the following error:
>>>>>>
>>>>>> ```
>>>>>> The main method caused an error: The implementation of the
>>>>>> FlinkKafkaConsumer is not serializable. The object probably contains
>>>>>> or references non serializable fields.
>>>>>> ```
>>>>>>
>>>>>> which specifically seems to be caused by
>>>>>>
>>>>>> ```
>>>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>>>> ```
>>>>>>
>>>>>> What's curious to me is that the error does not occur when we use
>>>>>> dynamic classloading and put our application jar into /usrlib.
>>>>>>
>>>>>> Any ideas what's going on? It would seem to us that the method of
>>>>>> loading the classes shouldn't impact whether or not something is
>>>>>> serialized.
>>>>>>
>>>>>> Appreciate any help, thanks!
>>>>>>
>>>>>> [0]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
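As an aside on the two suggestions in the thread (checking serializability with SerializationUtils.clone, and copying the SerializableAvroSchema pattern), here is a minimal JDK-only sketch combining both ideas. `FakeSchema`, `SchemaHolder`, and `roundTrip` are illustrative names, not Flink or Avro APIs; `roundTrip` reimplements the clone check with plain Java serialization so no Commons Lang dependency is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableSchemaSketch {

    // Stand-in for org.apache.avro.Schema, which does not implement Serializable.
    // Holding it directly in a non-transient field of a Flink function would
    // cause the NotSerializableException seen in the thread.
    static class FakeSchema {
        final String json;
        FakeSchema(String json) { this.json = json; }
    }

    // Hypothetical holder mirroring the idea behind Flink's
    // SerializableAvroSchema: serialize only the schema's string form and
    // keep the parsed schema in a transient field, rebuilt on demand.
    static class SchemaHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String schemaString;
        private transient FakeSchema schema; // null after deserialization

        SchemaHolder(String schemaString) {
            this.schemaString = schemaString;
            this.schema = new FakeSchema(schemaString);
        }

        FakeSchema schema() {
            if (schema == null) {
                schema = new FakeSchema(schemaString); // lazily re-parsed
            }
            return schema;
        }
    }

    // JDK-only equivalent of org.apache.commons.lang3.SerializationUtils.clone:
    // a round-trip through Java serialization. If any reachable non-transient
    // field is not serializable, this throws java.io.NotSerializableException,
    // the same failure reported for org.apache.avro.Schema$LongSchema.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SchemaHolder holder = new SchemaHolder("{\"type\":\"long\"}");
        SchemaHolder copy = roundTrip(holder);
        System.out.println(copy.schema().json); // prints {"type":"long"}
    }
}
```

Running this same round-trip against an instance of a source class in a unit test is a quick way to reproduce the serialization failure locally, without submitting a job.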