Hi Max,

Thanks for the example.
Based on your example, here is what I did:

    public class Streamingkafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(500);
            env.setParallelism(1);

            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            Properties props = new Properties();
            props.put("schema.registry.url", "http://localhost:8081");
            props.put("specific.avro.reader", true);
            VerifiableProperties vProps = new VerifiableProperties(props);

            DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
            env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
                    parameterTool.getProperties())).print();

            env.execute();
        }
    }

    public class MyAvroDeserializer implements DeserializationSchema<String> {
        private KafkaAvroDecoder decoder;

        public MyAvroDeserializer(VerifiableProperties vProps) {
            this.decoder = new KafkaAvroDecoder(vProps);
        }

        @Override
        public String deserialize(byte[] message) throws IOException {
            return (String) this.decoder.fromBytes(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return null;
        }
    }

Here is the error I am seeing:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
    at test.flink.Streamingkafka.main(Streamingkafka.java:25)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
    ... 11 more

Am I doing something wrong in my code?

On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Madhukar,
>
> Thanks for your question. When you instantiate the FlinkKafkaConsumer, you
> supply a DeserializationSchema in the constructor. You simply create a
> class which implements DeserializationSchema and contains the
> KafkaAvroDecoder with the schema registry.
>
> Like so:
>
> public class MyAvroDeserializer implements DeserializationSchema<MyType> {
>
>     private KafkaAvroDecoder decoder;
>
>     public MyAvroDeserializer() {
>         SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
>         this.decoder = new KafkaAvroDecoder(schemaRegistry);
>     }
>
>     public MyType deserialize(byte[] message) throws Exception {
>         return (MyType) this.decoder.fromBytes(messages);
>     }
>
>     public boolean isEndOfStream(MyType nextElement) {
>         return false;
>     }
>
> }
>
> Then you supply this class when creating the consumer:
>
> DeserializationSchema<MyType> decoder = new MyAvroDeserializer()
> Properties props = new Properties();
> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
> FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic"), decoder,
> props, offsetStore, fetcherType);
>
>
> Let me know if that works for you.
>
> Best regards,
> Max
>
> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com>
> wrote:
>
>> Hi
>>
>> I am very new to Avro. Currently I am using confluent Kafka version and I
>> am able to write an Avro message to Kafka by storing schema in schema
>> registry.
>> Now I need to consume those messages using Flink Kafka Consumer
>> and having a hard time to deserialize the messages.
>>
>> I am looking for an example on how to deserialize Avro message where
>> schema is stored in schema registry.
>>
>> Any help is appreciated. Thanks in Advance.
>>
>> Thanks,
>> Madhu
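
A note on the exception in this message: the "Caused by" line names io.confluent.kafka.serializers.KafkaAvroDecoder as the object Java serialization could not write, and the frames above it show Flink serializing the consumer (ClosureCleaner.ensureSerializable) when addSource is called. Since the decoder is a regular field of the DeserializationSchema, it gets pulled into that serialization. One common way around this kind of error is to mark the helper field transient and create it lazily on first use. The sketch below illustrates only that pattern with plain java.io; Decoder, EagerSchema and LazySchema are hypothetical stand-ins, not Flink or Confluent classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class TransientDecoderSketch {

    /** Hypothetical stand-in for a helper that is not Serializable (like KafkaAvroDecoder in the trace). */
    static class Decoder {
        private final String registryUrl;

        Decoder(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        String fromBytes(byte[] message) {
            return new String(message, StandardCharsets.UTF_8);
        }
    }

    /** Failing pattern: the helper is created in the constructor and kept in a regular field. */
    static class EagerSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Decoder decoder;

        EagerSchema(String registryUrl) {
            this.decoder = new Decoder(registryUrl);
        }
    }

    /** Workaround: keep only serializable configuration; the helper is transient and built on first use. */
    static class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String registryUrl;
        private transient Decoder decoder;

        LazySchema(String registryUrl) {
            this.registryUrl = registryUrl;
        }

        String deserialize(byte[] message) {
            if (decoder == null) {
                // Runs after deserialization too, because transient fields come back as null.
                decoder = new Decoder(registryUrl);
            }
            return decoder.fromBytes(message);
        }
    }

    /** Returns true if Java serialization accepts the object, false if it throws an IOException. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Serializes and deserializes the object, roughly what happens when it is shipped to a worker. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String url = "http://localhost:8081";
        System.out.println("eager serializable: " + canSerialize(new EagerSchema(url)));
        LazySchema copy = roundTrip(new LazySchema(url));
        System.out.println("lazy after round trip: " + copy.deserialize("hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Applied to MyAvroDeserializer, the same idea would mean storing the plain Properties in the schema class and constructing the KafkaAvroDecoder inside deserialize on the first call. Whether VerifiableProperties itself is serializable is not something this sketch establishes, so keeping java.util.Properties as the stored field is the safer assumption.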