Stephan is right, this should do it in deserialize():

    if (decoder == null) {
        decoder = new KafkaAvroDecoder(vProps);
    }

Further, you might have to specify the correct return type for
getProducedType(). You may use:

    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }
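Combining both changes, a minimal sketch of the full deserializer could look
like the one below. It holds plain java.util.Properties rather than
VerifiableProperties, on the assumption that the latter is not serializable
either, and builds the decoder on first use:

    import java.io.IOException;
    import java.util.Properties;

    import io.confluent.kafka.serializers.KafkaAvroDecoder;
    import kafka.utils.VerifiableProperties;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class MyAvroDeserializer implements DeserializationSchema<String> {

        // java.util.Properties is serializable, so it can travel with the job graph.
        private final Properties props;

        // transient: never serialized; re-created lazily on each TaskManager.
        private transient KafkaAvroDecoder decoder;

        public MyAvroDeserializer(Properties props) {
            this.props = props;
        }

        @Override
        public String deserialize(byte[] message) throws IOException {
            // The first invocation happens on the TaskManager, after the job
            // has been distributed, so the non-serializable decoder is never
            // shipped over the wire.
            if (decoder == null) {
                decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
            }
            return (String) decoder.fromBytes(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            // A concrete type info instead of null.
            return TypeExtractor.getForClass(String.class);
        }
    }

The transient keyword is only a safeguard here; with lazy initialization the
field is still null when the job graph is serialized on the client.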
Cheers,
Max

On Thu, Nov 19, 2015 at 12:18 PM, Stephan Ewen <se...@apache.org> wrote:
> The KafkaAvroDecoder is not serializable, and Flink uses serialization to
> distribute the code to the TaskManagers in the cluster.
>
> I think you need to "lazily" initialize the decoder, in the first
> invocation of "deserialize()". That should do it.
>
> Stephan
>
>
> On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota
> <madhukar.th...@gmail.com> wrote:
>>
>> Hi Max,
>>
>> Thanks for the example.
>>
>> Based on your example, here is what I did:
>>
>> public class Streamingkafka {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         env.enableCheckpointing(500);
>>         env.setParallelism(1);
>>
>>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
>>         Properties props = new Properties();
>>         props.put("schema.registry.url", "http://localhost:8081");
>>         props.put("specific.avro.reader", true);
>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>
>>         DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
>>         env.addSource(new FlinkKafkaConsumer082<>(
>>                 parameterTool.getRequired("topic"), decoder,
>>                 parameterTool.getProperties())).print();
>>
>>         env.execute();
>>     }
>> }
>>
>> public class MyAvroDeserializer implements DeserializationSchema<String> {
>>     private KafkaAvroDecoder decoder;
>>
>>     public MyAvroDeserializer(VerifiableProperties vProps) {
>>         this.decoder = new KafkaAvroDecoder(vProps);
>>     }
>>
>>     @Override
>>     public String deserialize(byte[] message) throws IOException {
>>         return (String) this.decoder.fromBytes(message);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(String nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<String> getProducedType() {
>>         return null;
>>     }
>> }
>>
>> Here is the error I am seeing:
>>
>> Exception in thread "main"
>> org.apache.flink.api.common.InvalidProgramException: Object
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e
>> not serializable
>>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
>>     at test.flink.Streamingkafka.main(Streamingkafka.java:25)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> Caused by: java.io.NotSerializableException:
>> io.confluent.kafka.serializers.KafkaAvroDecoder
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>>     at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>>     ... 11 more
>>
>> Am I doing something wrong in my code?
>>
>> On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org>
>> wrote:
>>>
>>> Hi Madhukar,
>>>
>>> Thanks for your question. When you instantiate the FlinkKafkaConsumer,
>>> you supply a DeserializationSchema in the constructor. You simply create
>>> a class which implements DeserializationSchema and contains the
>>> KafkaAvroDecoder with the schema registry.
>>>
>>> Like so:
>>>
>>> public class MyAvroDeserializer implements DeserializationSchema<MyType> {
>>>
>>>     private KafkaAvroDecoder decoder;
>>>
>>>     public MyAvroDeserializer() {
>>>         SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
>>>         this.decoder = new KafkaAvroDecoder(schemaRegistry);
>>>     }
>>>
>>>     public MyType deserialize(byte[] message) throws Exception {
>>>         return (MyType) this.decoder.fromBytes(message);
>>>     }
>>>
>>>     public boolean isEndOfStream(MyType nextElement) {
>>>         return false;
>>>     }
>>> }
>>>
>>> Then you supply this class when creating the consumer:
>>>
>>> DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
>>> Properties props = new Properties();
>>> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
>>> FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>>>
>>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder,
>>>         props, offsetStore, fetcherType);
>>>
>>> Let me know if that works for you.
>>>
>>> Best regards,
>>> Max
>>>
>>> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota
>>> <madhukar.th...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am very new to Avro. Currently I am using the Confluent Kafka
>>>> distribution, and I am able to write Avro messages to Kafka, storing
>>>> the schema in the schema registry. Now I need to consume those messages
>>>> using the Flink Kafka consumer, and I am having a hard time
>>>> deserializing them.
>>>>
>>>> I am looking for an example of how to deserialize an Avro message whose
>>>> schema is stored in the schema registry.
>>>>
>>>> Any help is appreciated. Thanks in advance.
>>>>
>>>> Thanks,
>>>> Madhu
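Putting the advice from the replies above together, a sketch of the
end-to-end job might look like the following. The topic name, connection
settings, and group id are placeholders, and MyAvroDeserializer is the
lazily-initializing version sketched near the top of this thread:

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;

    public class StreamingKafkaJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(500);

            // Connection settings for the 0.8.2 consumer, plus the schema
            // registry properties read by the decoder (all placeholders).
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "flink-avro-demo");
            props.put("schema.registry.url", "http://localhost:8081");
            props.put("specific.avro.reader", true);

            // The deserializer now carries only serializable state, so the
            // closure cleaner no longer rejects the source.
            env.addSource(new FlinkKafkaConsumer082<>(
                    "myTopic", new MyAvroDeserializer(props), props)).print();

            env.execute("Avro from the schema registry");
        }
    }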