When Java deserializes an object, the constructor of its class is not
necessarily called, so a transient field that is assigned only in the
constructor is still null afterwards. Thus, you should move the check

if (this.decoder == null) {
    this.decoder = new KafkaAvroDecoder(vProps);
}

into the deserialize method of MyAvroDeserializer.
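
To illustrate the effect outside of Flink, here is a minimal sketch that uses
only java.io. FakeDecoder, LazyDeserializer and roundTrip are hypothetical
names standing in for KafkaAvroDecoder, MyAvroDeserializer and the
serialization Flink performs when it ships the job. It assumes the
configuration is kept in a serializable java.util.Properties field, so that
the decoder can be rebuilt on first use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class LazyDecoderDemo {

    // Hypothetical stand-in for a non-serializable decoder such as KafkaAvroDecoder.
    static class FakeDecoder {
        private final String prefix;

        FakeDecoder(Properties props) {
            this.prefix = props.getProperty("prefix", "");
        }

        String fromBytes(byte[] message) {
            return prefix + new String(message, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for MyAvroDeserializer.
    static class LazyDeserializer implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable configuration travels with the object.
        private final Properties props;
        // Transient: skipped when writing, null again after reading.
        private transient FakeDecoder decoder;

        LazyDeserializer(Properties props) {
            this.props = props;
        }

        String deserialize(byte[] message) {
            // Lazy initialization: runs on the first call after deserialization.
            if (decoder == null) {
                decoder = new FakeDecoder(props);
            }
            return decoder.fromBytes(message);
        }

        boolean isDecoderInitialized() {
            return decoder != null;
        }
    }

    // Serializes and deserializes an object in memory.
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("prefix", "decoded:");

        LazyDeserializer original = new LazyDeserializer(props);
        original.deserialize("x".getBytes(StandardCharsets.UTF_8)); // decoder now set

        LazyDeserializer copy = roundTrip(original);
        System.out.println(copy.isDecoderInitialized()); // false
        System.out.println(copy.deserialize("hello".getBytes(StandardCharsets.UTF_8))); // decoded:hello
        System.out.println(copy.isDecoderInitialized()); // true
    }
}
```

Note that the lazy check needs the configuration to be available inside
deserialize(), so it has to be stored in a field that survives serialization.
Whether VerifiableProperties itself is serializable is worth checking; if it
is not, keeping the plain Properties and wrapping them on first use should
work.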

Cheers,
Till

On Thu, Nov 19, 2015 at 1:50 PM, Madhukar Thota <madhukar.th...@gmail.com>
wrote:

> Hi Max/Ewen,
>
> Thank you for the inputs. I was able to solve the serialization issues.
> Now I am seeing a NullPointerException.
>
> public class MyAvroDeserializer implements DeserializationSchema<String> {
>
>     private transient KafkaAvroDecoder decoder;
>
>     public MyAvroDeserializer(VerifiableProperties vProps) {
>         if (this.decoder == null) {
>             this.decoder = new KafkaAvroDecoder(vProps);
>         }
>     }
>
>     @Override
>     public String deserialize(byte[] message) throws IOException {
>         return (String) decoder.fromBytes(message);
>     }
>
>     @Override
>     public boolean isEndOfStream(String nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<String> getProducedType() {
>         return TypeExtractor.getForClass(String.class);
>     }
>
> }
>
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(1/4) switched to 
> RUNNING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(1/4) switched to 
> FAILED
> java.lang.Exception
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
>       at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
>
> 11/19/2015 07:47:39   Job execution switched to status FAILING.
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(2/4) switched to 
> CANCELING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(3/4) switched to 
> CANCELING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(4/4) switched to 
> CANCELING
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(2/4) switched to 
> CANCELED
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(4/4) switched to 
> CANCELED
> 11/19/2015 07:47:39   Source: Custom Source -> Sink: Unnamed(3/4) switched to 
> CANCELED
> 11/19/2015 07:47:39   Job execution switched to status FAILED.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
>       at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
>
>
>
>
> On Thu, Nov 19, 2015 at 6:30 AM, Maximilian Michels <m...@apache.org>
> wrote:
>
>> Stephan is right, this should do it in deserialize():
>>
>>     if (decoder == null) {
>>         decoder = new KafkaAvroDecoder(vProps);
>>     }
>>
>> Further, you might have to specify the correct return type for
>> getProducedType(). You may use
>>
>>     public TypeInformation<String> getProducedType() {
>>         return TypeExtractor.getForClass(String.class);
>>     }
>>
>> Cheers,
>> Max
>>
>> On Thu, Nov 19, 2015 at 12:18 PM, Stephan Ewen <se...@apache.org> wrote:
>> > The KafkaAvroDecoder is not serializable, and Flink uses serialization
>> to
>> > distribute the code to the TaskManagers in the cluster.
>> >
>> > I think you need to "lazily" initialize the decoder, in the first
>> invocation
>> > of "deserialize()". That should do it.
>> >
>> > Stephan
>> >
>> >
>> > On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <
>> madhukar.th...@gmail.com>
>> > wrote:
>> >>
>> >> Hi Max
>> >>
>> >> Thanks for the example.
>> >>
>> >> Based on your example, here is what I did:
>> >>
>> >> public class Streamingkafka {
>> >>     public static void main(String[] args) throws Exception {
>> >>         StreamExecutionEnvironment env =
>> >> StreamExecutionEnvironment.getExecutionEnvironment();
>> >>         env.enableCheckpointing(500);
>> >>         env.setParallelism(1);
>> >>
>> >>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
>> >>         Properties props = new Properties();
>> >>         props.put("schema.registry.url", "http://localhost:8081");
>> >>         props.put("specific.avro.reader", true);
>> >>         VerifiableProperties vProps = new VerifiableProperties(props);
>> >>
>> >>         DeserializationSchema<String> decoder = new
>> >> MyAvroDeserializer(vProps);
>> >>         env.addSource(new
>> >> FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
>> >>                 parameterTool.getProperties())).print();
>> >>
>> >>         env.execute();
>> >>     }
>> >> }
>> >>
>> >> public class MyAvroDeserializer implements
>> DeserializationSchema<String> {
>> >>     private KafkaAvroDecoder decoder;
>> >>
>> >>     public MyAvroDeserializer(VerifiableProperties vProps) {
>> >>         this.decoder = new KafkaAvroDecoder(vProps);
>> >>     }
>> >>
>> >>     @Override
>> >>     public String deserialize(byte[] message) throws IOException {
>> >>         return (String) this.decoder.fromBytes(message);
>> >>     }
>> >>
>> >>     @Override
>> >>     public boolean isEndOfStream(String nextElement) {
>> >>         return false;
>> >>     }
>> >>
>> >>     @Override
>> >>     public TypeInformation<String> getProducedType() {
>> >>         return null;
>> >>     }
>> >> }
>> >>
>> >>
>> >> Here is the error I am seeing:
>> >>
>> >>
>> >> Exception in thread "main"
>> >> org.apache.flink.api.common.InvalidProgramException: Object
>> >>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e
>> >> not serializable
>> >> at
>> >>
>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
>> >> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
>> >> at
>> >>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
>> >> at
>> >>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
>> >> at
>> >>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
>> >> at
>> >>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
>> >> at test.flink.Streamingkafka.main(Streamingkafka.java:25)
>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >> at
>> >>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >> at
>> >>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >> at java.lang.reflect.Method.invoke(Method.java:497)
>> >> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> >> Caused by: java.io.NotSerializableException:
>> >> io.confluent.kafka.serializers.KafkaAvroDecoder
>> >> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> >> at
>> >>
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >> at
>> >>
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >> at
>> >>
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >> at
>> >>
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>> >> at
>> >>
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>> >> at
>> >>
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> >> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> >> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> >> at
>> >>
>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
>> >> at
>> >>
>> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
>> >> ... 11 more
>> >>
>> >>
>> >>
>> >> Am I doing something wrong in my code?
>> >>
>> >>
>> >>
>> >> On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org>
>> >> wrote:
>> >>>
>> >>> Hi Madhukar,
>> >>>
>> >>> Thanks for your question. When you instantiate the FlinkKafkaConsumer,
>> >>> you supply a DeserializationSchema in the constructor. You simply
>> create a
>> >>> class which implements DeserializationSchema and contains the
>> >>> KafkaAvroDecoder with the schema registry.
>> >>>
>> >>> Like so:
>> >>>
>> >>> public class MyAvroDeserializer implements
>> DeserializationSchema<MyType>
>> >>> {
>> >>>
>> >>>     private KafkaAvroDecoder decoder;
>> >>>
>> >>>     public MyAvroDeserializer() {
>> >>>          SchemaRegistryClient schemaRegistry = new
>> >>> SchemaRegistryClient(...);
>> >>>          this.decoder = new KafkaAvroDecoder(schemaRegistry);
>> >>>     }
>> >>>
>> >>>     public MyType deserialize(byte[] message) throws Exception {
>> >>>          return (MyType) this.decoder.fromBytes(message);
>> >>>     }
>> >>>
>> >>>     public boolean isEndOfStream(MyType nextElement) {
>> >>>          return false;
>> >>>     }
>> >>>
>> >>> }
>> >>>
>> >>> Then you supply this class when creating the consumer:
>> >>>
>> >>> DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
>> >>> Properties props = new Properties();
>> >>> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
>> >>> FetcherType fetcherType =
>> >>> FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
>> >>>
>> >>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder,
>> >>> props, offsetStore, fetcherType);
>> >>>
>> >>>
>> >>> Let me know if that works for you.
>> >>>
>> >>> Best regards,
>> >>> Max
>> >>>
>> >>> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota
>> >>> <madhukar.th...@gmail.com> wrote:
>> >>>>
>> >>>> Hi
>> >>>>
>> >>>> I am very new to Avro. Currently I am using the Confluent version of
>> >>>> Kafka, and I am able to write an Avro message to Kafka by storing the
>> >>>> schema in the schema registry. Now I need to consume those messages
>> >>>> using the Flink Kafka consumer, and I am having a hard time
>> >>>> deserializing the messages.
>> >>>>
>> >>>> I am looking for an example of how to deserialize an Avro message
>> >>>> whose schema is stored in the schema registry.
>> >>>>
>> >>>> Any help is appreciated. Thanks in Advance.
>> >>>>
>> >>>> Thanks,
>> >>>> Madhu
>> >>>>
>> >>>
>> >>
>> >
>>
>
>
