Kafka Version is 0.10.0

On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa <flinken...@gmail.com> wrote:

> 1.3.0
>
> On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which Flink version are you using (so that line numbers can be matched
>> with source code) ?
>>
>> On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa <flinken...@gmail.com>
>> wrote:
>>
>>> DataStream<MyKafkaMessage> MyKafkaMessageDataStream = env.addSource(
>>>         getStreamSource(env, parameterTool);
>>> );
>>>
>>>
>>>
>>> public RichParallelSourceFunction<MyKafkaMessage>
>>> getStreamSource(StreamExecutionEnvironment env, ParameterTool
>>> parameterTool) {
>>>
>>>     // MyKafkaMessage is a ProtoBuf message
>>>
>>>     env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class,
>>>             ProtobufSerializer.class);
>>>
>>>     KafkaDataSource<MyKafkaMessage> flinkCepConsumer =
>>>             new KafkaDataSource<MyKafkaMessage>(parameterTool,
>>>                     new MyKafkaMessageSerDeSchema());
>>>
>>>     return flinkCepConsumer;
>>> }
>>>
>>>
>>> public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {
>>>
>>>     public KafkaDataSource(ParameterTool parameterTool,
>>>             DeserializationSchema<T> deserializer) {
>>>         super(
>>>             Arrays.asList(parameterTool.getRequired("topic").split(",")),
>>>             deserializer,
>>>             parameterTool.getProperties()
>>>         );
>>>
>>>     }
>>>
>>> }
>>>
>>> public class MyKafkaMessageSerDeSchema implements
>>> DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage>
>>> {
>>>
>>>     @Override
>>>     public MyKafkaMessage deserialize(byte[] message) throws IOException
>>>     {
>>>         MyKafkaMessage MyKafkaMessage = null;
>>>         try {
>>>             MyKafkaMessage = MyKafkaMessage.parseFrom(message);
>>>         } catch (InvalidProtocolBufferException e) {
>>>             e.printStackTrace();
>>>         } finally {
>>>             return MyKafkaMessage;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(MyKafkaMessage nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<MyKafkaMessage> getProducedType() {
>>>         return null;
>>>     }
>>>
>>>     @Override
>>>     public byte[] serialize(MyKafkaMessage element) {
>>>         return new byte[0];
>>>     }
>>> }
>>>
>>> On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Which version of Flink / Kafka are you using ?
>>>>
>>>> Can you show the snippet of code where you create the DataStream ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <
>>>> flinken...@gmail.com> wrote:
>>>>
>>>>> Folks,
>>>>>
>>>>> I have a KafkaConsumer that I am trying to read messages from. When I
>>>>> try to create a DataStream from the KafkaConsumer (env.addSource()) I get
>>>>> the following exception :
>>>>>
>>>>> Any idea how this can happen?
>>>>>
>>>>> java.lang.NullPointerException
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>>>     at java.lang.Thread.run(Thread.java:748)
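
A note on the deserialize() method quoted in this thread: because its finally block contains a return, any failure during parsing is discarded and the method hands back null instead. The sketch below reproduces that pattern using only the JDK, so it runs without Flink or Protobuf. The class name FinallyReturnDemo and the parse() helper are hypothetical stand-ins (parse() takes the place of MyKafkaMessage.parseFrom()), and nothing in the thread confirms that this behaviour is what triggers the NullPointerException.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FinallyReturnDemo {

    // Hypothetical stand-in for MyKafkaMessage.parseFrom(byte[]):
    // rejects an empty payload, otherwise decodes it as UTF-8 text.
    static String parse(byte[] bytes) throws IOException {
        if (bytes.length == 0) {
            throw new IOException("cannot parse empty payload");
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Same shape as the posted deserialize(): the return inside finally
    // always wins, so a failed parse yields null rather than an exception.
    @SuppressWarnings("finally")
    static String deserializeSwallowing(byte[] bytes) {
        String result = null;
        try {
            result = parse(bytes);
        } catch (IOException e) {
            // The posted code calls e.printStackTrace() here and carries on.
        } finally {
            return result;
        }
    }

    // One alternative, shown only for contrast: let the failure propagate
    // so the caller can decide what to do with a bad record.
    static String deserializeStrict(byte[] bytes) throws IOException {
        return parse(bytes);
    }

    public static void main(String[] args) {
        byte[] good = "ok".getBytes(StandardCharsets.UTF_8);
        byte[] bad = new byte[0];

        System.out.println(deserializeSwallowing(good)); // prints "ok"
        System.out.println(deserializeSwallowing(bad));  // prints "null"

        try {
            deserializeStrict(bad);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Separately, getProducedType() in the posted schema returns null. Returning a real TypeInformation there, for example TypeInformation.of(MyKafkaMessage.class), may be worth trying, though the thread as quoted does not establish the cause.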