The KafkaAvroDecoder is not serializable, and Flink uses serialization to
distribute the code to the TaskManagers in the cluster.

I think you need to "lazily" initialize the decoder on the first
invocation of "deserialize()". That should do it.
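A minimal, self-contained sketch of that pattern (using a hypothetical
ExpensiveDecoder stand-in, since KafkaAvroDecoder itself is not
serializable and not on this sketch's classpath) could look like:

```java
import java.io.*;

// Stand-in for a non-serializable decoder such as KafkaAvroDecoder.
class ExpensiveDecoder {
    String fromBytes(byte[] bytes) {
        return new String(bytes);
    }
}

public class LazyDeserializer implements Serializable {
    // Marked transient so Java serialization skips it when Flink ships
    // the deserializer to the TaskManagers.
    private transient ExpensiveDecoder decoder;

    public String deserialize(byte[] message) {
        if (decoder == null) {
            // First invocation on the worker: build the decoder lazily.
            decoder = new ExpensiveDecoder();
        }
        return decoder.fromBytes(message);
    }

    public static void main(String[] args) throws Exception {
        // Round-trip through Java serialization, as Flink's
        // ClosureCleaner does before distributing the job.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new LazyDeserializer());
        ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()));
        LazyDeserializer copy = (LazyDeserializer) in.readObject();
        System.out.println(copy.deserialize("hello".getBytes()));
    }
}
```

Serialization succeeds because the transient field is skipped; the real
decoder is only constructed on first use, after the object has arrived
on the worker.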

Stephan


On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <madhukar.th...@gmail.com>
wrote:

> Hi Max
>
> Thanks for the example.
>
> Based on your example, here is what I did:
>
> public class Streamingkafka {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(500);
>         env.setParallelism(1);
>
>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
>         Properties props = new Properties();
>         props.put("schema.registry.url", "http://localhost:8081");
>         props.put("specific.avro.reader", true);
>         VerifiableProperties vProps = new VerifiableProperties(props);
>
>         DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
>         env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
>                 parameterTool.getProperties())).print();
>
>         env.execute();
>     }
> }
>
> public class MyAvroDeserializer implements DeserializationSchema<String> {
>     private KafkaAvroDecoder decoder;
>
>     public MyAvroDeserializer(VerifiableProperties vProps) {
>         this.decoder = new KafkaAvroDecoder(vProps);
>     }
>
>     @Override
>     public String deserialize(byte[] message) throws IOException {
>         return (String) this.decoder.fromBytes(message);
>     }
>
>     @Override
>     public boolean isEndOfStream(String nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<String> getProducedType() {
>         return null;
>     }
> }
>
>
> Here is the error i am seeing...
>
>
> Exception in thread "main" 
> org.apache.flink.api.common.InvalidProgramException: Object 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e 
> not serializable
>       at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>       at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
>       at test.flink.Streamingkafka.main(Streamingkafka.java:25)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: 
> io.confluent.kafka.serializers.KafkaAvroDecoder
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>       at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>       at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>       at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>       at 
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>       ... 11 more
>
>
>
> Am I doing something wrong in my code?
>
>
>
> On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Madhukar,
>>
>> Thanks for your question. When you instantiate the FlinkKafkaConsumer,
>> you supply a DeserializationSchema in the constructor. You simply create a
>> class which implements DeserializationSchema and contains the
>> KafkaAvroDecoder with the schema registry.
>>
>> Like so:
>>
>> public class MyAvroDeserializer implements DeserializationSchema<MyType> {
>>
>>     private KafkaAvroDecoder decoder;
>>
>>     public MyAvroDeserializer() {
>>          SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
>>          this.decoder = new KafkaAvroDecoder(schemaRegistry);
>>     }
>>
>>     public MyType deserialize(byte[] message) throws Exception {
>>          return (MyType) this.decoder.fromBytes(message);
>>     }
>>
>>     public boolean isEndOfStream(MyType nextElement) {
>>          return false;
>>     }
>>
>> }
>>
>> Then you supply this class when creating the consumer:
>>
>> DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
>> Properties props = new Properties();
>> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
>> FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>>
>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder,
>> props, offsetStore, fetcherType);
>>
>>
>> Let me know if that works for you.
>>
>> Best regards,
>> Max
>>
>> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com
>> > wrote:
>>
>>> Hi
>>>
>>> I am very new to Avro. Currently I am using the Confluent Kafka
>>> distribution, and I am able to write Avro messages to Kafka by storing the
>>> schema in the schema registry. Now I need to consume those messages with
>>> the Flink Kafka consumer, and I am having a hard time deserializing them.
>>>
>>> I am looking for an example of how to deserialize an Avro message whose
>>> schema is stored in the schema registry.
>>>
>>> Any help is appreciated. Thanks in Advance.
>>>
>>> Thanks,
>>> Madhu
>>>
>>>
>>
>
