Hi Arvid,

It’s actually the second case. I just wanted to build a scalable, generic setup where I can pass a set of Kafka topics and have my consumer use the same AvroDeserializationSchema for all of them. But yeah, I think I’ll fetch the latest schema in main() separately.
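Concretely, I’m thinking of something along these lines (untested sketch; the registry URL, topic name, group id and the "<topic>-value" subject naming are placeholders/assumptions about my setup):

    import java.util.Properties

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object SensorJob {
      def main(args: Array[String]): Unit = {
        val registryUrl = "http://schema-registry:8081" // placeholder
        val topic       = "sensor-readings"             // placeholder

        // Fetch the latest schema for the topic's value subject up front,
        // assuming the default TopicNameStrategy subject "<topic>-value".
        val client       = new CachedSchemaRegistryClient(registryUrl, 100)
        val schemaString = client.getLatestSchemaMetadata(s"$topic-value").getSchema
        val schema       = new Schema.Parser().parse(schemaString)

        // Flink's registry-aware schema handles the Confluent wire format
        // (magic byte + schema id) and produces GenericRecords.
        val deserializer =
          ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl)

        // Provide a proper Avro type info so the Scala API does not fall back to Kryo.
        implicit val typeInfo: TypeInformation[GenericRecord] =
          new GenericRecordAvroTypeInfo(schema)

        val props = new Properties()
        props.setProperty("bootstrap.servers", "kafka:9092")   // placeholder
        props.setProperty("group.id", "sensor-flink-consumer") // placeholder

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env
          .addSource(new FlinkKafkaConsumer[GenericRecord](topic, deserializer, props))
          .print()

        env.execute("avro-from-schema-registry")
      }
    }

For several topics I’d just repeat the schema lookup per topic (or group topics that share a schema) before wiring up the consumers.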
Thanks for the help!

> On 02-Mar-2020, at 3:50 PM, Arvid Heise <ar...@ververica.com> wrote:
>
> I didn't get the use case completely. Are you using several sensors with different schemas? Are you processing them jointly?
>
> Let's assume some cases:
> 1) Only one format: it would be best to generate a case class with avrohugger. That is especially true if your processing actually requires specific fields to be present.
> 2) Several sensors, but processed independently. You could do the same as 1) for all sensors. If you don't need to access specific fields, you should fetch the latest schema in your main() and use all the things that Flink provides.
> 3) You have constantly changing schemas and want to forward records always with the latest schema, enriched with some fields. You need to stick to GenericRecord. I'd go with the byte[] approach of my first response if you only have one such application / processing step.
> 4) Else go with the custom TypeInfo/Serializer. We can help you to implement it. If you can do it yourself, it'd be awesome to put it as a response here for other users.
>
> On Mon, Mar 2, 2020 at 11:01 AM Nitish Pant <nitishpant...@gmail.com> wrote:
> Hi,
>
> So I am building a data pipeline that takes input from sensors via an MQTT broker and passes it to Kafka. Before it goes to Kafka, I am filtering and serializing the filtered data into Avro format and keeping the schema in the registry. Now I want to get that data in Flink to process it using some algorithms. So, at the FlinkKafkaConsumer end, I currently don’t have the schemas for my data. One workaround for me would be to get the schema corresponding to the data that I’ll be getting from a topic separately from the registry and then work forward, but I was hoping there would be a way to avoid this and integrate the schema registry with my consumer in some way, like kafka-connect does. This is why I was trying this solution.
>
> Do you think I should maybe do the workaround method, as implementing a GenericRecord would be more of an overhead in the longer run?
>
> Thanks!
>
>> On 02-Mar-2020, at 3:11 PM, Arvid Heise <ar...@ververica.com> wrote:
>>
>> Could you please give more background on your use case? It's hard to give any advice with the little information you gave us.
>>
>> Usually, the consumer should know the schema or else it's hard to do meaningful processing.
>> If it's something completely generic, then there is no way around it, but that should be the last straw. Here my recommendations from my first response would come into play.
>>
>> If they are not working for you for some reason, please let me know why and I could come up with a solution.
>>
>> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>> Hi,
>>
>> Thanks for the replies. I get that it is not wise to use GenericRecord and that is what is causing the Kryo fallback, but if not this, how should I go about writing an AvroSchemaRegistrySchema for when I don’t know the schema? Without knowledge of the schema, I can’t create a class. Can you suggest a way of getting around that?
>>
>> Thanks!
>>
>>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>>
>>> Hi Nitish,
>>>
>>> Just to slightly extend on Arvid's reply.
>>> As Arvid said, the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is not a POJO, this call will produce a GenericTypeInfo, which uses Kryo serialization.
>>>
>>> For a reference example I would recommend having a look at AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working with GenericRecords. One important note: GenericRecords are not the best candidates for data objects in Flink. The reason is that if you apply any transformation on a GenericRecord, e.g. map/flatMap, the input type information cannot be forwarded, as the transformation is a black box from Flink's perspective. Therefore you would need to provide the type information for every step of the pipeline:
>>>
>>> TypeInformation<?> info = ...
>>>
>>> sEnv.addSource(...) // produces info
>>>
>>>     .map(...)
>>>
>>>     .returns(info) // must be provided again, as the map transformation is a black box; the transformation might produce a completely different record
>>>
>>> Hope that helps a bit.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 02/03/2020 09:04, Arvid Heise wrote:
>>>> Hi Nitish,
>>>>
>>>> Kryo is the fallback serializer of Flink when everything else fails. In general, performance suffers quite a bit and it's not always applicable, as in your case. Especially in production code, it's best to avoid it completely.
>>>>
>>>> In your case, the issue is that your provided type information is completely meaningless. getProducedType is not providing any actual type information but just a reference to a generic skeleton. Flink uses the type information to reason about the value structures, which it cannot do in your case.
>>>>
>>>> If you really need to resort to a completely generic serializer (which is usually not needed), then you have a few options:
>>>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that generic, you probably have only a simple transformation before outputting it into some generic Kafka sink. So your UDF deserializes, does some generic stuff, and immediately turns it back into byte[].
>>>> * Implement your own generic TypeInformation with serializer. WritableTypeInfo [1] is a generic example of how to do it. This will automatically convert byte[] back and forth to GenericRecord. That would be the recommended way when you have multiple transformations between source and sink.
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>>>>
>>>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>>>> Hi all,
>>>>
>>>> I am trying to work with Flink to get Avro data from Kafka, for which the schemas are stored in the Kafka schema registry. Since the producer for Kafka is a totally different service (an MQTT consumer sinked to Kafka), I can’t have the schema with me at the consumer end.
>>>> I read around and arrived at the following implementation of KeyedDeserializationSchema, but I cannot understand why it’s throwing a `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>>>>
>>>> class AvroDeserializationSchema(schemaRegistryUrl: String) extends KeyedDeserializationSchema[GenericRecord] {
>>>>
>>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>>   @transient lazy val valueDeserializer = {
>>>>     val deserializer = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>>>     deserializer.configure(
>>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>>>           KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>>>       false)
>>>>     deserializer
>>>>   }
>>>>
>>>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>>>
>>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long): GenericRecord = {
>>>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>>>     value
>>>>   }
>>>>
>>>>   override def getProducedType: TypeInformation[GenericRecord] =
>>>>     TypeExtractor.getForClass(classOf[GenericRecord])
>>>> }
>>>>
>>>> I have no clue how to go about solving this. I saw a lot of people trying to implement the same. If someone can guide me, it’d be really helpful.
>>>>
>>>> Thanks!
>>>> Nitish
>>
>
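P.S. In case it helps anyone who finds this thread later, my reading of Arvid's byte[] suggestion is roughly the following (untested sketch; the class names, registry URL handling and placeholder topic are just things I made up):

    import scala.collection.JavaConverters._

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
    import org.apache.flink.configuration.Configuration

    // Pass the raw Kafka value through untouched; byte arrays use Flink's
    // built-in primitive-array serializer, so no Kryo is involved.
    class PassThroughSchema extends DeserializationSchema[Array[Byte]] {
      override def deserialize(message: Array[Byte]): Array[Byte] = message
      override def isEndOfStream(nextElement: Array[Byte]): Boolean = false
      override def getProducedType: TypeInformation[Array[Byte]] =
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
    }

    // Decode to GenericRecord inside the UDF, do the generic processing, and
    // hand bytes straight on to the sink again.
    class DecodeProcessEncode(registryUrl: String)
        extends RichMapFunction[Array[Byte], Array[Byte]] {

      @transient private var deserializer: KafkaAvroDeserializer = _

      override def open(parameters: Configuration): Unit = {
        deserializer = new KafkaAvroDeserializer(
          new CachedSchemaRegistryClient(registryUrl, 100))
        deserializer.configure(
          Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> registryUrl).asJava,
          false)
      }

      override def map(value: Array[Byte]): Array[Byte] = {
        // The writer schema is looked up via the id embedded in the Confluent
        // wire format, so the topic argument is not needed to resolve it here.
        val record = deserializer.deserialize("unused-topic", value).asInstanceOf[GenericRecord]
        // ... generic processing on `record` goes here ...
        value // forward / re-encode the bytes for the sink
      }
    }

Please correct me if I misread the suggestion.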