Hi,

So I am building a data pipeline that takes input from sensors via an MQTT broker 
and passes it to Kafka. Before it goes to Kafka, I filter the data and serialize 
it into Avro format, keeping the schema in the schema registry. Now I want to get 
that data into Flink to process it with some algorithms. The problem is that at 
the FlinkKafkaConsumer end I currently don't have the schemas for my data. One 
workaround would be to fetch the schema corresponding to the data I'll be getting 
from a topic separately from the registry and then work forward from there, but 
I was hoping there would be a way to avoid this and integrate the schema registry 
with my consumer in some way, like Kafka Connect does. That is why I was trying 
this solution.

Do you think I should just go with the workaround, since implementing a 
GenericRecord-based schema would be more of an overhead in the long run?
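
In case it helps to make that workaround concrete, here is a rough sketch of what 
I have in mind (the registry URL, topic name and the "<topic>-value" subject naming 
are just placeholders, and it assumes the flink-avro-confluent-registry dependency 
is on the classpath):

import java.util.Properties
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val registryUrl = "http://localhost:8081"   // placeholder registry address
val topic       = "sensor-data"             // placeholder topic name

// Fetch the latest writer schema for the topic's value subject once, up front.
val client       = new CachedSchemaRegistryClient(registryUrl, 100)
val readerSchema = new Schema.Parser().parse(
  client.getLatestSchemaMetadata(s"$topic-value").getSchema)

// The registry-aware deserialization schema then resolves each record's writer
// schema by id and reads it into a GenericRecord using the schema fetched above.
val deserializer =
  ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, registryUrl)

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "flink-avro-consumer")

val consumer = new FlinkKafkaConsumer[GenericRecord](topic, deserializer, props)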

Thanks!


> On 02-Mar-2020, at 3:11 PM, Arvid Heise <ar...@ververica.com> wrote:
> 
> Could you please give more background on your use case? It's hard to give any 
> advice with the little information you gave us.
> 
> Usually, the consumer should know the schema or else it's hard to do 
> meaningful processing.
> If it's something completely generic, then there is no way around it, but 
> that should be the last resort. In that case, the recommendations from my 
> first response would come into play.
> 
> If they are not working for you for some reason, please let me know why and I 
> could come up with a solution.
> 
> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant...@gmail.com> wrote:
> Hi,
> 
> Thanks for the replies. I get that it is not wise to use GenericRecord and 
> that this is what is causing the Kryo fallback, but if not this, how should I 
> go about writing an AvroSchemaRegistrySchema when I don’t know the schema? 
> Without knowledge of the schema, I can’t create a class. Can you suggest a 
> way of getting around that?
> 
> Thanks!
> 
>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> 
>> Hi Nitish,
>> 
>> Just to slightly extend Arvid's reply: as Arvid said, the Kryo serializer 
>> comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). 
>> Since a GenericRecord is not a POJO, this call produces a GenericTypeInfo, 
>> which uses Kryo serialization.
>> 
>> For a reference example I would recommend having a look at 
>> AvroDeserializationSchema, where we use GenericRecordAvroTypeInfo for 
>> working with GenericRecords. One important note: GenericRecords are not the 
>> best candidates for data objects in Flink, because if you apply any 
>> transformation to a GenericRecord, e.g. map/flatMap, the input type 
>> information cannot be forwarded, as the transformation is a black box from 
>> Flink's perspective. Therefore you would need to provide the type 
>> information at every step of the pipeline:
>> 
>> TypeInformation<?> info = ...
>>
>> sEnv.addSource(...)  // produces info
>>
>>   .map(...)
>>
>>   .returns(info)     // must be provided again, as the map transformation is a
>>                      // black box; it might produce a completely different record
>> 
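>> A rough Scala sketch of that pattern (the schema string, sEnv and the source 
>> variable are just placeholders):
>>
>> import org.apache.avro.Schema
>> import org.apache.avro.generic.GenericRecord
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
>>
>> val schema: Schema = new Schema.Parser().parse(schemaJson)   // reader schema
>> val info: TypeInformation[GenericRecord] =
>>   new GenericRecordAvroTypeInfo(schema)                      // Avro-aware, no Kryo
>>
>> sEnv.addSource(kafkaSource)(info)    // source produces GenericRecord with this info
>>   .map(record => record)(info)       // the type info must be re-supplied after each
>>                                      // transformation, since map is a black box
>>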
>> Hope that helps a bit.
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 02/03/2020 09:04, Arvid Heise wrote:
>>> Hi Nitish,
>>> 
>>> Kryo is the fallback serializer of Flink when everything else fails. In 
>>> general, performance suffers quite a bit, and it's not always applicable, as 
>>> in your case. Especially in production code, it's best to avoid it 
>>> completely.
>>> 
>>> In your case, the issue is that the type information you provide is 
>>> completely meaningless: getProducedType does not provide any actual type 
>>> information, just a reference to a generic skeleton. Flink uses the type 
>>> information to reason about the value structure, which it cannot do in your 
>>> case.
>>> 
>>> If you really need to resort to a completely generic serializer (which is 
>>> usually not needed), then you have a few options:
>>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that 
>>> generic, you probably have only a simple transformation before outputting it 
>>> into some generic Kafka sink, so your UDF deserializes, does some generic 
>>> work, and immediately turns it back into byte[] (a rough sketch follows 
>>> after the link below).
>>> * Implement your own generic TypeInformation with a serializer. 
>>> WritableTypeInfo [1] is a generic example of how to do it. This would 
>>> automatically convert byte[] back and forth to GenericRecord. That would be 
>>> the recommended way when you have multiple transformations between source 
>>> and sink.
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
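>>>
>>> A rough Scala sketch of the first option (the class name is a placeholder):
>>>
>>> import org.apache.flink.api.common.serialization.DeserializationSchema
>>> import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
>>>
>>> class PassThroughSchema extends DeserializationSchema[Array[Byte]] {
>>>   override def deserialize(message: Array[Byte]): Array[Byte] = message
>>>   override def isEndOfStream(next: Array[Byte]): Boolean = false
>>>   override def getProducedType: TypeInformation[Array[Byte]] =
>>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO  // well-known type, no Kryo
>>> }
>>>
>>> // Downstream, a RichMapFunction can create the (non-serializable) KafkaAvroDeserializer
>>> // in open(), turn the bytes into a GenericRecord, do the generic work, and emit bytes again.
>>>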
>>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>>> Hi all,
>>> 
>>> I am trying to use Flink to read Avro data from Kafka, for which the 
>>> schemas are stored in the Kafka schema registry. Since the Kafka producer 
>>> is a totally different service (an MQTT consumer sinked to Kafka), I can’t 
>>> have the schema with me at the consumer end. I read around and arrived at 
>>> the following implementation of KeyedDeserializationSchema, but I cannot 
>>> understand why it’s throwing a `com.esotericsoftware.kryo.KryoException: 
>>> java.lang.NullPointerException`:
>>> 
>>> class AvroDeserializationSchema(schemaRegistryUrl: String)
>>>     extends KeyedDeserializationSchema[GenericRecord] {
>>>
>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>   @transient lazy val valueDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer(
>>>       new CachedSchemaRegistryClient(schemaRegistryUrl,
>>>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>>     deserializer.configure(
>>>       Map(
>>>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
>>>       ).asJava,
>>>       false)
>>>     deserializer
>>>   }
>>>
>>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>>
>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>>       topic: String, partition: Int, offset: Long): GenericRecord = {
>>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>>     value
>>>   }
>>>
>>>   override def getProducedType: TypeInformation[GenericRecord] =
>>>     TypeExtractor.getForClass(classOf[GenericRecord])
>>> }
>>> 
>>> I have no clue how to go about solving this. I have seen a lot of people 
>>> trying to implement the same thing. If someone can guide me, it’d be really 
>>> helpful.
>>> 
>>> Thanks!
>>> Nitish
> 
