I'm getting a NullPointerException when calling
KakfaProducer010.processElement(StreamRecord<T>). Specifically, this comes
from its helper function invokeInternally(), and the function's
internalProducer not being configured properly, resulting in passing a null
value to one its helper functions.

We'd really appreciate taking a look at below to see if this is a Flink bug
or something we're doing wrong.

Our code

This is a simplified version of our program:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/code-part-1.png>
 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/code-part-2.png>
 

You can copy this code here to reproduce locally: 
https://pastebin.com/Li8iZuFj <https://pastebin.com/Li8iZuFj>  

Stack trace

Here is the stack trace:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/stack-trace.png>
 

What causes error in Flink code

The method processElement() calls invokeInternally(). Within
invokeInternally(), Flink tries to parse variable values, e.g. topic name
and partitions. 

The app fails when trying to resolve the partitions. Specifically, the
method to resolve the partitions has a parameter of KafkaProducer, which is
passed as null, resulting in the NullPointerException. See the highlighted
lines below of running the program in debugger view.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/flink-code-null-value.png>
 

So, I think the issue is that the internalProducer is not being setup
correctly. Namely, it never sets the value for its producer field, so this
stays null and then gets passed around, resulting in the Null Pointer
Exception.

Bug? Or issue with our code?

My question to you all is if this is a bug that needs to be fixed, or if it
results from us improperly configuring our program? The above code shows our
configuration within the program itself (just setting bootstrap.servers),
and we created the Kafka topic on our local machine as follows:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor
1 --partitions 1 --topic process-elements-tests



Any help greatly appreciated! We're really hoping to get processElements()
to work, because our streaming architecture requires working on individual
elements rather than the entire data stream (sink behavior depends on the
individual values within each record of our DataStream<List&lt;T>>).





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to