Hi Nitish,

Just to slightly extend on Arvid's reply. As Arvid said, the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). Since a GenericRecord is not a POJO, this call produces a GenericTypeInfo, which falls back to Kryo serialization.
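As a minimal sketch of the fix (using GenericRecordAvroTypeInfo from flink-avro, more on it below), assuming you can obtain the reader schema up front, e.g. by fetching it once from the schema registry; the schemaJson value here is a hypothetical placeholder:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

    // schemaJson: the reader schema as a JSON string (hypothetical placeholder;
    // e.g. fetched once from the schema registry or bundled with the job)
    @transient lazy val readerSchema: Schema = new Schema.Parser().parse(schemaJson)

    override def getProducedType: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(readerSchema) // Avro-based serialization, no Kryo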
For a reference example I would recommend having a look at AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working with GenericRecords.

One important note: GenericRecords are not the best candidates for data objects in Flink. The reason is that if you apply any transformation on a GenericRecord, e.g. map/flatMap, the input type information cannot be forwarded, as the transformation is a black box from Flink's perspective. Therefore you would need to provide the type information for every step of the pipeline:

    TypeInformation<?> info = ...

    sEnv.addSource(...) // produces info
        .map(...)
        .returns(info)  // must be provided again, as the map transformation is a
                        // black box and might produce a completely different record
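In the Scala API there is no returns(...); the usual equivalent is an implicit TypeInformation in scope, which every transformation producing a GenericRecord will pick up. A sketch under the same assumption that the reader schema is available (mySource and transform are placeholders):

    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
    import org.apache.flink.streaming.api.scala._

    // A local implicit takes precedence over the macro-generated one from the
    // scala package object, so Kryo is avoided at every step of the pipeline.
    implicit val info: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(readerSchema)

    sEnv.addSource(mySource)     // produces info
        .map(r => transform(r))  // resolves TypeInformation[GenericRecord] implicitly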
Hope that helps a bit.

Best,
Dawid

On 02/03/2020 09:04, Arvid Heise wrote:
> Hi Nitish,
>
> Kryo is the fallback serializer of Flink when everything else fails. In
> general, performance suffers quite a bit, and it's not always applicable,
> as in your case. Especially in production code, it's best to avoid it
> completely.
>
> In your case, the issue is that your provided type information is
> completely meaningless. getProducedType is not providing any actual type
> information, just a reference to a generic skeleton. Flink uses the type
> information to reason about the value structures, which it cannot do in
> your case.
>
> If you really need to resort to a completely generic serializer (which is
> usually not needed), then you have a few options:
> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that
>   generic, you probably have only a simple transformation before
>   outputting into some generic Kafka sink. So your UDF deserializes, does
>   some generic stuff, and immediately turns it back into byte[].
> * Implement your own generic TypeInformation with a serializer.
>   WritableTypeInfo [1] is a generic example of how to do it. This will
>   automatically convert byte[] back and forth to GenericRecord. That would
>   be the recommended way when you have multiple transformations between
>   source and sink.
>
> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>
> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>
>     Hi all,
>
>     I am trying to work with flink to get avro data from kafka, for which
>     the schemas are stored in the kafka schema registry. Since the producer
>     for kafka is a totally different service (an MQTT consumer sinked to
>     kafka), I can't have the schema with me at the consumer end.
>
>     I read around and arrived at the following implementation of
>     KeyedDeserializationSchema, but I cannot understand why it's throwing a
>     `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>
>     class AvroDeserializationSchema(schemaRegistryUrl: String)
>         extends KeyedDeserializationSchema[GenericRecord] {
>
>       // Flink needs the deserializer to be serializable => this
>       // "@transient lazy val" does the trick
>       @transient lazy val valueDeserializer = {
>         val deserializer = new KafkaAvroDeserializer(
>           new CachedSchemaRegistryClient(
>             schemaRegistryUrl,
>             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>         deserializer.configure(
>           Map(
>             AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>             KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>           ).asJava,
>           false)
>         deserializer
>       }
>
>       override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>
>       override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>                                topic: String, partition: Int, offset: Long): GenericRecord = {
>         // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>         val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>         value
>       }
>
>       override def getProducedType: TypeInformation[GenericRecord] =
>         TypeExtractor.getForClass(classOf[GenericRecord])
>     }
>
>     I have no clue how to go about solving this. I saw a lot of people
>     trying to implement the same. If someone can guide me, it'd be really
>     helpful.
>
>     Thanks!
>     Nitish