DataStream<MyKafkaMessage> myKafkaMessageDataStream = env.addSource(getStreamSource(env, parameterTool));
public RichParallelSourceFunction<MyKafkaMessage> getStreamSource(StreamExecutionEnvironment env, ParameterTool parameterTool) {
    // MyKafkaMessage is a ProtoBuf message
    env.getConfig().registerTypeWithKryoSerializer(MyKafkaMessage.class, ProtobufSerializer.class);
    return new KafkaDataSource<MyKafkaMessage>(parameterTool, new MyKafkaMessageSerDeSchema());
}

public class KafkaDataSource<T> extends FlinkKafkaConsumer010<T> {

    public KafkaDataSource(ParameterTool parameterTool, DeserializationSchema<T> deserializer) {
        super(Arrays.asList(parameterTool.getRequired("topic").split(",")),
              deserializer,
              parameterTool.getProperties());
    }
}

public class MyKafkaMessageSerDeSchema
        implements DeserializationSchema<MyKafkaMessage>, SerializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        MyKafkaMessage myKafkaMessage = null;
        try {
            myKafkaMessage = MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        } finally {
            // a parse failure is swallowed here and null is returned
            return myKafkaMessage;
        }
    }

    @Override
    public boolean isEndOfStream(MyKafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyKafkaMessage> getProducedType() {
        return null;
    }

    @Override
    public byte[] serialize(MyKafkaMessage element) {
        return new byte[0];
    }
}

On Mon, Aug 28, 2017 at 8:26 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which version of Flink / Kafka are you using ?
>
> Can you show the snippet of code where you create the DataStream ?
>
> Cheers
>
> On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa <flinken...@gmail.com>
> wrote:
>
>> Folks,
>>
>> I have a KafkaConsumer that I am trying to read messages from. When I try
>> to create a DataStream from the KafkaConsumer (env.addSource()) I get the
>> following exception:
>>
>> Any idea how this can happen?
>>
>> java.lang.NullPointerException
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:264)
>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:149)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:449)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> at java.lang.Thread.run(Thread.java:748)
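Looking at MyKafkaMessageSerDeSchema above, two methods return null, and either would only show up at runtime rather than at job submission: getProducedType() returns null, so the operator chain is built without an output serializer (which lines up with the NPE in CopyingChainingOutput.pushToOperator in the trace), and deserialize() returns null from its finally block whenever protobuf parsing fails. Below is a minimal sketch of the deserialization side only, assuming Flink 1.3.x package names (AbstractDeserializationSchema moved to org.apache.flink.api.common.serialization in later releases); the class name is just illustrative:

import java.io.IOException;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

import com.google.protobuf.InvalidProtocolBufferException;

public class MyKafkaMessageDeserializationSchema
        extends AbstractDeserializationSchema<MyKafkaMessage> {

    @Override
    public MyKafkaMessage deserialize(byte[] message) throws IOException {
        try {
            return MyKafkaMessage.parseFrom(message);
        } catch (InvalidProtocolBufferException e) {
            // Fail fast instead of pushing a null record into the stream.
            throw new IOException("Cannot parse MyKafkaMessage", e);
        }
    }

    // isEndOfStream() defaults to false, and getProducedType() is derived
    // from the generic parameter by AbstractDeserializationSchema, so
    // neither needs to be overridden here.
}

If the class has to keep implementing DeserializationSchema and SerializationSchema directly, the key change is the same: have getProducedType() return TypeInformation.of(MyKafkaMessage.class) (or TypeExtractor.getForClass(MyKafkaMessage.class)) rather than null. The source wiring itself can stay as posted, e.g. new KafkaDataSource<MyKafkaMessage>(parameterTool, new MyKafkaMessageDeserializationSchema()).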