Hi Nitish,

Just to extend a bit on Arvid's reply. As Arvid said, the Kryo
serializer comes from the call to
TypeExtractor.getForClass(classOf[GenericRecord]). Since a GenericRecord
is not a POJO, this call produces a GenericTypeInfo, which falls back to
Kryo serialization.
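
A quick way to see this for yourself (just an illustrative snippet):

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.java.typeutils.TypeExtractor

    // GenericRecord is an interface, so Flink's type analyzer cannot treat it
    // as a POJO and falls back to the generic, Kryo-backed type information.
    val info = TypeExtractor.getForClass(classOf[GenericRecord])
    println(info.getClass.getSimpleName) // prints "GenericTypeInfo"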

For a reference example I would recommend having a look at
AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for
working with GenericRecords.
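
A minimal sketch of wiring that up (the schema string is just a
placeholder; in practice you would obtain the actual writer schema,
e.g. from the registry):

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

    // Hypothetical example schema; use your real Avro schema here.
    val schema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"Example","fields":[{"name":"id","type":"long"}]}""")

    // Return this from getProducedType instead of TypeExtractor.getForClass(...)
    val producedType: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(schema)
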
One important note: GenericRecords are not the best candidates for data
objects in Flink. The reason is that if you apply any transformation to
a GenericRecord, e.g. map/flatMap, the input type information cannot be
forwarded, because the transformation is a black box from Flink's
perspective. Therefore you would need to provide the type information
for every step of the pipeline:

TypeInformation<?> info = ...

sEnv.addSource(...)     // produces info
    .map(...)
    .returns(info)      // must be provided again: the map transformation is a
                        // black box and might produce a completely different record
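
Since your snippet is Scala: the Scala DataStream API provides the
result type via an implicit TypeInformation rather than a returns(...)
call, so the equivalent looks roughly like this (a sketch; mySource and
the map function are placeholders, and producedType is the value from
the sketch above):

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._

    // The implicit plays the role of .returns(info) in the Java API.
    implicit val info: TypeInformation[GenericRecord] = producedType

    sEnv.addSource(mySource)  // produces GenericRecords
        .map(r => r)          // still a black box to Flink; the output
                              // type is taken from the implicit above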

Hope that helps a bit.

Best,

Dawid

On 02/03/2020 09:04, Arvid Heise wrote:
> Hi Nitish,
>
> Kryo is Flink's fallback serializer for when everything else fails. In
> general, performance suffers quite a bit with it, and it's not always
> applicable, as in your case. Especially in production code, it's best
> to avoid it completely.
>
> In your case, the issue is that the type information you provide is
> meaningless. getProducedType does not provide any actual type
> information, just a reference to a generic skeleton. Flink uses the
> type information to reason about the value structures, which it
> cannot do in your case.
>
> If you really need to resort to a completely generic serializer (which
> is usually not needed), then you have a few options:
> * Easiest: stick to byte[] and convert in a downstream UDF (see the
> sketch after the link below). If it's that generic, you probably have
> only a simple transformation before outputting it into some generic
> Kafka sink. So your UDF deserializes, does some generic stuff, and
> immediately turns it back into byte[].
> * Implement your own generic TypeInformation with a serializer.
> WritableTypeInfo [1] is a generic example of how to do it. This will
> automatically convert byte[] back and forth to GenericRecord. That
> would be the recommended way when you have multiple transformations
> between source and sink.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
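>
> A minimal sketch of the first option (the schema string and the
> transformation are illustrative placeholders):
>
>     import org.apache.avro.Schema
>     import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
>     import org.apache.avro.io.{DecoderFactory, EncoderFactory}
>     import java.io.ByteArrayOutputStream
>
>     // Hypothetical schema; in practice it comes from the registry or a file.
>     val schema: Schema = new Schema.Parser().parse(
>       """{"type":"record","name":"Example","fields":[{"name":"id","type":"long"}]}""")
>
>     // UDF body: bytes in -> GenericRecord -> generic transformation -> bytes out.
>     def process(bytes: Array[Byte]): Array[Byte] = {
>       val reader = new GenericDatumReader[GenericRecord](schema)
>       val record = reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
>
>       // ... do some generic stuff with `record` here ...
>
>       val out = new ByteArrayOutputStream()
>       val encoder = EncoderFactory.get().binaryEncoder(out, null)
>       new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
>       encoder.flush()
>       out.toByteArray
>     }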
>
> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>
>     Hi all,
>
>     I am trying to work with Flink to get Avro data from Kafka, for
>     which the schemas are stored in the Kafka schema registry. Since
>     the producer for Kafka is a totally different service (an MQTT
>     consumer sinked to Kafka), I can't have the schema with me at the
>     consumer end. I read around and arrived at the following
>     implementation of KeyedDeserializationSchema, but I cannot
>     understand why it's throwing a
>     `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>     class AvroDeserializationSchema(schemaRegistryUrl: String)
>         extends KeyedDeserializationSchema[GenericRecord] {
>
>       // Flink needs the serializer to be serializable
>       // => this "@transient lazy val" does the trick
>       @transient lazy val valueDeserializer = {
>         val deserializer = new KafkaAvroDeserializer(
>           new CachedSchemaRegistryClient(schemaRegistryUrl,
>             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>         deserializer.configure(
>           Map(
>             AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>             KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>           ).asJava,
>           false)
>         deserializer
>       }
>
>       override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>
>       override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>           topic: String, partition: Int, offset: Long): GenericRecord = {
>         // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>         val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>         value
>       }
>
>       override def getProducedType: TypeInformation[GenericRecord] =
>         TypeExtractor.getForClass(classOf[GenericRecord])
>     }
>
>     I have no clue how to go about solving this. I saw a lot of people
>     trying to implement the same. If someone can guide me, it’d be
>     really helpful.
>
>     Thanks!
>     Nitish
>
