Hi all,
I am trying to use Flink to read Avro data from Kafka, where the schemas are
stored in the Kafka Schema Registry. Since the producer for Kafka is a
completely separate service (an MQTT consumer that sinks into Kafka), I cannot
have the schema available at the consumer end. I read around and arrived at
the following implementation of KeyedDeserializationSchema, but I cannot
understand why it is throwing a `com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException`:
/**
 * Deserializes Confluent-framed Avro values from Kafka into [[GenericRecord]]s,
 * resolving writer schemas via the given Schema Registry. Message keys are ignored.
 *
 * @param schemaRegistryUrl base URL of the Confluent Schema Registry
 */
class AvroDeserializationSchema(schemaRegistryUrl: String) extends
KeyedDeserializationSchema[GenericRecord] {

  // KafkaAvroDeserializer is not java.io.Serializable, yet Flink ships this
  // schema object to the task managers. Marking the field @transient and
  // building it lazily lets each task construct its own deserializer on
  // first use instead of serializing it with the job graph.
  @transient lazy val valueDeserializer = {
    val registryClient = new CachedSchemaRegistryClient(
      schemaRegistryUrl,
      AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
    val props = Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
      KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false)
    val avroDeserializer = new KafkaAvroDeserializer(registryClient)
    // Second argument isKey = false: this instance decodes record values.
    avroDeserializer.configure(props.asJava, false)
    avroDeserializer
  }

  /** The Kafka stream is unbounded; never signal end-of-stream. */
  override def isEndOfStream(nextElement: GenericRecord): Boolean = false

  /** Decodes the Avro-encoded value; the key bytes are deliberately unused. */
  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
      topic: String, partition: Int, offset: Long): GenericRecord =
    valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

  // NOTE(review): GenericRecord is an interface, and getForClass on it makes
  // Flink fall back to generic Kryo serialization — the likely source of the
  // reported KryoException / NullPointerException. Consider returning
  // new GenericRecordAvroTypeInfo(readerSchema) from flink-avro instead,
  // which requires knowing the reader schema up front — TODO confirm.
  override def getProducedType: TypeInformation[GenericRecord] =
    TypeExtractor.getForClass(classOf[GenericRecord])
}
I have no clue how to go about solving this. I saw a lot of people trying to
implement the same. If someone can guide me, it’d be really helpful.
Thanks!
Nitish