Hi, I'm coming across these exceptions while running a pretty simple Flink job.

First one:

    java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ArrayIndexOutOfBoundsException

The second one:

    java.io.IOException: Exception while applying ReduceFunction in reducing state
        at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:82)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ArrayIndexOutOfBoundsException

Since it looks like something is wrong in watermark processing, here is how watermarks are generated in my Kafka source:

    val stream = env.addSource(
      new FlinkKafkaConsumer010[Event](topic, new JSONDeserializationSchema(), consumerConfig)
        .setStartFromLatest()
        .assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(10)) {
            def extractTimestamp(element: AirTrafficEvent): Long =
              element.instantValues.time.getMillis
          })
    )

These exceptions aren't very informative by themselves. From what I can see, the task that triggers them is the following operator:

    val events = keyedStreamByID
      .window(TumblingEventTimeWindows.of(Time.seconds(20)))
      .maxBy("timestamp").name("latest_time").uid("latest_time")

What could be the problem here, in your opinion? Is it not emitting watermarks correctly? I'm not even sure how I could reproduce these exceptions, since they seem to happen pretty much randomly.

Thank you all,
Federico D'Ambrosio