[ https://issues.apache.org/jira/browse/FLINK-14327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASK5 updated FLINK-14327:
-------------------------
    Description: 
val TEMPERATURE_THRESHOLD: Double = 50.00

val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("bootstrap.servers", "localhost:9092")

val src = see.addSource(new FlinkKafkaConsumer010[ObjectNode]("broadcast",
  new JSONKeyValueDeserializationSchema(false), properties)).name("kafkaSource")

case class Event(locationID: String, temp: Double)

var data = src.map { v =>
  val loc = v.get("locationID").asInstanceOf[String]
  val temperature = v.get("temp").asDouble()
  (loc, temperature)
}

data = data.keyBy(v => v._1)

data.print()

see.execute()
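
For completeness, the snippet above assumes roughly these imports (a sketch only; the exact packages depend on the Flink / Kafka-connector version on the classpath):

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
// ObjectNode may instead come from Flink's shaded Jackson package, depending on the version:
import com.fasterxml.jackson.databind.node.ObjectNode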

---*********----

And I'm getting the following error while consuming JSON from Kafka:

 
{{Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed....
    at flinkBroadcast1$.main(flinkBroadcast1.scala:59)
    at flinkBroadcast1.main(flinkBroadcast1.scala)
Caused by: java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator...
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator...
Caused by: java.lang.NullPointerException}}
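
For reference, JSONKeyValueDeserializationSchema wraps each Kafka record in an ObjectNode whose message body sits under the "value" field (and the key under "key"), so reading locationID directly off the root node yields null, and the subsequent asDouble() call can throw the NullPointerException above. A minimal sketch of the map with that nesting taken into account (assuming the message body is JSON like {"locationID": "...", "temp": ...}):

val data = src.map { v =>
  // the deserialized record nests the message body under "value"
  val body = v.get("value")
  val loc = body.get("locationID").asText()      // read as text rather than casting the JsonNode
  val temperature = body.get("temp").asDouble()
  (loc, temperature)
}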


> Getting "Could not forward element to next operator" error
> ----------------------------------------------------------
>
>                 Key: FLINK-14327
>                 URL: https://issues.apache.org/jira/browse/FLINK-14327
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.9.0
>            Reporter: ASK5
>            Priority: Major
>             Fix For: 1.9.0
>
>         Attachments: so2.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
