Morning everyone,

I'm getting the following exception in my Flink job (Flink version is
1.5.0):

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)

I'm not sure why this is happening, but I suspect it could be a bug in Flink.
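
For context on the "valve" in that message: as far as I understand Flink's runtime, a StatusWatermarkValve tracks the latest watermark from each input channel and forwards a new watermark only when the minimum across the (non-idle) channels advances; that forwarded watermark is what drives event-time timers such as window firing. Below is a hypothetical, simplified stdlib sketch of that min-across-channels rule. The class and method names are mine, idle-channel handling is left out, and it is not Flink's actual code.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of a watermark "valve": it emits a new
 *  watermark only when the minimum across all input channels increases.
 *  Idle-channel (stream status) handling from Flink's real valve is omitted. */
class MinWatermarkValve {
    private final long[] channelWatermarks;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel. Returns the newly emitted
     *  watermark, or null if the overall minimum did not advance. */
    Long inputWatermark(int channel, long watermark) {
        // Per-channel watermarks only move forward; regressions are ignored.
        if (watermark > channelWatermarks[channel]) {
            channelWatermarks[channel] = watermark;
        }
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            // Downstream, event-time timers (e.g. window firing) would run here.
            return min;
        }
        return null;
    }
}
```

With two channels, a watermark of 100 on channel 0 emits nothing until channel 1 also reports one; the emitted value is always the slower channel's watermark.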

My code is too proprietary to share directly, but here's the general gist.
I'm ingesting data as JSON, parsing it into POJOs, and then aggregating
those with coGroup(), taking the maximum of two separate fields. I then
aggregate these results again, averaging the maximums grouped by a
different field. In pseudocode:

// first aggregation

final DataStream<PojoClass> parsed = source
    .flatMap(new JsonConverterAndFilterer())
    .assignTimestampsAndWatermarks(new MyTimestampExtractor(MAX_OUT_OF_ORDERNESS));

final DataStream<PojoClass> X = parsed.filter(q -> q.getX() != null);

final DataStream<AggregatedPojoClass> Y = parsed
    .filter(q -> q.getY() != null)
    .map(AggregatedPojoClass::new);

final DataStream<AggregatedPojoClass> joined = X
    .coGroup(Y)
    .where(PojoClass::getId)
    .equalTo(AggregatedPojoClass::getId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(findMaximums());

// findMaximums() returns a CoGroupFunction<PojoClass, AggregatedPojoClass, AggregatedPojoClass>
// (the anonymous class AggregateStuffFlinkJob$1 in the stack trace) whose coGroup() is:

@Override
public void coGroup(Iterable<PojoClass> xMaybe, Iterable<AggregatedPojoClass> yMaybe,
                    Collector<AggregatedPojoClass> collector) throws Exception {
    final Iterator<PojoClass> x = xMaybe.iterator();
    final Iterator<AggregatedPojoClass> y = yMaybe.iterator();
    if (x.hasNext() && y.hasNext()) {
        // Case 1: both sides have records for this id in this window
        final PojoClass maxX = findMaxX(x);
        final AggregatedPojoClass maxY = findMaxY(y);
        if (maxX != null && maxY != null) {
            collector.collect(new AggregatedPojoClass(maxX).updateMaxY(maxY.getY()));
        } else {
            log.warn("[CoGroup case 1] Max X or max Y is null - SKIPPING: max x {}, max y {}", maxX, maxY);
        }
    } // ...other cases omitted for brevity...
}

// second aggregation

final DataStream<AggregatedPojoClass> result = joined
    .keyBy(AggregatedPojoClass::getSecondaryId)
    .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SIZE_SECONDS)))
    .apply(new WindowAverageFunction());

final DataStream<String> resultJson = result
    .map(JsonUtil::toJson);
// then a simple sink on this

// WindowAverageFunction is:

public class WindowAverageFunction
        implements WindowFunction<AggregatedPojoClass, AggregatedPojoClass, String, TimeWindow> {
    @Override
    public void apply(String secondaryId, TimeWindow timeWindow,
                      Iterable<AggregatedPojoClass> input,
                      Collector<AggregatedPojoClass> out) throws Exception {
        final Iterator<AggregatedPojoClass> i = input.iterator();
        if (!i.hasNext()) {
            log.warn("Got empty window for secondary id '{}' - ignoring...", secondaryId);
            return;
        }
        // calculate some simple averages of the X and Y...
    }
}
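
The helpers findMaxX(x) and findMaxY(y), and the averaging body of WindowAverageFunction, are not shown above. For readers following along, here is a hypothetical stdlib-only sketch of what such helpers could look like, assuming the X and Y values are nullable Doubles (an assumption; the real field types aren't given). Returning null when no element carries a value matches the null checks the coGroup() already performs before emitting.

```java
import java.util.Iterator;
import java.util.function.Function;

/** Hypothetical stand-ins for findMaxX/findMaxY and the averaging step.
 *  Field types (nullable Double) are assumed, not taken from the real job. */
final class MaxAndAverageSketch {
    private MaxAndAverageSketch() {}

    /** Returns the element whose extracted value is largest, skipping
     *  elements whose value is null; returns null if none has a value. */
    static <T> T maxBy(Iterator<T> it, Function<T, Double> field) {
        T best = null;
        Double bestValue = null;
        while (it.hasNext()) {
            T candidate = it.next();
            Double value = field.apply(candidate);
            if (value != null && (bestValue == null || value > bestValue)) {
                best = candidate;
                bestValue = value;
            }
        }
        return best;
    }

    /** Averages the non-null extracted values; returns null if there are none. */
    static <T> Double averageOf(Iterable<T> items, Function<T, Double> field) {
        double sum = 0;
        int count = 0;
        for (T item : items) {
            Double value = field.apply(item);
            if (value != null) {
                sum += value;
                count++;
            }
        }
        return count == 0 ? null : sum / count;
    }
}
```

In the job, these would be called as e.g. maxBy(x, PojoClass::getX) inside coGroup() and averageOf(input, AggregatedPojoClass::getY) inside apply(), assuming those getters return Double.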


When I run this code on real data, the exception is thrown at line 88
below, inside the custom coGroup() function above (I've added line numbers
to clarify):

87        if(maxY != null) {
88            collector.collect(maxY);
89        } else {
90            log.warn("[CoGroup case 3] Max Y null - SKIPPING");
91        }
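
Line 88 only runs when a watermark fires a window: in the trace, coGroup() is called from WindowOperator.onEventTime inside AbstractStreamOperator.processWatermark, which is why the top-level exception talks about the watermark. The log shows TumblingEventTimeWindows(300000), i.e. 5-minute windows. A stdlib-only sketch of the timing arithmetic follows, assuming zero window offset and a watermark equal to the largest timestamp seen minus MAX_OUT_OF_ORDERNESS. MyTimestampExtractor isn't shown, so that formula is an assumption (it is how Flink's BoundedOutOfOrdernessTimestampExtractor behaves); the helper names are hypothetical.

```java
/** Hypothetical helpers modelling tumbling event-time windows (zero offset)
 *  and a bounded-out-of-orderness watermark. */
final class TumblingWindowMath {
    private TumblingWindowMath() {}

    /** Start of the window containing the timestamp; same formula as
     *  Flink's TimeWindow.getWindowStartWithOffset with offset 0. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs + sizeMs) % sizeMs;
    }

    /** Watermark that lags the largest timestamp seen by a fixed bound. */
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    /** A window [start, start + size) fires once the watermark reaches its
     *  last millisecond, start + size - 1, as an event-time trigger does. */
    static boolean fires(long windowStartMs, long sizeMs, long watermarkMs) {
        return windowStartMs + sizeMs - 1 <= watermarkMs;
    }
}
```

For example, a record at 1,000,123 ms falls into the window starting at 900,000 ms; with a 5,000 ms bound, that window fires once some record with a timestamp of at least 1,204,999 ms has been seen.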

The stack trace is as follows:

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.mycorp.flink.AggregateStuffFlinkJob$1.coGroup(AggregateStuffFlinkJob.java:88)
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:683)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: java.lang.NullPointerException
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39).
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) (1/1) (c3a09b0645721f8afd02a57d6a24ea39) [FAILED]
2018-07-27 13:34:36,271 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Window(TumblingEventTimeWindows(300000), EventTimeTrigger, CoGroupWindowFunction) c3a09b0645721f8afd02a57d6a24ea39.
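
Reading the trace bottom-up, the NullPointerException surfaces under RecordWriterOutput.pushToRecordWriter, i.e. while the record emitted at line 88 is being handed off to the next operator, which in this job is the keyBy(AggregatedPojoClass::getSecondaryId) window. Flink assigns keyed records to key groups starting from the key's hashCode() (it then applies a murmur hash, which is omitted below), so a key selector that returns null would fail at exactly that hand-off. Whether that is what happens here is an unconfirmed hypothesis. The following simplified stdlib sketch only illustrates the mechanism; the class and method names are hypothetical, and Math.floorMod stands in for Flink's real key-group computation.

```java
import java.util.function.Function;

/** Simplified, hypothetical model of hash partitioning on a key selector.
 *  Flink applies a murmur hash to key.hashCode(); plain floorMod is used here. */
final class KeyPartitionSketch {
    private KeyPartitionSketch() {}

    /** Mirrors the failure mode: key.hashCode() throws NPE on a null key. */
    static <T, K> int keyGroupFor(T record, Function<T, K> keySelector, int maxParallelism) {
        K key = keySelector.apply(record);
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Defensive variant: rejects null keys with a descriptive message. */
    static <T, K> int keyGroupForChecked(T record, Function<T, K> keySelector, int maxParallelism) {
        K key = keySelector.apply(record);
        if (key == null) {
            throw new IllegalArgumentException("null key for record: " + record);
        }
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

One way to test the hypothesis in the job itself would be to log or filter records whose getSecondaryId() is null before the keyBy, for example with .filter(a -> a.getSecondaryId() != null) on joined, and see whether the failure goes away.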



Am I doing something wrong? Or is this a bug? Any ideas appreciated.

Cheers,
