Hi Max

Thanks for the example.

Based on your example here is what i did:

public class Streamingkafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setParallelism(1);

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        Properties props = new Properties();
        props.put("schema.registry.url", "http://localhost:8081";);
        props.put("specific.avro.reader", true);
        VerifiableProperties vProps = new VerifiableProperties(props);

        DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
        env.addSource(new
FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
                parameterTool.getProperties())).print();

        env.execute();
    }

public class MyAvroDeserializer implements DeserializationSchema<String> {
    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        this.decoder = new KafkaAvroDecoder(vProps);
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) this.decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return null;
    }
}


Here is the error i am seeing...


Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: Object
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e
not serializable
        at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
        at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
        at test.flink.Streamingkafka.main(Streamingkafka.java:25)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException:
io.confluent.kafka.serializers.KafkaAvroDecoder
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
        at 
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
        ... 11 more



Am i doing some thing wrong in my code?



On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Madhukar,
>
> Thanks for your question. When you instantiate the FlinkKafkaConsumer, you
> supply a DeserializationSchema in the constructor. You simply create a
> class which implements DeserializationSchema and contains the
> KafkaAvroDecoder with the schema registry.
>
> Like so:
>
> public class MyAvroDeserializer implements DeserializationSchema<MyType> {
>
>     private KafkaAvroDecoder decoder;
>
>     public MyAvroDeserializer() {
>          SchemaRegistryClient schemaRegistry = new
> SchemaRegistryClient(...);
>          this.decoder = new KafkaAvroDecoder(schemaRegistry);
>     }
>
>     public MyType deserialize(byte[] message) throws Exception {
>          return (MyType) this.decoder.fromBytes(messages);
>     }
>
>     public boolean isEndOfStream(MyType nextElement) {
>          return false;
>     }
>
> }
>
> Then you supply this class when creating the consumer:
>
> DeserializationSchema<MyType> decoder = new MyAvroDeserializer()
> Properties props = new Properties();
> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
> FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>
> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic"), decoder,
> props, offsetStore, fetcherType);
>
>
> Let me know if that works for you.
>
> Best regards,
> Max
>
> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com>
> wrote:
>
>> Hi
>>
>> I am very new to Avro. Currently I am using confluent Kafka version and I
>> am able to write an Avro message to Kafka by storing schema in schema
>> registry. Now I need to consume those messages using Flink Kafka Consumer
>> and having a hard time to deserialize the messages.
>>
>> I am looking for an example on how to deserialize Avro message where
>> schema is stored in schema registry.
>>
>> Any help is appreciated. Thanks in Advance.
>>
>> Thanks,
>> Madhu
>>
>>
>

Reply via email to