Hi,

It seems like you’ve misunderstood how to use the FlinkKafkaProducer. Or is there a specific reason why you want to emit elements to Kafka in a map function?
The correct way to use it is to add it as a sink function to your pipeline, i.e.

    DataStream<String> someStream = …
    someStream.addSink(new FlinkKafkaProducer010<>("topic", schema, props));
    // or, FlinkKafkaProducer010.writeToKafkaWithTimestamps(someStream, "topic", schema, props);

The processElement method is invoked internally by the system, and isn’t intended to be invoked by user code. See [1] for more details.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#kafka-producer

On 15 July 2017 at 3:35:32 AM, earellano (eric.arell...@ge.com) wrote:

I'm getting a NullPointerException when calling FlinkKafkaProducer010.processElement(StreamRecord<T>). Specifically, it comes from the helper function invokeInternally(): the function's internalProducer is not configured properly, so a null value is passed to one of its helper functions. We'd really appreciate it if you could take a look at the details below to see whether this is a Flink bug or something we're doing wrong.

Our code

This is a simplified version of our program:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/code-part-1.png>
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/code-part-2.png>

You can copy the code from here to reproduce locally: https://pastebin.com/Li8iZuFj

Stack trace

Here is the stack trace:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/stack-trace.png>

What causes error in Flink code

The method processElement() calls invokeInternally(). Within invokeInternally(), Flink tries to resolve several values, e.g. the topic name and the partitions. The app fails when trying to resolve the partitions. Specifically, the method that resolves the partitions takes a KafkaProducer parameter, which is passed as null, resulting in the NullPointerException. See the highlighted lines below of running the program in debugger view.
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14288/flink-code-null-value.png>

So, I think the issue is that the internalProducer is not being set up correctly. Namely, the value of its producer field is never set, so it stays null and then gets passed around, resulting in the NullPointerException.

Bug? Or issue with our code?

My question to you all is whether this is a bug that needs to be fixed, or whether it results from us improperly configuring our program. The above code shows our configuration within the program itself (just setting bootstrap.servers), and we created the Kafka topic on our local machine as follows:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic process-elements-tests

Any help is greatly appreciated! We're really hoping to get processElement() to work, because our streaming architecture requires working on individual elements rather than the entire data stream (sink behavior depends on the individual values within each record of our DataStream<List<T>>).

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Producer-Null-Pointer-Exception-when-processing-by-element-tp14288.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
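
To illustrate the point from the reply above (processElement is meant to be called by the system, not by user code), here is a toy model in plain Java of how a null producer field can arise. It assumes the sink creates its client in a lifecycle step that only the runtime performs. ToyClient, ToySink, open() and invoke() are hypothetical names made up for this sketch; this is not Flink's or Kafka's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkLifecycleSketch {

    /** Hypothetical stand-in for a Kafka client; not a real Kafka or Flink class. */
    static class ToyClient {
        private final List<String> sent = new ArrayList<>();

        void send(String record) {
            sent.add(record);
        }

        List<String> sent() {
            return sent;
        }
    }

    /** Hypothetical sink whose client is created in open(), not in the constructor. */
    static class ToySink {
        private ToyClient client; // stays null until open() runs

        void open() {
            client = new ToyClient();
        }

        void invoke(String record) {
            client.send(record); // throws NullPointerException if open() was never called
        }

        List<String> sent() {
            return client.sent();
        }
    }

    public static void main(String[] args) {
        // Misuse: construct the sink and call it directly, skipping open().
        ToySink direct = new ToySink();
        try {
            direct.invoke("record-1");
        } catch (NullPointerException e) {
            System.out.println("direct call failed: client was never initialized");
        }

        // What a runtime would do: open the sink first, then pass elements through.
        ToySink managed = new ToySink();
        managed.open();
        managed.invoke("record-1");
        System.out.println("managed call sent: " + managed.sent());
    }
}
```

In this model the fix is not to change the sink but to let the runtime own its lifecycle, which is what attaching the producer with addSink does in the reply above.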