Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Maximilian Michels
You need to initialize the decoder in the deserialize method instead of in the constructor. On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota wrote: > Hi Max/Ewen, > > Thank you for the inputs. I was able to solve the serialization issues. Now > I am seeing NullPointerExceptions. > > public clas

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Till Rohrmann
The constructor of a Java class is not necessarily called when an object is deserialized. Thus, you should move the check if (this.decoder == null) { this.decoder = new KafkaAvroDecoder(vProps); } into the deserialize method of MyAvroDeserializer. Cheers, Till On Thu, Nov 19, 2015 at 1:50 PM, Madh

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Madhukar Thota
Hi Max/Ewen, Thank you for the inputs. I was able to solve the serialization issues. Now I am seeing NullPointerExceptions. public class MyAvroDeserializer implements DeserializationSchema { private transient KafkaAvroDecoder decoder; public MyAvroDeserializer(VerifiableProperties vP

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Maximilian Michels
Stephan is right, this should do it in deserialize(): if (decoder == null) { decoder = new KafkaAvroDecoder(vProps); } Further, you might have to specify the correct return type for getProducedType(). You may use public TypeInformation getProducedType() { return TypeE

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Stephan Ewen
The KafkaAvroDecoder is not serializable, and Flink uses serialization to distribute the code to the TaskManagers in the cluster. I think you need to "lazily" initialize the decoder, in the first invocation of "deserialize()". That should do it. Stephan On Thu, Nov 19, 2015 at 12:10 PM, Madhuka
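The lazy-initialization advice above can be illustrated without Flink or Kafka on the classpath. The following is only a minimal sketch using plain Java serialization: FakeDecoder, LazySchema, roundTrip and the registry URL are hypothetical stand-ins invented for this illustration (FakeDecoder plays the role of the non-serializable KafkaAvroDecoder), and the sketch assumes the configuration object itself is serializable. It shows that a transient field is null after a serialization round trip, because the constructor is not run again, and that creating the helper on the first deserialize() call avoids the NullPointerException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class LazyInitDemo {

    // Hypothetical stand-in for a non-serializable helper such as KafkaAvroDecoder.
    static class FakeDecoder {
        private final String registryUrl;

        FakeDecoder(Properties props) {
            this.registryUrl = props.getProperty("schema.registry.url");
        }

        String decode(byte[] message) {
            return registryUrl + ":" + message.length;
        }
    }

    // Hypothetical schema class; only the serializable configuration is shipped.
    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final Properties props;        // serialized with the object
        private transient FakeDecoder decoder; // skipped by serialization, null afterwards

        LazySchema(Properties props) {
            this.props = props;
        }

        boolean isDecoderInitialized() {
            return decoder != null;
        }

        String deserialize(byte[] message) {
            // Lazy initialization: create the helper on first use, on whichever
            // JVM the object ended up on after being shipped.
            if (decoder == null) {
                decoder = new FakeDecoder(props);
            }
            return decoder.decode(message);
        }
    }

    // Serializes and deserializes the schema, imitating shipping it to a worker.
    static LazySchema roundTrip(LazySchema schema) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(schema);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazySchema) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("schema.registry.url", "http://localhost:8081"); // assumed value

        LazySchema shipped = roundTrip(new LazySchema(props));
        System.out.println("decoder set after round trip: " + shipped.isDecoderInitialized());
        System.out.println("decoded: " + shipped.deserialize(new byte[] {1, 2, 3}));
        System.out.println("decoder set after first call: " + shipped.isDecoderInitialized());
    }
}
```

Had the decoder been created in the constructor instead, the copy returned by roundTrip would still hold a null decoder, which matches the NullPointerException reported in this thread.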

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Madhukar Thota
Hi Max, Thanks for the example. Based on your example, here is what I did: public class Streamingkafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(500)

Re: Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Maximilian Michels
Hi Madhukar, Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and contains the KafkaAvroDecoder with the schema registry. Like so: public class MyAvroDese

Kafka + confluent schema registry Avro parsing

2015-11-19 Thread Madhukar Thota
Hi, I am very new to Avro. Currently I am using the Confluent distribution of Kafka, and I am able to write Avro messages to Kafka with the schema stored in the schema registry. Now I need to consume those messages using the Flink Kafka consumer, and I am having a hard time deserializing them. I am looking for an exam