Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi Arvid, It’s actually the second case. I just wanted to build a scalable generic case where I can pass a set of kafka topics and my consumer can use the same AvroDeserializationSchema. But yeah, I think I’ll do the fetching latest schema part in main() separately. Thanks for the help! > On

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
I didn't get the use case completely. Are you using several sensors with different schemas? Are processing them jointly? Let's assume some cases: 1) Only one format, it would be best to generate a case class with avrohugger. That is especially true if you processing actually requires specific fiel

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi, So I am building a data pipeline that takes input from sensors via MQTT broker and passes it to kafka. Before it goes to kafka, I am filtering and serializing the filtered data into avro format and keeping the schema in the registry. Now I want to get that data in flink to process it using

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
Could you please give more background on your use case? It's hard to give any advice with the little information you gave us. Usually, the consumer should know the schema or else it's hard to do meaningful processing. If it's something completely generic, then there is no way around it, but that s

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Nitish Pant
Hi, Thanks for the replies. I get that it is not wise to use GenericRecord and that is what is causing the Kryo fallback, but then if not this, how should I go about writing a AvroSchemaRegistrySchema for when I don’t know the schema. Without the knowledge of schema, I can’t create a class. Can

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Dawid Wysakowicz
Hi Nitish, Just to slightly extend on Arvid's reply. As Arvid said the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is not a pojo this call will produce a GenericTypeInfo which uses Kryo serialization. For a reference example I would

Re: Schema registry deserialization: Kryo NPE error

2020-03-02 Thread Arvid Heise
Hi Nitish, Kryo is the fallback serializer of Flink when everything else fails. In general, performance suffers quite a bit and it's not always applicable as in your case. Especially, in production code, it's best to avoid it completely. In your case, the issue is that your provided type informat

Schema registry deserialization: Kryo NPE error

2020-03-01 Thread Nitish Pant
Hi all, I am trying to work with flink to get avro data from kafka for which the schemas are stored in kafka schema registry. Since, the producer for kafka is a totally different service(an MQTT consumer sinked to kafka), I can’t have the schema with me at the consumer end. I read around and di