Hi Gordon,

I remembered that I had already seen this kind of exception once while testing the current job, and fortunately I still had the complete stack trace saved on my PC:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
    at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
    at org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)

I don't know why the stack trace is now being output only for the first parts (handleWatermark and HeapReducingState).

So, it looks like this has something to do with the KryoSerializer. As a Kryo serializer I'm using JodaDateTimeSerializer, registered as follows:

    env.getConfig.addDefaultKryoSerializer(classOf[DateTime], classOf[JodaDateTimeSerializer])

I hope this helps.

Regards,
Federico

2017-09-29 15:54 GMT+02:00 Federico D'Ambrosio <federico.dambro...@smartlab.ws>:

> Hi Gordon,
>
> I'm currently using Flink 1.3.2 in local mode.
>
> If it's any help, I realized from the log that the complete task which is
> failing is:
>
> 2017-09-29 14:17:20,354 INFO org.apache.flink.runtime.taskmanager.Task - latest_time -> (map_active_stream, map_history_stream) (1/1) (5a6c9f187326f678701f939665db6685) switched from RUNNING to FAILED.
>
> val events = keyedStreamByID
>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>   .maxBy("time").name("latest_time").uid("latest_time")
>
> val activeStream = events
>   //Serialization to JsValue
>   .map(event => event.toMongoActiveJsValue).name("map_active_stream").uid("map_active_stream")
>   //Global windowing, the cause of exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("active_stream_window").uid("active_stream_window")
>
> val historyStream = airtrafficEvents
>   //Serialization to JsValue
>   .map(event => event.toMongoHistoryJsValue).name("map_history_stream").uid("map_history_stream")
>   //Global windowing, the cause of exception should be above
>   .timeWindowAll(Time.seconds(10))
>   .apply(new MongoWindow(MongoWritingType.UPDATE)).name("history_stream_window").uid("history_stream_window")
>
> Regards,
> Federico
>
> 2017-09-29 15:38 GMT+02:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:
>
>> Hi,
>>
>> I’m looking into this. Could you let us know the Flink version in which
>> the exceptions occurred?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 29 September 2017 at 3:11:30 PM, Federico D'Ambrosio (
>> federico.dambro...@smartlab.ws) wrote:
>>
>> Hi, I'm coming across these exceptions while running a pretty simple Flink
>> job.
>>
>> First one:
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>> The second one:
>> java.io.IOException: Exception while applying ReduceFunction in reducing state
>>     at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException
>>
>>
>> Since it looks like something is wrong in watermark processing: in my case,
>> watermarks are generated in my KafkaSource:
>>
>> val stream = env.addSource(
>>   new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
>>     .setStartFromLatest()
>>     .assignTimestampsAndWatermarks(
>>       new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
>>         def extractTimestamp(element: AirTrafficEvent): Long =
>>           element.instantValues.time.getMillis
>>       })
>> )
>>
>> These exceptions aren't really that informative per se and, from what I
>> see, the task triggering these exceptions is the following operator:
>>
>> val events = keyedStreamByID
>>   .window(TumblingEventTimeWindows.of(Time.seconds(20)))
>>   .maxBy("timestamp").name("latest_time").uid("latest_time")
>>
>> What could be the problem here in your opinion? Is it not emitting
>> watermarks correctly? I'm not even sure how I could reproduce these
>> exceptions, since it looks like they happen pretty much randomly.
>>
>> Thank you all,
>> Federico D'Ambrosio
>>
>
>
> --
> Federico D'Ambrosio
>

--
Federico D'Ambrosio
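
For anyone following the thread, here is a minimal, dependency-free sketch of the event-time logic the job above relies on: a watermark that trails the highest timestamp seen by 10 seconds, 20-second tumbling windows, and a "keep the event with the largest time" reduction per window. It is only a model under stated assumptions, not Flink's implementation and not a diagnosis of the exception. All names in it (WatermarkSketch, Event, run, and so on) are hypothetical. It also assumes a single key, non-negative timestamps, and a watermark that advances after every event, whereas Flink emits watermarks periodically.

```scala
// Hypothetical model only: none of these names are Flink APIs.
object WatermarkSketch {
  final case class Event(id: String, timeMillis: Long)

  val maxOutOfOrdernessMs = 10000L // mirrors Time.seconds(10) in the job
  val windowSizeMs = 20000L        // mirrors Time.seconds(20) in the job

  // Bounded out-of-orderness: the watermark trails the highest timestamp seen so far.
  def watermark(maxTimestampSeen: Long): Long = maxTimestampSeen - maxOutOfOrdernessMs

  // Start of the tumbling window (offset 0) containing a non-negative timestamp.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // A window [start, start + size) may fire once the watermark reaches its last millisecond.
  def canFire(start: Long, wm: Long): Boolean = wm >= start + windowSizeMs - 1

  // Feeds events in arrival order and returns (windowStart, latest event) per fired window.
  def run(events: Seq[Event]): Seq[(Long, Event)] = {
    var maxTs = Long.MinValue
    var currentWm = Long.MinValue
    var open = Map.empty[Long, Event] // running "max by time" result per open window
    val fired = Seq.newBuilder[(Long, Event)]
    for (e <- events) {
      val start = windowStart(e.timeMillis)
      // Events whose window the watermark has already passed are dropped as late.
      if (!canFire(start, currentWm)) {
        // Reduce step: keep whichever event has the larger time (existing one wins ties).
        val kept = open.get(start).filter(_.timeMillis >= e.timeMillis).getOrElse(e)
        open = open.updated(start, kept)
      }
      maxTs = math.max(maxTs, e.timeMillis)
      currentWm = math.max(currentWm, watermark(maxTs))
      // Emit every window the new watermark has passed, oldest first.
      val (ready, pending) = open.partition { case (s, _) => canFire(s, currentWm) }
      fired ++= ready.toSeq.sortBy(_._1)
      open = pending
    }
    fired.result()
  }
}
```

In this model the window starting at 0 can only fire once an event with a timestamp of at least 29999 ms has arrived, because the watermark has to reach 19999 ms first. Windows that are still open when the input ends are never emitted.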