I didn't get the use case completely. Are you using several sensors with different schemas? Are you processing them jointly?
Let's assume some cases:

1) Only one format: it would be best to generate a case class with avrohugger. That is especially true if your processing actually requires specific fields to be present.

2) Several sensors, but processed independently: you could do the same as in 1) for all sensors. If you don't need to access specific fields, you can fetch the latest schema in your main() and use all the things that Flink provides (see the sketch after this list).

3) You have constantly changing schemas and always want to forward records with the latest schema, enriched with some fields: you need to stick to GenericRecord. I'd go with the byte[] approach of my first response if you only have one such application / processing step.

4) Otherwise, go with a custom TypeInfo/Serializer. We can help you to implement it. If you can do it yourself, it'd be awesome to put it as a response here for other users.
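For 2), here is a minimal, untested sketch of what "fetch the latest schema in your main()" could look like. It assumes the flink-avro / flink-avro-confluent-registry modules and the Confluent client classes that already appear further down in this thread; the topic name, the "<topic>-value" subject (default TopicNameStrategy), and all addresses are placeholders.

  import java.util.Properties

  import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
  import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

  object SensorJob {
    def main(args: Array[String]): Unit = {
      val registryUrl = "http://schema-registry:8081"   // placeholder
      val topic = "sensor-data"                         // placeholder

      // Fetch the latest value schema once, on the client, before the job is submitted.
      val registryClient = new CachedSchemaRegistryClient(registryUrl, 100)
      val latestSchema: Schema = new Schema.Parser()
        .parse(registryClient.getLatestSchemaMetadata(s"$topic-value").getSchema)

      // Registry-aware deserialization schema from flink-avro-confluent-registry;
      // writer schemas are resolved against the registry at runtime.
      val deserializer =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(latestSchema, registryUrl)

      // Avro-aware type information for GenericRecord, so Flink does not fall back to Kryo.
      implicit val genericRecordInfo: TypeInformation[GenericRecord] =
        new GenericRecordAvroTypeInfo(latestSchema)

      val props = new Properties()
      props.setProperty("bootstrap.servers", "kafka:9092")  // placeholder
      props.setProperty("group.id", "sensor-processing")    // placeholder

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.addSource(new FlinkKafkaConsumer[GenericRecord](topic, deserializer, props))
        .print()
      env.execute("sensor avro job")
    }
  }

With the implicit GenericRecordAvroTypeInfo in scope, downstream map/flatMap steps that output GenericRecord also pick up the Avro type information instead of the Kryo fallback, which is the same caveat Dawid describes further down in the thread.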
On Mon, Mar 2, 2020 at 11:01 AM Nitish Pant <nitishpant...@gmail.com> wrote:

> Hi,
>
> So I am building a data pipeline that takes input from sensors via an MQTT
> broker and passes it to Kafka. Before it goes to Kafka, I am filtering and
> serializing the filtered data into Avro format and keeping the schema in
> the registry. Now I want to get that data in Flink to process it using some
> algorithms. So, at the FlinkKafkaConsumer end, I currently don't have the
> schemas for my data. One workaround for me would be to get the schema
> corresponding to the data that I'll be getting from a topic separately from
> the registry and then work forward, but I was hoping there would be a way
> to avoid this and integrate the schema registry with my consumer in some
> way, like kafka-connect does. This is why I was trying this solution.
>
> Do you think I should maybe do the workaround method, as implementing a
> GenericRecord would be more of an overhead in the longer run?
>
> Thanks!
>
>
> On 02-Mar-2020, at 3:11 PM, Arvid Heise <ar...@ververica.com> wrote:
>
> Could you please give more background on your use case? It's hard to give
> any advice with the little information you gave us.
>
> Usually, the consumer should know the schema or else it's hard to do
> meaningful processing. If it's something completely generic, then there is
> no way around it, but that should be the last resort. Here my
> recommendations from my first response would come into play.
>
> If they are not working for you for some reason, please let me know why
> and I could come up with a solution.
>
> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Thanks for the replies. I get that it is not wise to use GenericRecord
>> and that is what is causing the Kryo fallback, but then, if not this, how
>> should I go about writing an AvroSchemaRegistrySchema for when I don't
>> know the schema? Without the knowledge of the schema, I can't create a
>> class. Can you suggest a way of getting around that?
>>
>> Thanks!
>>
>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>> Hi Nitish,
>>
>> Just to slightly extend on Arvid's reply. As Arvid said, the Kryo
>> serializer comes from the call to
>> TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is
>> not a POJO, this call will produce a GenericTypeInfo, which uses Kryo
>> serialization.
>>
>> For a reference example I would recommend having a look at
>> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for
>> working with GenericRecords. One important note: GenericRecords are not
>> the best candidates for data objects in Flink. The reason is that if you
>> apply any transformation on a GenericRecord, e.g. map/flatMap, the input
>> type information cannot be forwarded, as the transformation is a black
>> box from Flink's perspective. Therefore you would need to provide the
>> type information for every step of the pipeline:
>>
>> TypeInformation<?> info = ...
>>
>> sEnv.addSource(...)  // produces info
>>   .map(...)
>>   .returns(info)     // must be provided again, as the map transformation is a black box;
>>                      // the transformation might produce a completely different record
>>
>> Hope that helps a bit.
>>
>> Best,
>>
>> Dawid
>>
>> On 02/03/2020 09:04, Arvid Heise wrote:
>>
>> Hi Nitish,
>>
>> Kryo is the fallback serializer of Flink when everything else fails. In
>> general, performance suffers quite a bit, and it's not always applicable,
>> as in your case. Especially in production code, it's best to avoid it
>> completely.
>>
>> In your case, the issue is that your provided type information is
>> completely meaningless. getProducedType is not providing any actual type
>> information but just a reference to a generic skeleton. Flink uses the
>> type information to reason about the value structures, which it cannot do
>> in your case.
>>
>> If you really need to resort to a completely generic serializer (which is
>> usually not needed), then you have a few options:
>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that
>> generic, you probably have only a simple transformation before outputting
>> it into some generic Kafka sink. So your UDF deserializes, does some
>> generic stuff, and immediately turns it back into byte[]. (See the sketch
>> below this message.)
>> * Implement your own generic TypeInformation with serializer.
>> WritableTypeInfo [1] is a generic example of how to do it. This will
>> automatically convert byte[] back and forth to GenericRecord. That would
>> be the recommended way when you have multiple transformations between
>> source and sink.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
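To make the byte[] option above concrete, here is a rough, untested sketch: a pass-through DeserializationSchema so that Flink only ever sees byte[], plus a RichMapFunction that deserializes, enriches, and re-serializes with the Confluent (de)serializers already used in this thread. Class names, the config map, and the enrichment step are placeholders.

  import java.util.Collections

  import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.api.common.functions.RichMapFunction
  import org.apache.flink.api.common.serialization.DeserializationSchema
  import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
  import org.apache.flink.configuration.Configuration

  // Hands the raw Kafka bytes straight to Flink; no Avro types cross operator boundaries.
  class PassThroughSchema extends DeserializationSchema[Array[Byte]] {
    override def deserialize(message: Array[Byte]): Array[Byte] = message
    override def isEndOfStream(nextElement: Array[Byte]): Boolean = false
    override def getProducedType: TypeInformation[Array[Byte]] =
      PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
  }

  // Deserializes to GenericRecord, does the generic processing, and immediately
  // turns the record back into byte[] for a downstream Kafka sink.
  class EnrichBytes(registryUrl: String, topic: String)
      extends RichMapFunction[Array[Byte], Array[Byte]] {

    @transient private var deserializer: KafkaAvroDeserializer = _
    @transient private var serializer: KafkaAvroSerializer = _

    override def open(parameters: Configuration): Unit = {
      val config = Collections.singletonMap("schema.registry.url", registryUrl)
      deserializer = new KafkaAvroDeserializer()
      deserializer.configure(config, false)  // false = configure as value (de)serializer
      serializer = new KafkaAvroSerializer()
      serializer.configure(config, false)
    }

    override def map(message: Array[Byte]): Array[Byte] = {
      val record = deserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
      // ... generic enrichment on `record` would go here ...
      serializer.serialize(topic, record)
    }
  }

The source would then be created with new FlinkKafkaConsumer[Array[Byte]](topic, new PassThroughSchema, props), and the Kafka sink can take the returned byte[] as-is.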
>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am trying to work with Flink to get Avro data from Kafka, for which the
>>> schemas are stored in the Kafka schema registry. Since the producer for
>>> Kafka is a totally different service (an MQTT consumer sinked to Kafka),
>>> I can't have the schema with me at the consumer end. I read around and
>>> ended up with the following implementation of KeyedDeserializationSchema,
>>> but I cannot understand why it's throwing a
>>> `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>>>
>>> class AvroDeserializationSchema(schemaRegistryUrl: String)
>>>     extends KeyedDeserializationSchema[GenericRecord] {
>>>
>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>   @transient lazy val valueDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer(
>>>       new CachedSchemaRegistryClient(
>>>         schemaRegistryUrl,
>>>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>>     deserializer.configure(
>>>       Map(
>>>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>>>       ).asJava,
>>>       false)
>>>     deserializer
>>>   }
>>>
>>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>>
>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>>                            topic: String, partition: Int, offset: Long): GenericRecord = {
>>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>>     value
>>>   }
>>>
>>>   override def getProducedType: TypeInformation[GenericRecord] =
>>>     TypeExtractor.getForClass(classOf[GenericRecord])
>>> }
>>>
>>> I have no clue how to go about solving this. I saw a lot of people trying
>>> to implement the same. If someone can guide me, it'd be really helpful.
>>>
>>> Thanks!
>>> Nitish
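For other readers landing on this thread, here is an untested sketch of how the class posted above could avoid the GenericTypeInfo/Kryo fallback by combining Dawid's GenericRecordAvroTypeInfo hint with a reader schema fetched up front (case 2 at the top). The class name and the extra readerSchemaJson constructor parameter are additions for illustration, and the schema is passed as a JSON string because org.apache.avro.Schema itself is not serializable. Everything except getProducedType is the same as in the posted class.

  import scala.collection.JavaConverters._

  import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
  import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
  import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

  class RegistryAwareAvroDeserializationSchema(schemaRegistryUrl: String, readerSchemaJson: String)
      extends KeyedDeserializationSchema[GenericRecord] {

    // Same lazy, transient Confluent deserializer as in the original post.
    @transient private lazy val valueDeserializer: KafkaAvroDeserializer = {
      val deserializer = new KafkaAvroDeserializer(
        new CachedSchemaRegistryClient(
          schemaRegistryUrl,
          AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
      deserializer.configure(
        Map[String, Any](
          AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
          KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
        ).asJava,
        false)
      deserializer
    }

    override def isEndOfStream(nextElement: GenericRecord): Boolean = false

    override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                             topic: String, partition: Int, offset: Long): GenericRecord =
      valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

    // The actual change: an Avro-aware TypeInformation instead of
    // TypeExtractor.getForClass(classOf[GenericRecord]), so Flink uses its Avro
    // serializer rather than falling back to Kryo.
    override def getProducedType: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(new Schema.Parser().parse(readerSchemaJson))
  }

Whether the latest registry schema is a suitable reader schema for every incoming record depends on the compatibility settings of the subject, so treat this as a starting point rather than a drop-in fix.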