Hi Arvid,

It’s actually the second case. I just wanted to build a scalable, generic setup 
where I can pass in a set of Kafka topics and my consumer can use the same 
AvroDeserializationSchema for all of them. But yeah, I think I’ll do the part 
that fetches the latest schema separately in main().
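
A minimal sketch of that plan (fetching the latest schema per topic in main() 
and handing it to Flink's Confluent-registry deserializer) could look like the 
following. The registry URL, broker address, topic names, group id, and the 
SensorJob / subjectFor / latestSchema names are placeholders, not part of any 
library. It assumes the registry's default "<topic>-value" subject naming and 
the flink-avro-confluent-registry dependency on the classpath.

```scala
import java.util.Properties

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object SensorJob {

  // Assumption: subjects follow the default "<topic>-value" naming strategy.
  def subjectFor(topic: String): String = s"$topic-value"

  // Fetch and parse the latest registered schema for a topic's value subject.
  def latestSchema(client: CachedSchemaRegistryClient, topic: String): Schema =
    new Schema.Parser().parse(
      client.getLatestSchemaMetadata(subjectFor(topic)).getSchema)

  def main(args: Array[String]): Unit = {
    val registryUrl = "http://localhost:8081"  // placeholder
    val topics = Seq("sensor-a", "sensor-b")   // placeholder topic names

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")  // placeholder
    props.setProperty("group.id", "sensor-job")               // placeholder

    // Runs once on the client at job-submission time, not inside the operators.
    val client = new CachedSchemaRegistryClient(registryUrl, 100)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    topics.foreach { topic =>
      val schema = latestSchema(client, topic)
      val deserializer =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl)
      val consumer =
        new FlinkKafkaConsumer[GenericRecord](topic, deserializer, props)

      // Pass the Avro type info explicitly so the Scala API does not derive a
      // Kryo-backed GenericTypeInfo for GenericRecord.
      val stream: DataStream[GenericRecord] =
        env.addSource(consumer)(new GenericRecordAvroTypeInfo(schema))

      stream.print()
    }

    env.execute("sensor-job")
  }
}
```

With this shape, the schema fetched in main() acts as the reader schema for the 
lifetime of the job, so picking up a newer schema means resubmitting the job. 
Any later map/flatMap that emits GenericRecord would need the same 
GenericRecordAvroTypeInfo supplied again.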

Thanks for the help!

> On 02-Mar-2020, at 3:50 PM, Arvid Heise <ar...@ververica.com> wrote:
> 
> I didn't get the use case completely. Are you using several sensors with 
> different schemas? Are you processing them jointly?
> 
> Let's assume some cases:
> 1) Only one format: it would be best to generate a case class with 
> avrohugger. That is especially true if your processing actually requires 
> specific fields to be present.
> 2) Several sensors, but processed independently: you could do the same as 1) 
> for all sensors. If you don't need to access specific fields, you should 
> fetch the latest schema in your main() and use everything that Flink provides.
> 3) You have constantly changing schemas and want to always forward records 
> with the latest schema, enriched with some fields: you need to stick to 
> GenericRecord. I'd go with the byte[] approach of my first response if you 
> only have one such application / processing step.
> 4) Else go with the custom TypeInfo/Serializer. We can help you to implement 
> it. If you can do it yourself, it'd be awesome to post it as a response here 
> for other users.
> 
> On Mon, Mar 2, 2020 at 11:01 AM Nitish Pant <nitishpant...@gmail.com> wrote:
> Hi,
> 
> So I am building a data pipeline that takes input from sensors via MQTT 
> broker and passes it to kafka. Before it goes to kafka, I am filtering and 
> serializing the filtered data into avro format and keeping the schema in the 
> registry. Now I want to get that data in Flink to process it using some 
> algorithms. So, at the FlinkKafkaConsumer end, I currently don’t have the 
> schemas for my data. One workaround would be to fetch the schema 
> corresponding to the data that I’ll be getting from a topic separately from 
> the registry and then work forward, but I was hoping there would be a way to 
> avoid this and integrate the schema registry with my consumer in some way, 
> like kafka-connect does. This is why I was trying this solution.
> 
> Do you think I should go with the workaround instead, as implementing the 
> GenericRecord approach would be more of an overhead in the long run?
> 
> Thanks!
> 
> 
>> On 02-Mar-2020, at 3:11 PM, Arvid Heise <ar...@ververica.com> wrote:
>> 
>> Could you please give more background on your use case? It's hard to give 
>> any advice with the little information you gave us.
>> 
>> Usually, the consumer should know the schema or else it's hard to do 
>> meaningful processing.
>> If it's something completely generic, then there is no way around it, but 
>> that should be the last resort. That is where the recommendations from my 
>> first response would come into play.
>> 
>> If they are not working for you for some reason, please let me know why and 
>> I could come up with a solution.
>> 
>> On Mon, Mar 2, 2020 at 10:27 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>> Hi,
>> 
>> Thanks for the replies. I get that it is not wise to use GenericRecord and 
>> that this is what is causing the Kryo fallback, but if not this, how should 
>> I go about writing an AvroSchemaRegistrySchema when I don’t know the 
>> schema? Without knowledge of the schema, I can’t create a class. Can you 
>> suggest a way of getting around that?
>> 
>> Thanks!
>> 
>>> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>> 
>>> Hi Nitish,
>>> 
>>> Just to slightly extend Arvid's reply: as Arvid said, the Kryo serializer 
>>> comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). 
>>> As a GenericRecord is not a POJO, this call will produce a GenericTypeInfo, 
>>> which uses Kryo serialization.
>>> 
>>> For a reference example, I would recommend having a look at 
>>> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for 
>>> working with GenericRecords. One important note: GenericRecords are not the 
>>> best candidates for data objects in Flink. The reason is that if you apply 
>>> any transformation to a GenericRecord, e.g. map/flatMap, the input type 
>>> information cannot be forwarded, as the transformation is a black box from 
>>> Flink's perspective. Therefore you would need to provide the type 
>>> information for every step of the pipeline:
>>> 
>>> TypeInformation<?> info = ...
>>> 
>>> sEnv.addSource(...)   // produces info
>>>     .map(...)
>>>     .returns(info)    // must be provided again: the map transformation is a
>>>                       // black box and might produce a completely different record
>>> 
>>> Hope that helps a bit.
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> On 02/03/2020 09:04, Arvid Heise wrote:
>>>> Hi Nitish,
>>>> 
>>>> Kryo is the fallback serializer of Flink when everything else fails. In 
>>>> general, performance suffers quite a bit and it's not always applicable as 
>>>> in your case. Especially, in production code, it's best to avoid it 
>>>> completely.
>>>> 
>>>> In your case, the issue is that your provided type information is 
>>>> completely meaningless. getProducedType is not providing any actual type 
>>>> information but just references to a generic skeleton. Flink uses the type 
>>>> information to reason about the value structures, which it cannot in your 
>>>> case.
>>>> 
>>>> If you really need to resort to a completely generic serializer (which is 
>>>> usually not needed), then you have a few options:
>>>> * Easiest, stick to byte[] and convert in a downstream UDF. If it's that 
>>>> generic you probably have only a simple transformation before outputting 
>>>> it into some generic Kafka sink. So your UDF deserializes, does some 
>>>> generic stuff, and immediately turns it back into byte[].
>>>> * Implement your own generic TypeInformation with serializer. 
>>>> WritableTypeInfo [1] is a generic example of how to do it. This will 
>>>> automatically convert byte[] back and forth to GenericRecord. That would 
>>>> be the recommended way when you have multiple transformations between 
>>>> source and sink.
>>>> 
>>>> [1] 
>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>>>>  
>>>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>>>> Hi all,
>>>> 
>>>> I am trying to use Flink to read Avro data from Kafka, for which the 
>>>> schemas are stored in the Kafka schema registry. Since the producer for 
>>>> Kafka is a totally different service (an MQTT consumer sinking to Kafka), 
>>>> I can’t have the schema with me at the consumer end. I read around and 
>>>> arrived at the following implementation of KeyedDeserializationSchema, but 
>>>> I cannot understand why it’s throwing a 
>>>> `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`.
>>>> 
>>>> class AvroDeserializationSchema(schemaRegistryUrl: String)
>>>>     extends KeyedDeserializationSchema[GenericRecord] {
>>>> 
>>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>>   @transient lazy val valueDeserializer = {
>>>>     val deserializer = new KafkaAvroDeserializer(
>>>>       new CachedSchemaRegistryClient(
>>>>         schemaRegistryUrl,
>>>>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>>>     deserializer.configure(
>>>>       Map(
>>>>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>>>       false)
>>>>     deserializer
>>>>   }
>>>> 
>>>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>>> 
>>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>>>       topic: String, partition: Int, offset: Long): GenericRecord = {
>>>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>>>     value
>>>>   }
>>>> 
>>>>   override def getProducedType: TypeInformation[GenericRecord] =
>>>>     TypeExtractor.getForClass(classOf[GenericRecord])
>>>> }
>>>> 
>>>> I have no clue how to go about solving this. I saw a lot of people trying 
>>>> to implement the same. If someone can guide me, it’d be really helpful.
>>>> 
>>>> Thanks!
>>>> Nitish
>> 
> 
