I also tested serializing an instance of `OurSource` with
`org.apache.commons.lang3.SerializationUtils.clone`, and it worked fine.
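The unit-test check Arvid suggests is essentially a serialization round trip. Below is a minimal sketch of one using only `java.io`, so it runs without commons-lang3 or Flink on the classpath. Like `SerializationUtils.clone`, it resolves classes through the value's own class loader. `OpaqueSchema`, `LeakyDeserializer`, `SafeDeserializer` and `SerializabilityCheck` are hypothetical stand-ins, not Flink or Avro classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException,
  ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for a non-serializable type such as org.apache.avro.Schema.
class OpaqueSchema(val definition: String)

// Hypothetical deserializer that keeps the schema object itself as a field.
class LeakyDeserializer(val schema: OpaqueSchema) extends java.io.Serializable

// Hypothetical deserializer that keeps only the schema's string form.
class SafeDeserializer(val schemaDefinition: String) extends java.io.Serializable

object SerializabilityCheck {
  // Serialize to bytes and read back, similar in spirit to
  // org.apache.commons.lang3.SerializationUtils.clone.
  def roundTrip[A <: java.io.Serializable](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val loader = value.getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      // Resolve classes with the loader that defined `value`, else fall back to the default.
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Name of the first non-serializable class reached while writing, if any.
  def firstNonSerializable(value: java.io.Serializable): Option[String] =
    try { roundTrip(value); None }
    catch { case e: NotSerializableException => Some(e.getMessage) }
}
```

When the object graph contains a non-serializable object, `ObjectOutputStream` throws a `NotSerializableException` whose message is that object's class name. This is how `org.apache.avro.Schema$LongSchema` surfaces in the error quoted in this thread. A passing round trip only covers the classes loaded in the test JVM, so it may not reproduce a deployment that picks up a different copy of a class.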

On Thu, Aug 26, 2021 at 3:27 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi Arvid,
> Got it. We don't use an Avro `Schema` inside of
> DebeziumAvroRegistryDeserializationSchema, but I tested it with a
> unit test and `org.apache.commons.lang3.SerializationUtils.clone` runs
> successfully.
> I'm curious why things work (are serializable) when we use dynamic
> classloading, but do not work when we put the jar in lib/ and bypass
> dynamic loading. Any ideas there?
> On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise <ar...@apache.org> wrote:
>> Hi Kevin,
>> the consumer needs to be serializable. Apparently you are also
>> serializing the Avro schema (probably as part of your
>> DebeziumAvroRegistryDeserializationSchema), and that fails. You may want to
>> copy our SerializableAvroSchema [1].
>> Make sure that everything is serializable. You can check that in a unit
>> test by using org.apache.commons.lang3.SerializationUtils.clone.
>> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
>> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:
>>> Hi!
>>> We're using 1.13.1. We have a class in our user code that extends
>>> FlinkKafkaConsumer and is built for reading Avro records from Kafka.
>>> However, it doesn't hold any Schema objects as fields, so I'm a little
>>> confused.
>>> Something like this:
>>> ```
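>>> import scala.reflect.ClassTag
>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>>> // Decoder, KafkaClusterConfig, KafkaSSLConfig, OffsetReset, OffsetProvider,
>>> // KafkaConfiguration and CollectionUtil are defined elsewhere and not shown.
>>>
>>> // Context bounds: implicit ClassTag[T], TypeInformation[T] and Decoder[T] are required.
>>> class OurSource[T: ClassTag: TypeInformation: Decoder](
>>>   val name: String,
>>>   val topic: String,
>>>   val clusterConfig: KafkaClusterConfig,
>>>   sslConfig: KafkaSSLConfig,
>>>   offsetReset: Option[OffsetReset.Value] = None,
>>>   offsetProvider: Option[OffsetProvider] = None
>>> ) extends FlinkKafkaConsumer[Option[T]](
>>>     topic,
>>>     new DebeziumAvroRegistryDeserializationSchema[T](
>>>       clusterConfig,
>>>       KafkaConfiguration.sslProperties(sslConfig)
>>>     ),
>>>     CollectionUtil.mapToProperties(
>>>       KafkaConfiguration.consumerProperties(clusterConfig, name, isolationLevel = "read_committed") ++
>>>         KafkaConfiguration.sslProperties(sslConfig)
>>>     )
>>>   )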
>>> ```
>>> I understand that classloading may change, but why would that affect
>>> whether we hit this serialization issue?
>>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>>> Hi!
>>>> What Flink version are you using? In the current Flink code base,
>>>> FlinkKafkaConsumer does not contain any Avro-related fields.
>>>> Jars in usrlib have a higher loading priority than jars in lib. So
>>>> if there is another FlinkKafkaConsumer class in your user jar, it might
>>>> affect class loading and thus this issue.
>>>> On Wed, Aug 25, 2021 at 11:18 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>>> Hi all,
>>>>> I'm trying to avoid dynamically loading my user code [0] because of a
>>>>> suspected classloading leak, but when I put my application jar into /lib
>>>>> instead of /usrlib, I run into the following error:
>>>>> ```
>>>>> The main method caused an error: The implementation of the
>>>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>>>> references non serializable fields.
>>>>> ```
>>>>> which specifically seems to be caused by
>>>>> ```
>>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>>> ```
>>>>> What's curious to me is that the error does not occur when we
>>>>> use dynamic classloading and put our application jar into /usrlib.
>>>>> Any ideas what's going on? It seems to us that the way the classes
>>>>> are loaded shouldn't affect whether or not something can be
>>>>> serialized.
>>>>> Appreciate any help, thanks!
>>>>> [0]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
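Arvid's pointer to Flink's `SerializableAvroSchema` describes a common workaround: keep the non-serializable object in a `@transient` field and serialize only a form that can rebuild it. For Avro, that form is the schema's JSON string from `Schema.toString`, which `Schema.Parser#parse(String)` reads back. The sketch below follows that idea, not a copy of Flink's class. It uses a hypothetical `OpaqueSchema` in place of Avro's `Schema` so it runs without Avro. `SerializableSchema` and `SchemaDemo` are also illustrative names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
  ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for a non-serializable type such as org.apache.avro.Schema.
// Like an Avro schema, it can be rebuilt from its string form.
final class OpaqueSchema(val definition: String) {
  override def toString: String = definition
}

object OpaqueSchema {
  def parse(definition: String): OpaqueSchema = new OpaqueSchema(definition)
}

// Wrapper in the spirit of Flink's SerializableAvroSchema (not a copy of it):
// the live object is @transient and only its string form is written.
final class SerializableSchema(initial: OpaqueSchema) extends java.io.Serializable {
  @transient private var schema: OpaqueSchema = initial

  def get: OpaqueSchema = schema

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeBoolean(schema != null)
    if (schema != null) out.writeUTF(schema.toString)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    schema = if (in.readBoolean()) OpaqueSchema.parse(in.readUTF()) else null
  }
}

object SchemaDemo {
  // Serialize and deserialize a value, resolving classes with its own class loader.
  def roundTrip[A <: java.io.Serializable](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()

    val loader = value.getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

A deserialization schema would then hold a `SerializableSchema` field instead of the schema object itself. Only the string crosses the serialization boundary, and the object is rebuilt on the receiving side.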
