At first glance this looks like a bug. Is there nothing in the stack trace
after the NullPointerException?
How reliably can you reproduce this?
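If the trace really does stop at "Caused by: java.lang.NullPointerException", one possible explanation (an assumption on my part, not something confirmed for this job) is that the HotSpot JVM has started omitting stack traces for an exception that is thrown very often; starting the TaskManager JVM with -XX:-OmitStackTraceInFastThrow turns that optimization off. Independently of that, a small helper along the lines of the sketch below could be used to log the innermost cause in full. The class and method names are hypothetical and not part of Flink.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical debugging helper, not part of Flink.
final class RootCause {
    private RootCause() {}

    /** Follows getCause() links until the innermost throwable is reached. */
    static Throwable of(Throwable t) {
        Throwable current = t;
        // Stop at the end of the chain, and guard against a self-referencing cause.
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /** Renders the innermost throwable, including its stack trace, as a string. */
    static String describe(Throwable t) {
        StringWriter buffer = new StringWriter();
        of(t).printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }
}
```

Wrapping the failing collector.collect(...) call in a try/catch that logs RootCause.describe(e) and then rethrows would show whether the NullPointerException carries any frames at all.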
On 27.07.2018 19:00, Taneli Saastamoinen wrote:
Morning everyone,
I'm getting the following exception in my Flink job (Flink version is
1.5.0):
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
I'm not sure why this is happening but I suspect it could be a bug in
Flink.
My code is too proprietary to be shared directly, but here's the
general gist. I'm getting data in as JSON, parsing it into POJOs, and
then aggregating those with coGroup(), taking the maximum of two
separate fields. I then take the results of this and aggregate it
again, taking the average of these maximums grouped by a different
field. In pseudocode:
// first aggregation
parsed = source
    .flatMap(new JsonConverterAndFilterer())
    .assignTimestampsAndWatermarks(new MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));
X = parsed.filter(q -> q.getX() != null);
Y = parsed.filter(q -> q.getY() != null).map(AggregatedPojoClass::new);
joined = X
    .coGroup(Y)
    .where(PojoClass::getId)
    .equalTo(AggregatedPojoClass::getId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(findMaximums());
// findMaximums() returns:
@Override
public void coGroup(Iterable<PojoClass> xMaybe,
        Iterable<AggregatedPojoClass> yMaybe,
        Collector<AggregatedPojoClass> collector) throws Exception {
    final Iterator<PojoClass> x = xMaybe.iterator();
    final Iterator<AggregatedPojoClass> y = yMaybe.iterator();
    if (x.hasNext() && y.hasNext()) {
        final PojoClass maxX = findMaxX(x);
        final AggregatedPojoClass maxY = findMaxY(y);
        if (maxX != null && maxY != null) {
            collector.collect(new AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));
        } else {
            log.warn("[CoGroup case 1] Max X or max Y is null - SKIPPING: max x {}, max y {}", maxX, maxY);
        }
    } // ...other cases omitted for brevity...
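The findMaxX() and findMaxY() helpers are not shown in this message. Purely as an illustration of the "take the maximum of one field" step, a helper of that kind might look like the sketch below. The class name MaxFinder, the method maxBy and the choice to skip null elements and null field values are all assumptions, not the actual job code.

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical stand-in for findMaxX()/findMaxY(); not the original implementation.
final class MaxFinder {
    private MaxFinder() {}

    /**
     * Returns the element whose extracted field is largest, or null when the
     * iterator yields no element with a non-null field value.
     */
    static <T, V extends Comparable<V>> T maxBy(Iterator<T> items, Function<T, V> field) {
        T best = null;
        V bestValue = null;
        while (items.hasNext()) {
            T candidate = items.next();
            if (candidate == null) {
                continue; // assumption: null elements are ignored
            }
            V value = field.apply(candidate);
            if (value == null) {
                continue; // assumption: elements with a null field are ignored
            }
            if (bestValue == null || value.compareTo(bestValue) > 0) {
                best = candidate;
                bestValue = value;
            }
        }
        return best;
    }
}
```

With a helper shaped like this, a null return simply means "no usable element in this window", which is the case the null checks in coGroup() above are guarding against.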
// second aggregation
final DataStream<AggregatedPojoClass> result = joined
    .keyBy(AggregatedPojoClass::getSecondaryId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(new WindowAverageFunction());
final DataStream<String> resultJson = result
    .map(JsonUtil::toJson);
// then a simple sink on this
// WindowAverageFunction is:
public class WindowAverageFunction implements
        WindowFunction<AggregatedPojoClass, AggregatedPojoClass, String, TimeWindow> {
    @Override
    public void apply(String secondaryId, TimeWindow timeWindow,
            Iterable<AggregatedPojoClass> input,
            Collector<AggregatedPojoClass> out) throws Exception {
        final Iterator<AggregatedPojoClass> i = input.iterator();
        if (!i.hasNext()) {
            log.warn("Got empty window for secondary id '{}' - ignoring...", secondaryId);
            return;
        }
        // calculate some simple averages of the X and Y...
Now when I run this code with some real data, an exception happens on
line 88 here, inside the custom coGroup() function above (I've added
line numbers to clarify):
87 if(maxY != null) {
88 collector.collect(maxY);
89 } else {
90 log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91 }
The stack trace is as follows:
java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:683)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: java.lang.NullPointerException
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39).
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39) [FAILED]
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) c3a09b0645721f8afd02a57d6a24ea39.
Am I doing something wrong? Or is this a bug? Any ideas appreciated.
Cheers,