Hi Nitish,

Kryo is Flink's fallback serializer, used when everything else fails. In
general, performance suffers quite a bit with it, and it is not always
applicable, as you are seeing in your case. Especially in production code,
it's best to avoid it completely.
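
If you want to enforce that in production, Flink's ExecutionConfig lets you
fail fast whenever a type would otherwise fall back to Kryo (here `env` is
assumed to be your StreamExecutionEnvironment):

// Make the job fail at translation time instead of silently using Kryo.
env.getConfig.disableGenericTypes()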

In your case, the issue is that the type information you provide is
effectively meaningless. getProducedType does not return any actual type
information, just a reference to the generic GenericRecord interface. Flink
uses the type information to reason about the structure of the values,
which it cannot do here.
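
To illustrate what happens (a small sketch, assuming the usual TypeExtractor
behavior): asking for type information of the GenericRecord interface can
only produce a GenericTypeInfo, and that is exactly the Kryo fallback.

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor

// GenericRecord is just an interface, so the extractor cannot analyze any fields.
val info: TypeInformation[GenericRecord] =
  TypeExtractor.getForClass(classOf[GenericRecord])
// `info` is a GenericTypeInfo[GenericRecord]; its serializer is a KryoSerializer,
// and Kryo cannot meaningfully handle schema-less GenericRecord instances, which
// is the most likely source of the KryoException/NullPointerException you see.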

If you really need to resort to a completely generic representation (which
is usually not needed), then you have a couple of options:
* Easiest: stick to byte[] in the source and convert in a downstream UDF.
If it's that generic, you probably have only a simple transformation before
writing into some generic Kafka sink, so your UDF deserializes, does its
generic work, and immediately turns the record back into byte[] (see the
first sketch below).
* Implement your own generic TypeInformation with a matching serializer.
WritableTypeInfo [1] is a generic example of how to do it. Flink will then
convert GenericRecord to byte[] and back automatically whenever it ships or
stores records. That would be the recommended way when you have multiple
transformations between source and sink (see the second sketch below).

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
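
For the second option, the core of such a serializer is encoding the
GenericRecord in a self-describing way, i.e. writing its schema next to the
Avro-encoded payload, because the reading side has no schema available. A
rough sketch of just that part (names are mine; the surrounding
TypeInformation and TypeSerializer boilerplate would follow the
WritableTypeInfo / WritableSerializer pattern from [1]):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helpers you would call from the serialize/deserialize methods
// of your custom TypeSerializer[GenericRecord].
object GenericRecordBytes {

  def toBytes(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(out)
    // Write the schema first so the record can be rebuilt without a registry.
    val schemaBytes = record.getSchema.toString.getBytes("UTF-8")
    dataOut.writeInt(schemaBytes.length)
    dataOut.write(schemaBytes)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def fromBytes(bytes: Array[Byte]): GenericRecord = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val schemaBytes = new Array[Byte](in.readInt())
    in.readFully(schemaBytes)
    val schema = new Schema.Parser().parse(new String(schemaBytes, "UTF-8"))
    val decoder = DecoderFactory.get().binaryDecoder(in, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}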

On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:

> Hi all,
>
> I am trying to work with Flink to get Avro data from Kafka, for which the
> schemas are stored in the Kafka schema registry. Since the producer for
> Kafka is a totally different service (an MQTT consumer that sinks to
> Kafka), I can't have the schema with me at the consumer end. I read around
> and arrived at the following implementation of KeyedDeserializationSchema,
> but I cannot understand why it's throwing a
> `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`
>
> class AvroDeserializationSchema(schemaRegistryUrl: String)
>     extends KeyedDeserializationSchema[GenericRecord] {
>
>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>   @transient lazy val valueDeserializer = {
>     val deserializer = new KafkaAvroDeserializer(
>       new CachedSchemaRegistryClient(
>         schemaRegistryUrl,
>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>     deserializer.configure(
>       Map(
>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>       ).asJava,
>       false)
>     deserializer
>   }
>
>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>       topic: String, partition: Int, offset: Long): GenericRecord = {
>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>     value
>   }
>
>   override def getProducedType: TypeInformation[GenericRecord] =
>     TypeExtractor.getForClass(classOf[GenericRecord])
> }
>
> I have no clue how to go about solving this. I saw a lot of people trying
> to implement the same. If someone can guide me, it’d be really helpful.
>
> Thanks!
> Nitish
>
