Hi!

We're using Flink 1.13.1. We have a class in our user code that extends
FlinkKafkaConsumer and is built for reading Avro records from Kafka.
However, it doesn't hold any Schema objects as fields, so I'm a little
confused.

Something like this:

```
class OurSource[T <: ClassTag: TypeInformation: Decoder](
  val name: String,
  val topic: String,
  val clusterConfig: KafkaClusterConfig,
  sslConfig: KafkaSSLConfig,
  offsetReset: Option[OffsetReset.Value] = None,
  offsetProvider: Option[OffsetProvider] = None
) extends FlinkKafkaConsumer[Option[T]](
    topic,
    new DebeziumAvroRegistryDeserializationSchema[T](
      clusterConfig,
      KafkaConfiguration.sslProperties(sslConfig)
    ),
    CollectionUtil.mapToProperties(
      KafkaConfiguration.consumerProperties(clusterConfig, name, isolationLevel = "read_committed") ++
        KafkaConfiguration.sslProperties(sslConfig)
    )
  )
```

I understand how classloading may change, but why would that change whether
we hit this serialization issue or not?
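
To narrow this down, here is a minimal sketch of how a Schema could still end
up in the serialized object graph without being a direct field of the source.
It assumes the check behind the error uses plain Java serialization, which
follows every non-transient reference transitively. All class names below
(FakeSchema, EagerDeserializer, LazyDeserializer, FakeSource) are hypothetical
stand-ins, not our real classes or Flink/Avro APIs.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable type such as an Avro Schema.
class FakeSchema(val name: String)

// Hypothetical deserializer that stores the schema as a regular field,
// so Java serialization will try (and fail) to write it.
class EagerDeserializer(val schema: FakeSchema) extends Serializable

// Same idea, but the schema is marked transient and rebuilt lazily on first
// access, so it is skipped when the object is serialized.
class LazyDeserializer(val schemaName: String) extends Serializable {
  @transient lazy val schema: FakeSchema = new FakeSchema(schemaName)
}

// Hypothetical source that never declares a schema field itself;
// it only reaches one through its deserializer.
class FakeSource(val deserializer: AnyRef) extends Serializable

object SerializationCheck {
  // Returns None if obj serializes cleanly, otherwise the exception message,
  // which names the class that could not be written.
  def firstNonSerializable(obj: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val eager = new FakeSource(new EagerDeserializer(new FakeSchema("long")))
    val lazySource = new FakeSource(new LazyDeserializer("long"))
    println(firstNonSerializable(eager))
    println(firstNonSerializable(lazySource))
  }
}
```

In this sketch the eager variant should fail on FakeSchema even though
FakeSource has no schema field, while the lazy variant should serialize.
Running the real job with the JVM option
-Dsun.io.serialization.extendedDebugInfo=true should also make the
NotSerializableException include the chain of fields that leads to the Schema.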

On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> What Flink version are you using? In the current Flink code base,
> FlinkKafkaConsumer does not contain any fields related to Avro.
>
> Jars in usrlib have a higher priority to be loaded than jars in lib. So if
> there is another FlinkKafkaConsumer class in your user jar, it might
> affect class loading and thus this issue.
>
> On Wed, Aug 25, 2021 at 11:18 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi all,
>>
>> I'm trying to avoid dynamic classloading of my user code [0] due to a
>> suspected classloading leak, but when I put my application jar into /lib
>> instead of /usrlib, I run into the following error:
>>
>> ```
>> The main method caused an error: The implementation of the
>> FlinkKafkaConsumer is not serializable. The object probably contains or
>> references non serializable fields.
>> ```
>>
>> which specifically seems to be caused by
>> ```
>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>> ```
>>
>> What's curious to me about this is the error does not occur when we use
>> dynamic classloading and put our application jar into /usrlib.
>>
>> Any ideas what's going on? It would seem to us that the method of loading
>> the classes shouldn't impact whether or not something is serialized.
>>
>> Appreciate any help, thanks!
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>
>
