Hi,

So I am building a data pipeline that takes input from sensors via an MQTT broker and passes it to Kafka. Before it goes to Kafka, I filter the data and serialize the filtered records into Avro format, keeping the schema in the schema registry. Now I want to consume that data in Flink and process it with some algorithms. So, at the FlinkKafkaConsumer end, I currently don't have the schemas for my data. One workaround would be to separately fetch the schema corresponding to the data I'll be getting from a topic from the registry and work forward from there, but I was hoping there would be a way to avoid this and integrate the schema registry with my consumer, the way kafka-connect does. This is why I was trying this solution.
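Just to make it concrete, this is roughly the kind of registry-integrated consumer I was hoping to end up with (only a rough, untested sketch; it assumes flink-avro-confluent-registry and the universal Kafka connector are on the classpath, and the schema, topic and host names are made up):

import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object RegistryConsumerSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Reader schema known up front -- the part I'd like to avoid; field names are invented.
    val readerSchema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"Sensor","fields":[
        |  {"name":"id","type":"string"},
        |  {"name":"value","type":"double"}]}""".stripMargin)

    // Deserializer that looks up the writer schema in the registry per record
    // and converts each record to the reader schema above.
    val deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(
      readerSchema, "http://schema-registry:8081")

    // Explicit Avro type info, to avoid the GenericTypeInfo/Kryo fallback for GenericRecord.
    implicit val typeInfo: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(readerSchema)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka:9092")
    props.setProperty("group.id", "flink-sensor-consumer")

    env
      .addSource(new FlinkKafkaConsumer[GenericRecord]("sensor-topic", deserializer, props))
      .print()

    env.execute("registry consumer sketch")
  }
}

The catch is that forGeneric still wants a reader schema up front, which is exactly the part I'd like to avoid hard-coding.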
Do you think I should maybe go with the workaround, since working with GenericRecord would be more of an overhead in the long run? Thanks!

> On 02-Mar-2020, at 3:11 PM, Arvid Heise <ar...@ververica.com> wrote:
>
> Could you please give more background on your use case? It's hard to give any advice with the little information you gave us.
>
> Usually, the consumer should know the schema or else it's hard to do meaningful processing.
> If it's something completely generic, then there is no way around it, but that should be the last resort. Here my recommendations from my first response would come into play.
>
> If they are not working for you for some reason, please let me know why and I could come up with a solution.
>
> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant...@gmail.com> wrote:
> Hi,
>
> Thanks for the replies. I get that it is not wise to use GenericRecord and that is what is causing the Kryo fallback, but then, if not this, how should I go about writing an AvroSchemaRegistrySchema for when I don't know the schema? Without knowledge of the schema, I can't create a class. Can you suggest a way of getting around that?
>
> Thanks!
>
>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>> Hi Nitish,
>>
>> Just to slightly extend on Arvid's reply. As Arvid said, the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As a GenericRecord is not a POJO, this call will produce a GenericTypeInfo, which uses Kryo serialization.
>>
>> For a reference example I would recommend having a look at AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working with GenericRecords. One important note: GenericRecords are not the best candidates for data objects in Flink. The reason is that if you apply any transformation on a GenericRecord, e.g. map/flatMap, the input type information cannot be forwarded, as the transformation is a black box from Flink's perspective. Therefore you would need to provide the type information for every step of the pipeline:
>>
>> TypeInformation<?> info = ...
>>
>> sEnv.addSource(...) // produces info
>>
>>     .map(...)
>>
>>     .returns(info) // must be provided again; as the map transformation is a black box, it might produce a completely different record
>>
>> Hope that helps a bit.
>>
>> Best,
>>
>> Dawid
>>
>> On 02/03/2020 09:04, Arvid Heise wrote:
>>> Hi Nitish,
>>>
>>> Kryo is the fallback serializer of Flink when everything else fails. In general, performance suffers quite a bit and it's not always applicable, as in your case. Especially in production code, it's best to avoid it completely.
>>>
>>> In your case, the issue is that your provided type information is completely meaningless. getProducedType is not providing any actual type information, just a reference to a generic skeleton. Flink uses the type information to reason about the value structures, which it cannot do in your case.
>>>
>>> If you really need to resort to a completely generic serializer (which is usually not needed), then you have a few options:
>>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that generic, you probably have only a simple transformation before outputting into some generic Kafka sink. So your UDF deserializes, does some generic stuff, and immediately turns it back into byte[].
>>> * Implement your own generic TypeInformation with serializer. WritableTypeInfo [1] is a generic example of how to do it. This will automatically convert byte[] back and forth to GenericRecord. That would be the recommended way when you have multiple transformations between source and sink.
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>>>
>>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>>> Hi all,
>>>
>>> I am trying to work with Flink to get Avro data from Kafka, for which the schemas are stored in the Kafka schema registry. Since the producer for Kafka is a totally different service (an MQTT consumer sinked to Kafka), I can't have the schema with me at the consumer end. I read around and diverged to the following implementation of KeyedDeserializationSchema, but I cannot understand why it's throwing a `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`
>>>
>>> class AvroDeserializationSchema(schemaRegistryUrl: String) extends KeyedDeserializationSchema[GenericRecord] {
>>>
>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>   @transient lazy val valueDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer(new CachedSchemaRegistryClient(schemaRegistryUrl,
>>>       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>>     deserializer.configure(
>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>>           KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>>       false)
>>>     deserializer
>>>   }
>>>
>>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>>
>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long): GenericRecord = {
>>>
>>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>>
>>>     value
>>>   }
>>>
>>>   override def getProducedType: TypeInformation[GenericRecord] =
>>>     TypeExtractor.getForClass(classOf[GenericRecord])
>>> }
>>>
>>> I have no clue how to go about solving this. I saw a lot of people trying to implement the same. If someone can guide me, it'd be really helpful.
>>>
>>> Thanks!
>>> Nitish
>
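P.S. In case it clarifies what I mean by the workaround: my understanding of Dawid's GenericRecordAvroTypeInfo hint is that I would fetch the latest schema for the topic once from the registry and use it for the produced type, roughly like this untested sketch (the helper object and the "<topic>-value" subject name, which assumes the default subject naming strategy, are my own guesses):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo

object RegistrySchemas {
  // Fetch the latest value schema registered for a topic. This would run once on the
  // client while the job graph is built, so the registry must be reachable from there.
  def latestValueSchema(schemaRegistryUrl: String, topic: String): Schema = {
    val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 100)
    new Schema.Parser().parse(client.getLatestSchemaMetadata(s"$topic-value").getSchema)
  }
}

// In my deserialization schema, getProducedType would then become:
//
//   override def getProducedType: TypeInformation[GenericRecord] =
//     new GenericRecordAvroTypeInfo(RegistrySchemas.latestValueSchema(schemaRegistryUrl, topic))
//
// instead of TypeExtractor.getForClass(classOf[GenericRecord]).

If that is the sensible way forward, I'll go with it; I just wanted to check that there isn't a more integrated option first.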