Hi Kevin,

The consumer needs to be serializable. Apparently you are also serializing
the Avro schema (probably as part of your
DebeziumAvroRegistryDeserializationSchema), and that fails. You may want to
copy our SerializableAvroSchema [1].
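
The core idea in that class: Avro's Schema is not Java-serializable, so it
is written out as its JSON string and re-parsed on deserialization. A
minimal sketch of the same idea (the SerializableSchema name here is
hypothetical, not Flink API):

```
import org.apache.avro.Schema

// Wraps an Avro Schema so it survives Java serialization by writing
// out and re-parsing the schema's JSON representation.
class SerializableSchema(@transient private var schema: Schema)
    extends Serializable {

  def get: Schema = schema

  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeObject(schema.toString) // Schema#toString yields the JSON form
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    in.defaultReadObject()
    schema = new Schema.Parser().parse(in.readObject().asInstanceOf[String])
  }
}
```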

Make sure that everything is serializable. You can check that in a unit
test by using org.apache.commons.lang3.SerializationUtils.clone.
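
For example (OurSource and the constructor arguments below are placeholders
based on your snippet; adjust to your actual code):

```
import org.apache.commons.lang3.SerializationUtils

val source = new OurSource[MyRecord](
  name = "our-source",
  topic = "our-topic",
  clusterConfig = clusterConfig,
  sslConfig = sslConfig
)
// clone() round-trips the object through Java serialization; it throws a
// SerializationException (caused by java.io.NotSerializableException,
// e.g. for org.apache.avro.Schema$LongSchema) if any reachable field
// is not serializable.
val copy = SerializationUtils.clone(source)
```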

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34

On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi!
>
> We're using 1.13.1. We have a class in our user code that extends
> FlinkKafkaConsumer, built for reading Avro records from Kafka. However, it
> doesn't hold any Schema objects as fields, so I'm a little confused.
>
> Something like this:
>
> ```
> class OurSource[T: ClassTag: TypeInformation: Decoder](
>   val name: String,
>   val topic: String,
>   val clusterConfig: KafkaClusterConfig,
>   sslConfig: KafkaSSLConfig,
>   offsetReset: Option[OffsetReset.Value] = None,
>   offsetProvider: Option[OffsetProvider] = None
> ) extends FlinkKafkaConsumer[Option[T]](
>     topic,
>     new DebeziumAvroRegistryDeserializationSchema[T](
>       clusterConfig,
>       KafkaConfiguration.sslProperties(sslConfig)
>     ),
>     CollectionUtil.mapToProperties(
>       KafkaConfiguration.consumerProperties(clusterConfig, name,
> isolationLevel = "read_committed") ++
>         KafkaConfiguration.sslProperties(sslConfig)
>     )
>   )
> ```
>
> I understand how classloading may change, but why would that change
> whether we hit this serialization issue or not?
>
> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> What Flink version are you using? In the current Flink code base,
>> FlinkKafkaConsumer does not contain any fields related to Avro.
>>
>> Jars in usrlib have a higher priority to be loaded than jars in lib, so if
>> there is another FlinkKafkaConsumer class in your user jar, it might affect
>> class loading and thus this issue.
>>
>> Kevin Lam <kevin....@shopify.com> wrote on Wed, Aug 25, 2021 at 11:18 PM:
>>
>>> Hi all,
>>>
>>> I'm trying to avoid dynamic classloading of my user code [0] due to a
>>> suspected classloading leak, but when I put my application jar into /lib
>>> instead of /usrlib, I run into the following error:
>>>
>>> ```
>>> The main method caused an error: The implementation of the
>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>> references non serializable fields.
>>> ```
>>>
>>> which specifically seems to be caused by
>>> ```
>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>> ```
>>>
>>> What's curious to me is that the error does not occur when we use dynamic
>>> classloading and put our application jar into /usrlib.
>>>
>>> Any ideas what's going on? It would seem to us that how the classes are
>>> loaded shouldn't affect whether or not something is serializable.
>>>
>>> Appreciate any help, thanks!
>>>
>>> [0]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>>
>>