OK. I got past the problem. Basically, I had to change:

    public class MyKafkaMessageSerDeSchema implements
            DeserializationSchema,
            SerializationSchema {
        @Override
        public MyKafkaMessage deserialize(byte[] message) throws IOException {
            MyKafkaMessage MyKafkaMessage = null;
            try
The NPE came from this line:

    StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue()));

Either serializer or castRecord was null.
I wonder if this has been fixed in the 1.3.2 release.
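
One way to tell which of the two references is null is to split the chained call and check each reference on its own. The following is only a minimal sketch under stated assumptions: Record, Serializer, and NullCheckSketch are hypothetical stand-ins written for illustration, not the real Flink StreamRecord or TypeSerializer classes, and the actual Flink code may differ.

```java
import java.util.Objects;

// Hypothetical stand-in for a stream record; NOT Flink's StreamRecord.
final class Record<T> {
    private final T value;

    Record(T value) {
        this.value = value;
    }

    T getValue() {
        return value;
    }

    // Returns a new record carrying the given (already copied) value.
    Record<T> copy(T valueCopy) {
        return new Record<>(valueCopy);
    }
}

// Hypothetical stand-in for a serializer with a copy method; NOT Flink's TypeSerializer.
interface Serializer<T> {
    T copy(T from);
}

final class NullCheckSketch {
    // Same shape as castRecord.copy(serializer.copy(castRecord.getValue())),
    // but each reference is checked separately so the exception message
    // names the one that was null.
    static <T> Record<T> copyRecord(Record<T> castRecord, Serializer<T> serializer) {
        Objects.requireNonNull(castRecord, "castRecord is null");
        Objects.requireNonNull(serializer, "serializer is null");
        T value = castRecord.getValue();
        T valueCopy = serializer.copy(value);
        return castRecord.copy(valueCopy);
    }
}
```

With the call split like this, a NullPointerException raised inside serializer.copy (for example on a null value) would show up one frame deeper in the stack trace than one raised by the checks themselves.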
On Mon, Aug 28, 2017 at 7:24 PM, Sridhar Chellappa
wrote:
Kafka Version is 0.10.0
On Tue, Aug 29, 2017 at 6:43 AM, Sridhar Chellappa
wrote:
1.3.0
On Mon, Aug 28, 2017 at 10:09 PM, Ted Yu wrote:
Which Flink version are you using (so that line numbers can be matched with
source code)?
On Mon, Aug 28, 2017 at 9:16 AM, Sridhar Chellappa
wrote:
DataStream MyKafkaMessageDataStream = env.addSource(
        getStreamSource(env, parameterTool));

public RichParallelSourceFunction getStreamSource(
        StreamExecutionEnvironment env, ParameterTool parameterTool) {
    // MyKafkaMessage is a ProtoBuf message
    env.g
Which version of Flink / Kafka are you using?
Can you show the snippet of code where you create the DataStream?
Cheers
On Mon, Aug 28, 2017 at 7:38 AM, Sridhar Chellappa
wrote:
Folks,
I have a KafkaConsumer that I am trying to read messages from. When I try
to create a DataStream from the KafkaConsumer (env.addSource()), I get the
following exception. Any idea how this can happen?

java.lang.NullPointerException
    at org.apache.flink.streaming.runtime.tasks.Ope