At first glance this looks like a bug. Is there nothing in the stack trace after the NullPointerException?

How reliably can you reproduce this?

On 27.07.2018 19:00, Taneli Saastamoinen wrote:
Morning everyone,

I'm getting the following exception in my Flink job (Flink version is 1.5.0):

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)

I'm not sure why this is happening but I suspect it could be a bug in Flink.
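For context on the message itself: the full stack trace shows coGroup() being called from WindowOperator.onEventTime via HeapInternalTimerService.advanceWatermark. In other words, the window function runs when a watermark arrives, not when individual elements arrive, so an exception thrown while emitting window results is reported as "Exception occurred while processing valve output watermark". The Flink-free sketch below models when an event-time tumbling window fires. It assumes MyTimestampExtractor behaves like a bounded-out-of-orderness extractor (watermark = highest timestamp seen minus MAX_OUT_OF_ORDERNESS), which the original code does not confirm; WindowFiringModel and its methods are hypothetical names.

```java
/**
 * Hypothetical, Flink-free model of when an event-time tumbling window fires.
 * ASSUMPTION: the timestamp extractor emits watermark = max timestamp seen - maxOutOfOrderness.
 */
public class WindowFiringModel {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public WindowFiringModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an element's event timestamp and returns the resulting (never decreasing) watermark. */
    public long onElement(long timestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** Start of the tumbling window containing the timestamp (non-negative timestamps, zero offset). */
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** A window [start, start + size) fires once the watermark reaches its last millisecond (end - 1). */
    public static boolean fires(long windowStartMs, long windowSizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + windowSizeMs - 1;
    }
}
```

With the 300000 ms (5 minute) windows shown in the task log and an example out-of-orderness bound of 10000 ms, an element stamped 310000 ms moves the watermark to 300000 ms, which fires the window [0, 300000); that firing is the point at which coGroup() runs.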

My code is too proprietary to share directly, but here's the general gist. I'm receiving data as JSON, parsing it into POJOs, and then aggregating those with coGroup(), taking the maximum of two separate fields. I then aggregate those results again, taking the average of these maximums grouped by a different field. In pseudocode:

// first aggregation

final DataStream<PojoClass> parsed = source
    .flatMap(new JsonConverterAndFilterer())
    .assignTimestampsAndWatermarks(new MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));

final DataStream<PojoClass> X = parsed.filter(q -> q.getX() != null);

final DataStream<AggregatedPojoClass> Y = parsed
    .filter(q -> q.getY() != null)
    .map(AggregatedPojoClass::new);

final DataStream<AggregatedPojoClass> joined = X
    .coGroup(Y)
    .where(PojoClass::getId)
    .equalTo(AggregatedPojoClass::getId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(findMaximums());

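// findMaximums() returns an anonymous CoGroupFunction<PojoClass, AggregatedPojoClass, AggregatedPojoClass> whose coGroup() is: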

@Override
public void coGroup(Iterable<PojoClass> xMaybe, Iterable<AggregatedPojoClass> yMaybe, Collector<AggregatedPojoClass> collector) throws Exception {
    final Iterator<PojoClass> x = xMaybe.iterator();
    final Iterator<AggregatedPojoClass> y = yMaybe.iterator();
    if(x.hasNext() && y.hasNext()) {
        final PojoClass maxX = findMaxX(x);
        final AggregatedPojoClass maxY = findMaxY(y);
        if(maxX != null && maxY != null) {
            collector.collect(new AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));
        } else {
            log.warn("[CoGroup case 1] Max X or max Y is null - SKIPPING: max x {}, max y {}", maxX, maxY);
        }
    } // ...other cases omitted for brevity...

// second aggregation

final DataStream<AggregatedPojoClass> result = joined
    .keyBy(AggregatedPojoClass::getSecondaryId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(new WindowAverageFunction());

final DataStream<String> resultJson = result
    .map(JsonUtil::toJson);
// then a simple sink on this

// WindowAverageFunction is:

public class WindowAverageFunction implements WindowFunction<AggregatedPojoClass, AggregatedPojoClass, String, TimeWindow> {
    @Override
    public void apply(String secondaryId, TimeWindow timeWindow, Iterable<AggregatedPojoClass> input, Collector<AggregatedPojoClass> out) throws Exception {
        final Iterator<AggregatedPojoClass> i = input.iterator();
        if(!i.hasNext()) {
            log.warn("Got empty window for secondary id '{}' - ignoring...", secondaryId);
            return;
        }
        // calculate some simple averages of the X and Y...
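To make the intended semantics concrete, here is a Flink-free sketch of what the two stages compute for the contents of a single window. It only approximates the pseudocode above: the record types Reading and Maxima, their fields, and the class name TwoStageAggregation are hypothetical, and only the "case 1" branch (both X and Y present for an id) is modelled, because the other coGroup cases are omitted in the original.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model of the two aggregation stages for one window's contents. */
public class TwoStageAggregation {

    /** Hypothetical input record: x or y may be null, as in the filtered X and Y streams. */
    public static final class Reading {
        public final String id;
        public final String secondaryId;
        public final Double x;
        public final Double y;

        public Reading(String id, String secondaryId, Double x, Double y) {
            this.id = id;
            this.secondaryId = secondaryId;
            this.x = x;
            this.y = y;
        }
    }

    /** Hypothetical per-id result of the first stage. */
    public static final class Maxima {
        public final String secondaryId;
        public final double maxX;
        public final double maxY;

        public Maxima(String secondaryId, double maxX, double maxY) {
            this.secondaryId = secondaryId;
            this.maxX = maxX;
            this.maxY = maxY;
        }
    }

    /** Stage 1: per id, max X and max Y; ids lacking either side are skipped ("case 1" only). */
    public static List<Maxima> maxPerId(List<Reading> window) {
        Map<String, Double> maxX = new LinkedHashMap<>();
        Map<String, Double> maxY = new LinkedHashMap<>();
        Map<String, String> secondaryIds = new LinkedHashMap<>();
        for (Reading r : window) {
            if (r.x != null) {
                maxX.merge(r.id, r.x, Double::max);
            }
            if (r.y != null) {
                maxY.merge(r.id, r.y, Double::max);
            }
            secondaryIds.putIfAbsent(r.id, r.secondaryId);
        }
        List<Maxima> result = new ArrayList<>();
        for (Map.Entry<String, Double> e : maxX.entrySet()) {
            Double y = maxY.get(e.getKey());
            if (y != null) {
                result.add(new Maxima(secondaryIds.get(e.getKey()), e.getValue(), y));
            }
        }
        return result;
    }

    /** Stage 2: per secondaryId, the averages of the maxima as {avgMaxX, avgMaxY}. */
    public static Map<String, double[]> averagePerSecondaryId(List<Maxima> maxima) {
        Map<String, double[]> sums = new LinkedHashMap<>(); // {sumX, sumY, count}
        for (Maxima m : maxima) {
            double[] acc = sums.computeIfAbsent(m.secondaryId, k -> new double[3]);
            acc[0] += m.maxX;
            acc[1] += m.maxY;
            acc[2] += 1;
        }
        Map<String, double[]> averages = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            double[] acc = e.getValue();
            averages.put(e.getKey(), new double[] {acc[0] / acc[2], acc[1] / acc[2]});
        }
        return averages;
    }
}
```

Ids with only X or only Y produce nothing in this sketch; in the real job those are handled by the omitted coGroup cases, such as case 3, which emits maxY on its own.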


When I run this code on real data, an exception is thrown at line 88 below, inside the custom coGroup() function shown above (I've added line numbers for clarity):

87        if(maxY != null) {
88            collector.collect(maxY);
89        } else {
90            log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91        }

The stack trace is as follows:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:683)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: java.lang.NullPointerException

2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39).
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39) [FAILED]
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) c3a09b0645721f8afd02a57d6a24ea39.
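One possibility worth ruling out before treating this as a Flink bug: the NullPointerException surfaces in RecordWriterOutput.pushToRecordWriter, reached from collector.collect(maxY) at line 88, that is, while the record is handed to the network exchange that feeds keyBy(AggregatedPojoClass::getSecondaryId). Keyed partitioning and record serialization both happen at that point, so a null secondary id on the records emitted by case 3, or a null in a field whose serializer does not accept nulls, could plausibly produce this exception. The sketch below is a hypothetical, simplified model of why a null key fails at emit time: selectChannel is a stand-in, not Flink's actual key-group assignment, and NullKeyGuard and dropNullKeys are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of a hash-partitioned exchange such as the one behind keyBy(). */
public class NullKeyGuard {

    /**
     * Simplified stand-in for key-based channel selection (NOT Flink's real code).
     * It needs the key's hashCode(), so a null key throws NullPointerException here,
     * i.e. at the moment the upstream operator emits the record.
     */
    public static int selectChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    /** Keeps only records whose key is non-null, so they can be partitioned safely. */
    public static <T> List<T> dropNullKeys(List<T> records, Function<? super T, ?> keyOf) {
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            if (keyOf.apply(record) != null) {
                kept.add(record);
            }
        }
        return kept;
    }
}
```

In the job itself, the equivalent guard would be something like joined.filter(a -> a.getSecondaryId() != null) ahead of keyBy, or making sure every record emitted by the coGroup carries a non-null secondary id. Logging such records, as the job already does for its other null cases, would confirm or rule out this cause.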



Am I doing something wrong? Or is this a bug? Any ideas appreciated.

Cheers,

