Hi Nitish,

Kryo is Flink's fallback serializer, used when everything else fails. In general it costs quite a bit of performance and, as you are seeing, it is not always applicable at all. In production code it is best to avoid it entirely.

In your case, the issue is that the type information you provide is effectively meaningless: getProducedType does not return any actual type information, only a reference to a generic skeleton. Flink uses the type information to reason about the structure of your values, which it cannot do here. If you really need a completely generic serializer (which is usually not necessary), you have a few options:

* Easiest: stick to byte[] and convert in a downstream UDF. If your data is that generic, you probably only have a simple transformation before writing to some generic Kafka sink, so the UDF deserializes, does its generic work, and immediately turns the result back into byte[]. (A sketch of this follows below.)
* Implement your own generic TypeInformation with a serializer. WritableTypeInfo [1] is a generic example of how to do it; such a type would convert byte[] back and forth to GenericRecord for you. This is the recommended way when you have multiple transformations between source and sink. (A sketch of the conversion core also follows below.)
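To make the first option concrete, here is a minimal, untested sketch. It assumes the Confluent KafkaAvroDeserializer/KafkaAvroSerializer you are already using; PassThroughSchema, TransformAvro and the topic/schemaRegistryUrl parameters are just placeholder names:

import java.util
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.configuration.Configuration
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}

// 1) Keep the records as byte[] inside Flink; byte[] is serialized natively, no Kryo involved.
class PassThroughSchema extends AbstractDeserializationSchema[Array[Byte]] {
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}

// 2) Deserialize, transform, and re-serialize inside a single UDF, so that
//    GenericRecord never has to cross an operator boundary.
class TransformAvro(schemaRegistryUrl: String, topic: String)
    extends RichMapFunction[Array[Byte], Array[Byte]] {

  @transient private var deserializer: KafkaAvroDeserializer = _
  @transient private var serializer: KafkaAvroSerializer = _

  override def open(parameters: Configuration): Unit = {
    val config = new util.HashMap[String, AnyRef]()
    config.put("schema.registry.url", schemaRegistryUrl)
    deserializer = new KafkaAvroDeserializer()
    deserializer.configure(config, false)
    serializer = new KafkaAvroSerializer()
    serializer.configure(config, false)
  }

  override def map(value: Array[Byte]): Array[Byte] = {
    val record = deserializer.deserialize(topic, value).asInstanceOf[GenericRecord]
    // ... do the generic transformation on `record` here ...
    serializer.serialize(topic, record)
  }
}

PassThroughSchema can be handed to the Kafka consumer in place of your KeyedDeserializationSchema, and TransformAvro is then a plain map on the resulting DataStream[Array[Byte]].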
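For the second option, the part that does the actual work is the conversion between GenericRecord and byte[]; the TypeInformation/TypeSerializer boilerplate around it can follow the same pattern as WritableTypeInfo [1]. Below is a rough, untested sketch of that conversion, under the assumption that the schema is written as JSON next to every record so the bytes stay self-describing; GenericRecordBytes and its method names are made up for illustration:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object GenericRecordBytes {

  // GenericRecord -> byte[]: length-prefixed schema JSON, followed by the Avro binary payload.
  def toBytes(record: GenericRecord): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    val schemaJson = record.getSchema.toString.getBytes(StandardCharsets.UTF_8)
    out.writeInt(schemaJson.length)
    out.write(schemaJson)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    out.flush()
    bytes.toByteArray
  }

  // byte[] -> GenericRecord: read the schema back first, then decode the payload with it.
  def fromBytes(data: Array[Byte]): GenericRecord = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val schemaJson = new Array[Byte](in.readInt())
    in.readFully(schemaJson)
    val schema = new Schema.Parser().parse(new String(schemaJson, StandardCharsets.UTF_8))
    val decoder = DecoderFactory.get().binaryDecoder(in, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}

In a custom TypeSerializer, toBytes/fromBytes would back the serialize/deserialize methods; shipping the schema with every record is wasteful but keeps the type fully generic.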
[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java

On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:

> Hi all,
>
> I am trying to work with Flink to get Avro data from Kafka, for which the
> schemas are stored in the Kafka schema registry. Since the producer for
> Kafka is a totally different service (an MQTT consumer sinked to Kafka),
> I can't have the schema with me at the consumer end. I read around and
> arrived at the following implementation of KeyedDeserializationSchema, but
> I cannot understand why it's throwing a
> `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>
> class AvroDeserializationSchema(schemaRegistryUrl: String)
>     extends KeyedDeserializationSchema[GenericRecord] {
>
>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>   @transient lazy val valueDeserializer = {
>     val deserializer = new KafkaAvroDeserializer(
>       new CachedSchemaRegistryClient(schemaRegistryUrl,
>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>     deserializer.configure(
>       Map(
>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>       ).asJava,
>       false)
>     deserializer
>   }
>
>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>       topic: String, partition: Int, offset: Long): GenericRecord = {
>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>     value
>   }
>
>   override def getProducedType: TypeInformation[GenericRecord] =
>     TypeExtractor.getForClass(classOf[GenericRecord])
> }
>
> I have no clue how to go about solving this. I saw a lot of people trying
> to implement the same. If someone can guide me, it'd be really helpful.
>
> Thanks!
> Nitish
>