Sorry for the late reply here; I'm just returning to this now.

Interesting re: the Avro version. We're using 1.10.0 in our application
jar, but maybe it's somehow being clobbered when we move the jar into
/lib instead of /usrlib to avoid dynamic class loading. Is it possible
that's happening?
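
One way we could verify (a quick sketch; just standard JDK reflection,
nothing Flink-specific) is to log, from inside the job, which jar the Avro
Schema class was actually loaded from and which version it reports:

  import org.apache.avro.Schema

  // Print the jar that provided org.apache.avro.Schema and its version,
  // to confirm whether /lib is shadowing the application jar's Avro 1.10.0.
  val source = Option(classOf[Schema].getProtectionDomain.getCodeSource)
  println(s"Avro loaded from: ${source.map(_.getLocation).orNull}")
  println(s"Avro version: ${classOf[Schema].getPackage.getImplementationVersion}")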

On Fri, Aug 27, 2021 at 2:28 PM Arvid Heise <ar...@apache.org> wrote:

> I guess the best option is to attach a debugger and set a breakpoint at
> the NotSerializableException. There definitely has to be a
> non-serializable component in that FlinkKafkaConsumer, and it can only come
> from the DeserializationSchema or Properties.
> Maybe the consumer internally caches some values generated by your schema
> at some point, but I can't think of anything obvious. There is a high
> chance that it comes from your code and only manifests on the cluster.
> It would be nice to hear back from you when you have found the respective
> field. It should be two object references deep in FlinkKafkaConsumer (two
> writeObject0 calls before the first writeArray, which most likely
> corresponds to your RecordSchema).
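>
> You can also reproduce this locally without a debugger (a sketch; the
> topic, record type, schema, and properties are placeholders for yours):
>
>   import java.io.{ByteArrayOutputStream, ObjectOutputStream}
>   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
>
>   // Serialize the consumer the same way job submission does; the stack
>   // trace of the NotSerializableException names the offending class.
>   // Running with -Dsun.io.serialization.extendedDebugInfo=true makes the
>   // JVM include the full field path in the exception message.
>   val consumer = new FlinkKafkaConsumer[MyRecord]("my-topic", mySchema, myProps)
>   val out = new ObjectOutputStream(new ByteArrayOutputStream())
>   out.writeObject(consumer)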
>
> Btw, which Avro version are you using? It looks like Avro 1.10.x finally
> has a serializable Schema... Maybe that also explains why it works in one
> submission and not in the other?
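>
> A quick check (sketch) that the Schema class on your classpath is the one
> that implements java.io.Serializable:
>
>   import org.apache.avro.Schema
>   // true on Avro 1.10.x; false on older versions, where a Schema held in
>   // a non-transient field will fail job-graph serialization.
>   println(classOf[java.io.Serializable].isAssignableFrom(classOf[Schema]))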
>
> On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> There are no inner classes, and none of the fields
>> of DebeziumAvroRegistryDeserializationSchema has an Avro schema, even when
>> expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed
>> of Strings and Booleans.
>>
>> DebeziumAvroRegistryDeserializationSchema has a field that initializes an
>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient, but
>> this is marked @transient and lazy in Scala. Similarly, the deserializer
>> uses that client to initialize a transient+lazy field which builds a
>> KafkaAvroDeserializer, roughly the pattern sketched below.
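>>
>> (Simplified sketch of that pattern; the config shape, registry URL field,
>> and produced type are illustrative placeholders, not our actual code:)
>>
>>   import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
>>   import io.confluent.kafka.serializers.KafkaAvroDeserializer
>>   import org.apache.avro.generic.GenericRecord
>>   import org.apache.flink.api.common.serialization.DeserializationSchema
>>   import org.apache.flink.api.common.typeinfo.TypeInformation
>>   import org.apache.flink.api.java.typeutils.TypeExtractor
>>
>>   // Hypothetical shape; the real KafkaClusterConfig is just Strings/Booleans.
>>   case class KafkaClusterConfig(schemaRegistryUrl: String)
>>
>>   class DebeziumAvroRegistryDeserializationSchema(config: KafkaClusterConfig)
>>       extends DeserializationSchema[GenericRecord] {
>>
>>     // Not serializable, hence @transient; lazy, so it is rebuilt on first
>>     // use on each task manager after the job graph is deserialized.
>>     @transient private lazy val registryClient =
>>       new CachedSchemaRegistryClient(config.schemaRegistryUrl, 100)
>>
>>     @transient private lazy val deserializer =
>>       new KafkaAvroDeserializer(registryClient)
>>
>>     override def deserialize(message: Array[Byte]): GenericRecord =
>>       deserializer.deserialize("", message).asInstanceOf[GenericRecord]
>>
>>     override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>
>>     override def getProducedType: TypeInformation[GenericRecord] =
>>       TypeExtractor.getForClass(classOf[GenericRecord])
>>   }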
>>
