Hi Max/Ewen,

Thank you for the inputs. I was able to solve the serialization issues, but
now I am seeing a NullPointerException:

public class MyAvroDeserializer implements DeserializationSchema<String> {

    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(VerifiableProperties vProps) {
        if (this.decoder == null) {
            this.decoder = new KafkaAvroDecoder(vProps);
        }
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }

}

11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(1/4) switched to RUNNING
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(1/4) switched to FAILED
java.lang.Exception
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
        at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
        at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)

11/19/2015 07:47:39     Job execution switched to status FAILING.
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELING
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELING
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELING
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(2/4) switched to CANCELED
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(4/4) switched to CANCELED
11/19/2015 07:47:39     Source: Custom Source -> Sink: Unnamed(3/4) switched to CANCELED
11/19/2015 07:47:39     Job execution switched to status FAILED.
Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception
        at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:382)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
        at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:23)
        at test.flink.MyAvroDeserializer.deserialize(MyAvroDeserializer.java:11)
        at 
org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:445)
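
From the trace it looks like "decoder" is null inside deserialize(). Since I
made the field transient, I assume the instance created in my constructor
(which only runs on the client) is simply not shipped to the TaskManagers, so
the field is null there. If I read your suggestion correctly, the null check
belongs inside deserialize() itself, roughly like this (untested sketch; I am
also not sure VerifiableProperties itself is serializable, so here I keep the
plain Properties around and build everything lazily):

public class MyAvroDeserializer implements DeserializationSchema<String> {

    // java.util.Properties is serializable, so it survives the trip to the
    // TaskManagers; the decoder itself does not, hence transient.
    private final Properties props;
    private transient KafkaAvroDecoder decoder;

    public MyAvroDeserializer(Properties props) {
        this.props = props;
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        // Lazy initialization: the first call happens on the TaskManager,
        // after this object has been deserialized and the transient field
        // reset to null.
        if (decoder == null) {
            decoder = new KafkaAvroDecoder(new VerifiableProperties(props));
        }
        return (String) decoder.fromBytes(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeExtractor.getForClass(String.class);
    }
}

Does that look right to you?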




On Thu, Nov 19, 2015 at 6:30 AM, Maximilian Michels <m...@apache.org> wrote:

> Stephan is right, this should do it in deserialize():
>
>     if (decoder == null) {
>         decoder = new KafkaAvroDecoder(vProps);
>     }
>
> Further, you might have to specify the correct return type for
> getProducedType(). You may use
>
>     public TypeInformation<String> getProducedType() {
>         return TypeExtractor.getForClass(String.class);
>     }
>
> Cheers,
> Max
>
> On Thu, Nov 19, 2015 at 12:18 PM, Stephan Ewen <se...@apache.org> wrote:
> > The KafkaAvroDecoder is not serializable, and Flink uses serialization to
> > distribute the code to the TaskManagers in the cluster.
> >
> > I think you need to "lazily" initialize the decoder, in the first
> > invocation of "deserialize()". That should do it.
> >
> > Stephan
> >
> >
> > On Thu, Nov 19, 2015 at 12:10 PM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> >>
> >> Hi Max
> >>
> >> Thanks for the example.
> >>
> >> Based on your example, here is what I did:
> >>
> >> public class Streamingkafka {
> >>     public static void main(String[] args) throws Exception {
> >>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>         env.enableCheckpointing(500);
> >>         env.setParallelism(1);
> >>
> >>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
> >>         Properties props = new Properties();
> >>         props.put("schema.registry.url", "http://localhost:8081");
> >>         props.put("specific.avro.reader", true);
> >>         VerifiableProperties vProps = new VerifiableProperties(props);
> >>
> >>         DeserializationSchema<String> decoder = new MyAvroDeserializer(vProps);
> >>         env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"), decoder,
> >>                 parameterTool.getProperties())).print();
> >>
> >>         env.execute();
> >>     }
> >> }
> >>
> >> public class MyAvroDeserializer implements DeserializationSchema<String> {
> >>     private KafkaAvroDecoder decoder;
> >>
> >>     public MyAvroDeserializer(VerifiableProperties vProps) {
> >>         this.decoder = new KafkaAvroDecoder(vProps);
> >>     }
> >>
> >>     @Override
> >>     public String deserialize(byte[] message) throws IOException {
> >>         return (String) this.decoder.fromBytes(message);
> >>     }
> >>
> >>     @Override
> >>     public boolean isEndOfStream(String nextElement) {
> >>         return false;
> >>     }
> >>
> >>     @Override
> >>     public TypeInformation<String> getProducedType() {
> >>         return null;
> >>     }
> >> }
> >>
> >>
> >> Here is the error I am seeing:
> >>
> >>
> >> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082@3bf9ce3e not serializable
> >>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> >>         at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:59)
> >>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1228)
> >>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1163)
> >>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1107)
> >>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1089)
> >>         at test.flink.Streamingkafka.main(Streamingkafka.java:25)
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>         at java.lang.reflect.Method.invoke(Method.java:497)
> >>         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> >> Caused by: java.io.NotSerializableException: io.confluent.kafka.serializers.KafkaAvroDecoder
> >>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> >>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>         at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>         at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>         at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >>         at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
> >>         at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:95)
> >>         ... 11 more
> >>
> >>
> >>
> >> Am I doing something wrong in my code?
> >>
> >>
> >>
> >> On Thu, Nov 19, 2015 at 4:46 AM, Maximilian Michels <m...@apache.org> wrote:
> >>>
> >>> Hi Madhukar,
> >>>
> >>> Thanks for your question. When you instantiate the FlinkKafkaConsumer,
> >>> you supply a DeserializationSchema in the constructor. You simply
> >>> create a class which implements DeserializationSchema and contains the
> >>> KafkaAvroDecoder with the schema registry.
> >>>
> >>> Like so:
> >>>
> >>> public class MyAvroDeserializer implements DeserializationSchema<MyType> {
> >>>
> >>>     private KafkaAvroDecoder decoder;
> >>>
> >>>     public MyAvroDeserializer() {
> >>>          SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
> >>>          this.decoder = new KafkaAvroDecoder(schemaRegistry);
> >>>     }
> >>>
> >>>     public MyType deserialize(byte[] message) throws IOException {
> >>>          return (MyType) this.decoder.fromBytes(message);
> >>>     }
> >>>
> >>>     public boolean isEndOfStream(MyType nextElement) {
> >>>          return false;
> >>>     }
> >>>
> >>> }
> >>>
> >>> Then you supply this class when creating the consumer:
> >>>
> >>> DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
> >>> Properties props = new Properties();
> >>> OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
> >>> FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
> >>>
> >>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic", decoder, props, offsetStore, fetcherType);
> >>>
> >>>
> >>> Let me know if that works for you.
> >>>
> >>> Best regards,
> >>> Max
> >>>
> >>> On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> >>>>
> >>>> Hi
> >>>>
> >>>> I am very new to Avro. Currently I am using the Confluent Kafka
> >>>> distribution, and I am able to write an Avro message to Kafka by
> >>>> storing the schema in the schema registry. Now I need to consume those
> >>>> messages using the Flink Kafka consumer, and I am having a hard time
> >>>> deserializing them.
> >>>>
> >>>> I am looking for an example of how to deserialize an Avro message
> >>>> whose schema is stored in the schema registry.
> >>>>
> >>>> Any help is appreciated. Thanks in advance.
> >>>>
> >>>> Thanks,
> >>>> Madhu
> >>>>
> >>>
> >>
> >
>
