The KafkaAvroDecoder is not serializable, and Flink uses serialization to distribute the code to the TaskManagers in the cluster.
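The lazy-initialization pattern can be sketched as follows. This is a minimal, self-contained illustration: `FakeDecoder` is a hypothetical stand-in for Confluent's `KafkaAvroDecoder` (which is not on the classpath here), and the `main` method simulates the serialization round-trip Flink performs when shipping a `DeserializationSchema` to a TaskManager.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Hypothetical stand-in for io.confluent.kafka.serializers.KafkaAvroDecoder,
// which does not implement Serializable.
class FakeDecoder {
    String fromBytes(byte[] bytes) {
        return new String(bytes);
    }
}

// Only the serializable configuration travels with the job graph; the
// decoder is transient and rebuilt lazily on each TaskManager JVM.
public class LazyDeserializer implements Serializable {
    private final Properties props;              // serializable configuration
    private transient FakeDecoder decoder;       // never serialized

    public LazyDeserializer(Properties props) {
        this.props = props;
    }

    public String deserialize(byte[] message) {
        if (decoder == null) {                   // first invocation on this JVM
            decoder = new FakeDecoder();
        }
        return decoder.fromBytes(message);
    }

    public static void main(String[] args) throws Exception {
        LazyDeserializer schema = new LazyDeserializer(new Properties());

        // Simulate what Flink's ClosureCleaner does: serialize the schema...
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(schema);                 // no NotSerializableException now
        oos.flush();

        // ...then deserialize and use it on the "TaskManager" side.
        LazyDeserializer shipped = (LazyDeserializer) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(shipped.deserialize("hello".getBytes()));
    }
}
```

Applied to the code below, this would mean marking the `KafkaAvroDecoder` field `transient`, keeping only serializable configuration (e.g. the raw `Properties`) in a field, and constructing the decoder inside `deserialize()` on first use.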
I think you need to "lazily" initialize the decoder, in the first invocation of "deserialize()". That should do it.

Stephan

On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:

> Hi Max,
>
> Thanks for the example.
>
> Based on your example, here is what I did:
>
> public class Streamingkafka {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(500);
>         env.setParallelism(1);
>
>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
>         Properties props = new Properties();
>         props.put("schema.registry.url", "http://localhost:8081");
>         props.put("specific.avro.reader", true);
>         VerifiableProperties vProps = new VerifiableProperties(props);
>
>         DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
>         env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
>                 decoder, parameterTool.getProperties())).print();
>
>         env.execute();
>     }
> }
>
> public class MyAvroDeserializer implements DeserializationSchema<String> {
>     private KafkaAvroDecoder decoder;
>
>     public MyAvroDeserializer(VerifiableProperties vProps) {
>         this.decoder = new KafkaAvroDecoder(vProps);
>     }
>
>     @Override
>     public String deserialize(byte[] message) throws IOException {
>         return (String) this.decoder.fromBytes(message);
>     }
>
>     @Override
>     public boolean isEndOfStream(String nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<String> getProducedType() {
>         return null;
>     }
> }
>
> Here is the error I am seeing...
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
> Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
>     at test.flink.Streamingkafka.main(Streamingkafka.java:25)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>     ... 11 more
>
> Am I doing something wrong in my code?
>
> On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org> wrote:
>
>> Hi Madhukar,
>>
>> Thanks for your question. When you instantiate the FlinkKafkaConsumer,
>> you supply a DeserializationSchema in the constructor. You simply create a
>> class which implements DeserializationSchema and contains the
>> KafkaAvroDecoder with the schema registry.
>>
>> Like so:
>>
>> public class MyAvroDeserializer implements DeserializationSchema<MyType> {
>>
>>     private KafkaAvroDecoder decoder;
>>
>>     public MyAvroDeserializer() {
>>         SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
>>         this.decoder = new KafkaAvroDecoder(schemaRegistry);
>>     }
>>
>>     public MyType deserialize(byte[] message) throws Exception {
>>         return (MyType) this.decoder.fromBytes(message);
>>     }
>>
>>     public boolean isEndOfStream(MyType nextElement) {
>>         return false;
>>     }
>> }
>>
>> Then you supply this class when creating the consumer:
>>
>> DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
>> Properties props = new Properties();
>> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
>> FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>>
>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder,
>>         props, offsetStore, fetcherType);
>>
>> Let me know if that works for you.
>>
>> Best regards,
>> Max
>>
>> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am very new to Avro.
>>> Currently I am using the Confluent Kafka distribution, and I am able to
>>> write an Avro message to Kafka, storing the schema in the schema registry.
>>> Now I need to consume those messages with the Flink Kafka consumer, and I
>>> am having a hard time deserializing them.
>>>
>>> I am looking for an example of how to deserialize an Avro message whose
>>> schema is stored in the schema registry.
>>>
>>> Any help is appreciated. Thanks in advance.
>>>
>>> Thanks,
>>> Madhu