Could you please give more background on your use case? It's hard to give any advice with the little information you gave us.
Usually, the consumer should know the schema; otherwise it is hard to do meaningful processing. If the job is completely generic, there is no way around it, but that should be the last resort. In that case, the recommendations from my first response come into play. If they do not work for you for some reason, please let me know why and I can try to come up with a solution.

On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant...@gmail.com> wrote:

> Hi,
>
> Thanks for the replies. I get that it is not wise to use GenericRecord and
> that is what is causing the Kryo fallback, but then if not this, how should
> I go about writing a AvroSchemaRegistrySchema for when I don’t know the
> schema. Without the knowledge of schema, I can’t create a class. Can you
> suggest a way of getting around that?
>
> Thanks!
>
> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> Hi Nitish,
>
> Just to slightly extend on Arvid's reply. As Arvid said the Kryo
> serializer comes from the call to
> TypeExtractor.getForClass(classOf[GenericRecord]).
> As a GenericRecord is not a pojo this call will produce a GenericTypeInfo
> which uses Kryo serialization.
>
> For a reference example I would recommend having a look at
> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for
> working with GenericRecords. One important note. GenericRecords are not the
> best candidates for a data objects in Flink. The reason is if you apply any
> transformation on a GenericRecord e.g. map/flatMap. The input type
> information cannot be forwarded as the transformation is a black box from
> Flink's perspective. Therefore you would need to provide the type
> information for every step of the pipeline:
>
> TypeInformation<?> info = ...
>
> sEnv.addSource(...) // produces info
>     .map(...)
>     .returns(info) // must be provided again, as the map transformation is a
>                    // black box, the transformation might produce a
>                    // completely different record
>
> Hope that helps a bit.
>
> Best,
>
> Dawid
> On 02/03/2020 09:04, Arvid Heise wrote:
>
> Hi Nitish,
>
> Kryo is the fallback serializer of Flink when everything else fails. In
> general, performance suffers quite a bit and it's not always applicable as
> in your case. Especially, in production code, it's best to avoid it
> completely.
>
> In your case, the issue is that your provided type information is
> completely meaningless. getProducedType is not providing any actual type
> information but just references to a generic skeleton. Flink uses the type
> information to reason about the value structures, which it cannot in your
> case.
>
> If you really need to resort to a completely generic serializer (which is
> usually not needed), then you have a few options:
> * Easiest, stick to byte[] and convert in a downstream UDF. If it's that
>   generic you probably have only a simple transformation before outputting it
>   into some generic Kafka sink. So your UDF deserializes, does some generic
>   stuff, and immediately turns it back into byte[].
> * Implement your own generic TypeInformation with serializer.
>   WritableTypeInfo [1] is a generic example on how to do it. This will
>   automatically convert byte[] back and forth to GenericRecord. That would be
>   the recommended way when you have multiple transformations before source
>   and sink.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>
> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am trying to work with flink to get avro data from kafka for which the
>> schemas are stored in kafka schema registry. Since, the producer for kafka
>> is a totally different service(an MQTT consumer sinked to kafka), I can’t
>> have the schema with me at the consumer end.
>> I read around and diverged to
>> the following implementation of KeyedDeserializationSchema but I cannot
>> understand why it’s throwing a
>> `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`
>>
>> class AvroDeserializationSchema(schemaRegistryUrl: String)
>>     extends KeyedDeserializationSchema[GenericRecord] {
>>
>>   // Flink needs the serializer to be serializable
>>   // => this "@transient lazy val" does the trick
>>   @transient lazy val valueDeserializer = {
>>     val deserializer = new KafkaAvroDeserializer(
>>       new CachedSchemaRegistryClient(
>>         schemaRegistryUrl,
>>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>     deserializer.configure(
>>       Map(
>>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>       false)
>>     deserializer
>>   }
>>
>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>
>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>       topic: String, partition: Int, offset: Long): GenericRecord = {
>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>     value
>>   }
>>
>>   override def getProducedType: TypeInformation[GenericRecord] =
>>     TypeExtractor.getForClass(classOf[GenericRecord])
>> }
>>
>> I have no clue how to go about solving this. I saw a lot of people trying
>> to implement the same. If someone can guide me, it’d be really helpful.
>>
>> Thanks!
>> Nitish
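
As a rough illustration of the "stick to byte[]" option from the first reply in this thread: if the producer writes records in Confluent's standard wire format (one zero magic byte, a 4-byte big-endian schema ID, then the Avro binary payload), the raw bytes can be passed through the source untouched and the schema ID read later in a downstream function, without knowing the schema up front. This is only a sketch under that assumption. The class ConfluentWireFormat and its method names are hypothetical and are not part of Flink or the Confluent client.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper for illustration; not a Flink or Confluent API. */
final class ConfluentWireFormat {
    // Assumption: Confluent wire format = magic byte 0x0 + 4-byte schema id + Avro payload.
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5;

    private ConfluentWireFormat() {}

    /** Reads the schema registry id stored big-endian in bytes 1..4. */
    static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns a copy of the Avro-encoded bytes that follow the 5-byte header. */
    static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    /** Rejects messages that are too short or do not start with the magic byte. */
    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message shorter than the 5-byte header");
        }
        if (message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("unexpected magic byte: " + message[0]);
        }
    }
}
```

With something like this, the source's produced type can stay byte[], which Flink can serialize without falling back to Kryo, and the schema lookup and GenericRecord decoding happen inside a single downstream function.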