[ https://issues.apache.org/jira/browse/FLINK-14327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955057#comment-16955057 ]
Zhu Zhu edited comment on FLINK-14327 at 10/19/19 3:07 AM:
-----------------------------------------------------------

Hi [~ASK5], the NPE happens in flinkBroadcast1.scala, which seems to be user code.

Could you check flinkBroadcast1.scala line 41?

If it's the code you attached, I guess the NPE comes from one of these two lines, in the case that the property is null.
{code:java}
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
{code}


was (Author: zhuzh):
    Hi [~ASK5], the NPE to happen in flinkBroadcast1.scala which seems to be user code.

Could you check flinkBroadcast1.scala line 41?

If it's the code you attached, I guess it's these two lines in case that the property is null.
{code:java}
val loc = v.get("locationID").asInstanceOf[String]
val temperature = v.get("temp").asDouble()
{code}

> Getting "Could not forward element to next operator" error
> ----------------------------------------------------------
>
> Key: FLINK-14327
> URL: https://issues.apache.org/jira/browse/FLINK-14327
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.9.0
> Reporter: ASK5
> Priority: Major
> Fix For: 1.9.2
>
> Attachments: so2.png
>
>
> val TEMPERATURE_THRESHOLD: Double = 50.00
> val see: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
> val properties = new Properties()
> properties.setProperty("zookeeper.connect", "localhost:2181")
> properties.setProperty("bootstrap.servers", "localhost:9092")
> val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
>   new JSONKeyValueDeserializationSchema(false),
>   properties)).name("kafkaSource")
> case class Event(locationID: String, temp: Double)
> var data = src.map { v => {
>   val loc = v.get("locationID").asInstanceOf[String]
>   val temperature = v.get("temp").asDouble()
>   (loc, temperature)
> }}
> data = data
>   .keyBy(
>     v => v._1
>   )
> data.print()
> see.execute()
> ---*********----
> And I'm getting the following error while
> consuming json file from Kafka:-
>
> {{Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed....
>   at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
>   at flinkBroadcast1.main(flinkBroadcast1.scala)
> Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator...
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator...
> Caused by: java.lang.NullPointerException}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
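
For reference, the guess in the comment above (that a property lookup returns null) can be checked with a small null-safe sketch. It relies on the fact that Jackson's `JsonNode.get(name)` returns null when the field is absent, so calling `.asDouble()` on the result throws an NPE. The object name `NullSafeFields` and the helpers `field` and `parse` are hypothetical and are not part of the reporter's job. The sketch imports plain Jackson; in the Flink job the `ObjectNode` probably comes from Flink's shaded Jackson package, so the import would need adjusting. One possible reason for a null, as far as I can tell, is that `JSONKeyValueDeserializationSchema` nests the record under a `value` field, but that has not been confirmed on this issue.

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object NullSafeFields {
  // Hypothetical helper: JsonNode.get returns null for a missing field,
  // so wrap the lookup in Option before calling any accessor on it.
  // JSON null values are treated the same as missing fields.
  def field(node: JsonNode, name: String): Option[JsonNode] =
    Option(node).flatMap(n => Option(n.get(name))).filterNot(_.isNull)

  // Returns None instead of throwing when either property is absent
  // or when "temp" is not a JSON number.
  def parse(v: JsonNode): Option[(String, Double)] =
    for {
      loc  <- field(v, "locationID").map(_.asText())
      temp <- field(v, "temp").filter(_.isNumber).map(_.asDouble())
    } yield (loc, temp)

  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    // Sample payloads are made up for illustration.
    val complete = mapper.readTree("""{"locationID":"L1","temp":61.5}""")
    val missing  = mapper.readTree("""{"locationID":"L1"}""")
    println(parse(complete))
    println(parse(missing))
  }
}
```

In the job, something like `src.flatMap(v => NullSafeFields.parse(v).toList)` would drop incomplete records rather than fail the operator chain. Whether dropping or logging them is the right behaviour depends on the use case.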