Hi,

It seems like you’ve misunderstood how to use the FlinkKafkaProducer, or is 
there any specific reason why you want to emit elements to Kafka in a map 
function?

The correct way to use it is to add it as a sink function to your pipeline, i.e.

DataStream<String> someStream = …
someStream.addSink(new FlinkKafkaProducer010<>(“topic”, schema, props));
// or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, “topic”, 
schema, props);

The processElement is invoked internally by the system, and isn’t intended to 
be invoked by user code.

See [1] for more details.

Cheers,
Gordon

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer

On 15 July 2017 at 3:35:32 AM, earellano (eric.arell...@ge.com) wrote:

I'm getting a NullPointerException when calling  
KakfaProducer010.processElement(StreamRecord<T>). Specifically, this comes  
from its helper function invokeInternally(), and the function's  
internalProducer not being configured properly, resulting in passing a null  
value to one its helper functions.  

We'd really appreciate taking a look at below to see if this is a Flink bug  
or something we're doing wrong.  

Our code  

This is a simplified version of our program:  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/code-part-1.png>
  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/code-part-2.png>
  

You can copy this code here to reproduce locally:  
https://pastebin.com/Li8iZuFj <https://pastebin.com/Li8iZuFj>  

Stack trace  

Here is the stack trace:  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/stack-trace.png>
  

What causes error in Flink code  

The method processElement() calls invokeInternally(). Within  
invokeInternally(), Flink tries to parse variable values, e.g. topic name  
and partitions.  

The app fails when trying to resolve the partitions. Specifically, the  
method to resolve the partitions has a parameter of KafkaProducer, which is  
passed as null, resulting in the NullPointerException. See the highlighted  
lines below of running the program in debugger view.  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/flink-code-null-value.png>
  

So, I think the issue is that the internalProducer is not being setup  
correctly. Namely, it never sets the value for its producer field, so this  
stays null and then gets passed around, resulting in the Null Pointer  
Exception.  

Bug? Or issue with our code?  

My question to you all is if this is a bug that needs to be fixed, or if it  
results from us improperly configuring our program? The above code shows our  
configuration within the program itself (just setting bootstrap.servers),  
and we created the Kafka topic on our local machine as follows:  

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor  
1 --partitions 1 --topic process-elements-tests  



Any help greatly appreciated! We're really hoping to get processElements()  
to work, because our streaming architecture requires working on individual  
elements rather than the entire data stream (sink behavior depends on the  
individual values within each record of our DataStream<List&lt;T>>).  





--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  

Reply via email to