Hi Arvid,

Got it. We don't use an Avro Schema inside
DebeziumAvroRegistryDeserializationSchema, but I tested it with a unit
test anyway, and `org.apache.commons.lang3.SerializationUtils.clone`
runs successfully.
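
For reference, the test is roughly the sketch below; the record type
and the config fixtures are stand-ins for our real setup:

```scala
import org.apache.commons.lang3.SerializationUtils
import org.scalatest.funsuite.AnyFunSuite

class OurSourceSerializationTest extends AnyFunSuite {
  test("OurSource survives a Java serialization round trip") {
    // clone() serializes and deserializes the full object graph and
    // throws SerializationException if any reachable field is not
    // serializable, i.e. the failure Flink reports at submission time.
    // SomeRecord, clusterConfig and sslConfig are placeholders for our
    // real record type and test fixtures.
    val source = new OurSource[SomeRecord](
      name = "test-source",
      topic = "some-topic",
      clusterConfig = clusterConfig,
      sslConfig = sslConfig
    )
    val copy = SerializationUtils.clone(source)
    assert(copy != null)
  }
}
```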

I'm curious why things work (are serializable) when we use dynamic
classloading, but fail when we put the jar in lib/ and bypass dynamic
loading. Any ideas there?



On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> The consumer needs to be serializable. Apparently you are also
> serializing the Avro schema (probably as part of your
> DebeziumAvroRegistryDeserializationSchema), and that fails. You may
> want to copy our SerializableAvroSchema [1].
>
> Make sure that everything is serializable. You can check that in a unit
> test by using org.apache.commons.lang3.SerializationUtils.clone.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
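>
> For illustration, the idea behind [1] is roughly the sketch below
> (simplified, not the exact Flink code): keep the Schema field
> transient and serialize only its JSON string form, re-parsing it on
> read.
>
> ```scala
> import java.io.{ObjectInputStream, ObjectOutputStream}
> import org.apache.avro.Schema
>
> // Avro's Schema is not Serializable, so write out its JSON form and
> // rebuild the Schema when the object is read back in.
> class SerializableAvroSchema(@transient var schema: Schema)
>     extends Serializable {
>
>   private def writeObject(out: ObjectOutputStream): Unit =
>     out.writeUTF(schema.toString(false))
>
>   private def readObject(in: ObjectInputStream): Unit =
>     schema = new Schema.Parser().parse(in.readUTF())
> }
> ```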
>
> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi!
>>
>> We're using 1.13.1. We have a class in our user code that extends
>> FlinkKafkaConsumer and is built for reading Avro records from Kafka.
>> However, it doesn't hold any Schema objects as fields, so I'm a
>> little confused.
>>
>> Something like this:
>>
>> ```
>> class OurSource[T <: ClassTag: TypeInformation: Decoder](
>>   val name: String,
>>   val topic: String,
>>   val clusterConfig: KafkaClusterConfig,
>>   sslConfig: KafkaSSLConfig,
>>   offsetReset: Option[OffsetReset.Value] = None,
>>   offsetProvider: Option[OffsetProvider] = None
>> ) extends FlinkKafkaConsumer[Option[T]](
>>     topic,
>>     new DebeziumAvroRegistryDeserializationSchema[T](
>>       clusterConfig,
>>       KafkaConfiguration.sslProperties(sslConfig)
>>     ),
>>     CollectionUtil.mapToProperties(
>>       KafkaConfiguration.consumerProperties(clusterConfig, name, isolationLevel = "read_committed") ++
>>         KafkaConfiguration.sslProperties(sslConfig)
>>     )
>>   )
>> ```
>>
>> I understand how classloading may change, but why would that change
>> whether we hit this serialization issue or not?
>>
>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com>
>> wrote:
>>
>>> Hi!
>>>
>>> What Flink version are you using? In the current Flink code base,
>>> FlinkKafkaConsumer does not contain any fields related to Avro.
>>>
>>> Jars in usrlib have a higher loading priority than jars in lib, so
>>> if there is another FlinkKafkaConsumer class in your user jar, it
>>> might affect class loading and thus this issue.
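>>>
>>> For reference, which class wins when it exists in both places is
>>> controlled by a flink-conf.yaml setting (child-first, the default,
>>> prefers the dynamically loaded user jar):
>>>
>>> ```
>>> classloader.resolve-order: child-first
>>> ```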
>>>
>>> Kevin Lam <kevin....@shopify.com> 于2021年8月25日周三 下午11:18写道:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to avoid dynamic classloading of my user code [0] due to
>>>> a suspected classloading leak, but when I put my application jar
>>>> into /lib instead of /usrlib, I run into the following error:
>>>>
>>>> ```
>>>> The main method caused an error: The implementation of the
>>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>>> references non serializable fields.
>>>> ```
>>>>
>>>> which specifically seems to be caused by
>>>> ```
>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>> ```
>>>>
>>>> What's curious to me is that the error does not occur when we use
>>>> dynamic classloading and put our application jar into /usrlib.
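>>>>
>>>> Concretely, the two deployments (paths illustrative):
>>>>
>>>> ```
>>>> /opt/flink/usrlib/our-app.jar  # dynamically loaded: no error
>>>> /opt/flink/lib/our-app.jar     # on Flink's classpath: fails
>>>> ```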
>>>>
>>>> Any ideas what's going on? It would seem to us that the method of
>>>> loading the classes shouldn't affect whether or not something is
>>>> serializable.
>>>>
>>>> Appreciate any help, thanks!
>>>>
>>>> [0]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>>>
>>>
