Sorry for the late reply here, I'm just returning to this now. Interesting re: the Avro version, we're using 1.10.0 in our application jar. But maybe that's somehow being clobbered when we move the jar into /lib instead of /usrlib to avoid dynamic class loading. Is it possible that's happening?
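One quick way I could check that: log which jar the Avro Schema class is actually loaded from at runtime on the task managers. A minimal sketch (the object name is just a placeholder, and getCodeSource can return null for bootstrap classes):

import org.apache.avro.Schema

object AvroClasspathCheck {
  def main(args: Array[String]): Unit = {
    // Which jar the Avro Schema class was actually loaded from
    val source = classOf[Schema].getProtectionDomain.getCodeSource
    println(s"Avro loaded from: ${if (source != null) source.getLocation else "bootstrap classpath"}")
    // Avro 1.10.x makes Schema implement Serializable; older versions do not
    println(s"Schema is Serializable: ${classOf[java.io.Serializable].isAssignableFrom(classOf[Schema])}")
  }
}

If that prints a jar under /lib with an older Avro, that would explain the clobbering.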
On Fri, Aug 27, 2021 at 2:28 PM Arvid Heise <ar...@apache.org> wrote:

> I guess the best option is to attach a debugger and set a breakpoint at
> the NotSerializableException. There definitely has to be a
> non-serializable component in that FlinkKafkaConsumer, and it can only come
> from the DeserializationSchema or Properties.
> Maybe the consumer internally caches some values generated by your schema
> at some point, but I can't think of anything obvious. There is a high chance
> that it comes from your code and only activates on the cluster.
> It would be nice to hear back from you when you have found the respective
> field. It should be two object references deep in FlinkKafkaConsumer (two
> writeObject0 calls before the first writeArray, which most likely corresponds
> to your RecordSchema).
>
> Btw, which Avro version are you using? It looks like Avro 1.10.x finally
> has a serializable Schema... Maybe this might also explain why it works in
> one submission and not in the other?
>
> On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> There are no inner classes, and none of the fields
>> of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when
>> expanded, including KafkaClusterConfig. KafkaClusterConfig is composed only
>> of Strings and Booleans.
>>
>> DebeziumAvroRegistryDeserializationSchema has a field that initializes an
>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient, but
>> this is marked @transient and lazy in Scala; similarly, the deserializer
>> uses that client to initialize a transient+lazy field which builds a
>> KafkaAvroDeserializer.
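For anyone following along, a minimal local version of the check Arvid suggests, without a debugger: serialize the consumer the same way Flink does when shipping the job graph, and run the JVM with -Dsun.io.serialization.extendedDebugInfo=true so the exception names the full field path to the offending object. This is a sketch; the consumer construction at the bottom is a placeholder.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializationCheck {
  // Mimics Flink's Java serialization of the operator; throws
  // NotSerializableException at the first non-serializable field.
  def check(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj)
    finally out.close()
  }
}

// e.g. SerializationCheck.check(new FlinkKafkaConsumer(topic, schema, props))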
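And for reference, the transient+lazy pattern described above looks roughly like this (class name and constructor arguments are placeholders, not the actual implementation). Because both fields are @transient lazy vals, neither the registry client nor the deserializer is shipped with the job graph; each is rebuilt on first use on the task manager:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer

class ExampleRegistrySchema(registryUrl: String) extends Serializable {
  // Excluded from Java serialization; re-created lazily after deserialization
  @transient private lazy val client =
    new CachedSchemaRegistryClient(registryUrl, 100)

  // Also transient + lazy; built from the client on first access
  @transient private lazy val deserializer =
    new KafkaAvroDeserializer(client)
}

With that pattern, the NotSerializableException must be coming from some other field that is reachable from the schema or the Properties, which is why the extendedDebugInfo field path is worth capturing.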