Hi Gordon, I'm currently using Flink 1.3.2 in local mode.
If it's any help I realized from the log that the complete task which is
failing is:

2017-09-29 14:17:20,354 INFO org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.

val events = keyedStreamByID
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .maxBy("time").name("latest_time").uid("latest_time")

val activeStream = events
  //Serialization to JsValue
  .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")

val historyStream = airtrafficEvents
  //Serialization to JsValue
  .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
  //Global windowing, the cause of exception should be above
  .timeWindowAll(Time.seconds(10))
  .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")

Regards,
Federico

2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:

> Hi,
>
> I’m looking into this. Could you let us know the Flink version in which
> the exceptions occurred?
>
> Cheers,
> Gordon
>
>
> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi, I'm coming across these Exceptions while running a pretty simple flink
> job.
>
> First one:
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
> The second one:
> java.io.IOException: Exception while applying ReduceFunction in reducing state
>     at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>
>
> Since it looks like something is wrong in Watermark processing, in my case
> Watermarks are generated in my KafkaSource:
>
> val stream = env.addSource(
>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>     .setStartFromLatest()
>     .assignTimestampsAndWatermarks(
>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>         def extractTimestamp(element: AirTrafficEvent): Long =
>           element.instantValues.time.getMillis
>       })
> )
>
> These exceptions aren't really that informative per se and, from what I
> see, the task triggering these exceptions is the following operator:
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>
> What could be the problem here in your opinion? Is it not emitting
> watermarks correctly? I'm not even sure how I could reproduce these
> exceptions, since it looks like they happen pretty much randomly.
>
> Thank you all,
> Federico D'Ambrosio
>
>

--
Federico D'Ambrosio
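
The quoted source assigns watermarks with BoundedOutOfOrdernessTimestampExtractor and a 10-second bound. As a rough illustration of what that setup is expected to produce, here is a minimal, Flink-free sketch of the bounded-out-of-orderness idea: the watermark trails the highest timestamp seen so far by the configured bound and never moves backwards. BoundedWatermarkSketch and BoundedWatermarkDemo are hypothetical names, the sample timestamps are made up, and this is a simplified model rather than Flink's implementation. It does not by itself explain the ArrayIndexOutOfBoundsException.

```scala
// Simplified, Flink-free model of a bounded-out-of-orderness watermark generator.
// BoundedWatermarkSketch is a hypothetical name, not a Flink class.
final class BoundedWatermarkSketch(maxOutOfOrdernessMs: Long) {
  // Start low enough that the first watermark computation does not underflow.
  private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs
  private var lastEmittedWatermark: Long = Long.MinValue

  /** Record an element's event time (in ms) and return it unchanged. */
  def extractTimestamp(eventTimeMs: Long): Long = {
    currentMaxTimestamp = math.max(currentMaxTimestamp, eventTimeMs)
    eventTimeMs
  }

  /** Highest timestamp seen minus the bound, never moving backwards. */
  def currentWatermark: Long = {
    val candidate = currentMaxTimestamp - maxOutOfOrdernessMs
    if (candidate >= lastEmittedWatermark) lastEmittedWatermark = candidate
    lastEmittedWatermark
  }
}

object BoundedWatermarkDemo extends App {
  // 10-second bound, matching Time.seconds(10) in the quoted source.
  val gen = new BoundedWatermarkSketch(maxOutOfOrdernessMs = 10000L)

  // Made-up event times in ms; the last one arrives out of order.
  Seq(25000L, 31000L, 28000L).foreach(gen.extractTimestamp)

  println(gen.currentWatermark) // prints 21000
}
```

With these sample timestamps the watermark ends at 21000 ms, which is past the end of the first 20-second tumbling window [0, 20000), so under this model that window would be eligible to fire. The late 28000 ms element does not pull the watermark back.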