Morning everyone, I'm getting the following exception in my Flink job (Flink version is 1.5.0):
```
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
```

I'm not sure why this is happening, but I suspect it could be a bug in Flink. My code is too proprietary to share directly, but here's the general gist. I'm getting data in as JSON, parsing it into POJOs, and then aggregating those with `coGroup()`, taking the maximum of two separate fields. I then take the results and aggregate them again, taking the average of these maximums grouped by a different field.

In pseudocode:

```java
// first aggregation
parsed = source
    .flatMap(new JsonConverterAndFilterer())
    .assignTimestampsAndWatermarks(new MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));

X = parsed.filter(q -> q.getX() != null);
Y = parsed.filter(q -> q.getY() != null).map(AggregatedPojoClass::new);

joined = X
    .coGroup(Y)
    .where(PojoClass::getId)
    .equalTo(AggregatedPojoClass::getId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(findMaximums());

// findMaximums() returns:
@Override
public void coGroup(Iterable<PojoClass> xMaybe, Iterable<AggregatedPojoClass> yMaybe,
                    Collector<AggregatedPojoClass> collector) throws Exception {
    final Iterator<PojoClass> x = xMaybe.iterator();
    final Iterator<AggregatedPojoClass> y = yMaybe.iterator();
    if (x.hasNext() && y.hasNext()) {
        final PojoClass maxX = findMaxX(x);
        final AggregatedPojoClass maxY = findMaxY(y);
        if (maxX != null && maxY != null) {
            collector.collect(new AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));
        } else {
            log.warn("[CoGroup case 1] Max X or max Y is null - SKIPPING: max x {}, max y {}", maxX, maxY);
        }
    }
    // ...other cases omitted for brevity...
}
```
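Since the bodies of `findMaxX()` / `findMaxY()` and the omitted cases aren't shown, here is a hypothetical, Flink-free sketch of the per-key, per-window logic the `coGroup()` function appears to implement: take the largest non-null X from one side and the largest non-null Y from the other, and emit a combined record if at least one side produced a value. `Reading`, `maxX`, `maxY` and `combine` are illustrative names only, not the author's classes; in the real job the two lists would be the `Iterable`s Flink passes to `coGroup()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class CoGroupMaxSketch {

    // Hypothetical stand-in for PojoClass / AggregatedPojoClass: an id plus two nullable fields.
    static final class Reading {
        final String id;
        final Double x;
        final Double y;

        Reading(String id, Double x, Double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Largest non-null x on one side of the coGroup, if any (stand-in for findMaxX).
    static Optional<Double> maxX(List<Reading> readings) {
        return readings.stream().map(r -> r.x).filter(Objects::nonNull).max(Double::compare);
    }

    // Largest non-null y on the other side of the coGroup, if any (stand-in for findMaxY).
    static Optional<Double> maxY(List<Reading> readings) {
        return readings.stream().map(r -> r.y).filter(Objects::nonNull).max(Double::compare);
    }

    // Handles "both sides", "only X" and "only Y" for one key and one window in a single branch.
    static Optional<Reading> combine(String id, List<Reading> xSide, List<Reading> ySide) {
        Optional<Double> mx = maxX(xSide);
        Optional<Double> my = maxY(ySide);
        if (!mx.isPresent() && !my.isPresent()) {
            return Optional.empty(); // nothing to emit; the real code logs a warning here
        }
        return Optional.of(new Reading(id, mx.orElse(null), my.orElse(null)));
    }

    public static void main(String[] args) {
        List<Reading> xs = Arrays.asList(new Reading("a", 1.0, null), new Reading("a", 4.0, null));
        List<Reading> ys = Arrays.asList(new Reading("a", null, 2.5));
        Reading r = combine("a", xs, ys).get();
        System.out.println(r.id + " maxX=" + r.x + " maxY=" + r.y); // prints "a maxX=4.0 maxY=2.5"
    }
}
```

Note that only the fields the sketch explicitly copies end up in the combined record; any other field (such as a secondary id used later for grouping) would have to be carried over deliberately.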
```java
// second aggregation
final DataStream<AggregatedPojoClass> result = joined
    .keyBy(AggregatedPojoClass::getSecondaryId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(new WindowAverageFunction());

final DataStream<String> resultJson = result
    .map(JsonUtil::toJson);
// then a simple sink on this

// WindowAverageFunction is:
public class WindowAverageFunction
        implements WindowFunction<AggregatedPojoClass, AggregatedPojoClass, String, TimeWindow> {

    @Override
    public void apply(String secondaryId, TimeWindow timeWindow, Iterable<AggregatedPojoClass> input,
                      Collector<AggregatedPojoClass> out) throws Exception {
        final Iterator<AggregatedPojoClass> i = input.iterator();
        if (!i.hasNext()) {
            log.warn("Got empty window for secondary id '{}' - ignoring...", secondaryId);
            return;
        }
        // calculate some simple averages of the X and Y...
    }
}
```

Now when I run this code with some real data, an exception happens on line 88 here, inside the custom `coGroup()` function above (I've added line numbers to clarify):

```java
87  if (maxY != null) {
88      collector.collect(maxY);
89  } else {
90      log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91  }
```

The stack trace is as follows:

```
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:683)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: java.lang.NullPointerException
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39).
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39) [FAILED]
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) c3a09b0645721f8afd02a57d6a24ea39.
```

Am I doing something wrong, or is this a bug? Any ideas appreciated.

Cheers,
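One possibility worth ruling out (an assumption, not something the trace proves): the `NullPointerException` surfaces under `RecordWriterOutput.pushToRecordWriter`, i.e. while the record collected on line 88 is being handed to the network for the downstream `keyBy(AggregatedPojoClass::getSecondaryId)`, not inside `coGroup()` itself. For a keyed stream, Flink has to pick a target channel from each record's key, and as far as I know in 1.5 this goes through `KeyGroupRangeAssignment`, which calls the key's `hashCode()`. A record whose `getSecondaryId()` returns `null` would then fail at exactly that point. The Flink-free sketch below imitates only that property; `selectChannel` and `hasKey` are hypothetical helpers, and the simple modulo is a simplification of Flink's real key-group mapping.

```java
import java.util.Arrays;
import java.util.List;

public class NullKeyPartitionSketch {

    // Simplified stand-in for keyBy() partitioning: choose a channel from key.hashCode().
    // NOT Flink's actual code (Flink maps hashes to key groups first); it only reproduces
    // the relevant property that the key object is dereferenced.
    static int selectChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels); // throws NullPointerException if key == null
    }

    // Hypothetical guard: only records with a non-null secondary id should reach the keyBy.
    static boolean hasKey(String secondaryId) {
        return secondaryId != null;
    }

    public static void main(String[] args) {
        List<String> secondaryIds = Arrays.asList("a", "b", null);
        for (String id : secondaryIds) {
            try {
                System.out.println(id + " -> channel " + selectChannel(id, 4));
            } catch (NullPointerException e) {
                System.out.println("null key -> NullPointerException");
            }
        }
        long kept = secondaryIds.stream().filter(NullKeyPartitionSketch::hasKey).count();
        System.out.println("records with a key: " + kept); // prints "records with a key: 2"
    }
}
```

If this turns out to be the cause, a check such as `joined.filter(p -> p.getSecondaryId() != null)` before the second `keyBy`, or logging records with a null secondary id inside `coGroup()` before collecting them, should make the offending records visible.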