Hi,
I am running a streaming job with generating watermark like this :

public static class SessionAssigner implements
AssignerWithPunctuatedWatermarks<GenericRecord> {
    @Override
    public long extractTimestamp(GenericRecord record, long
previousElementTimestamp) {
        long timestamp = (long) record.get("event_ts");
        LOGGER.info("timestamp----", timestamp);
        return timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(GenericRecord record,
long extractedTimestamp) {
        // simply emit a watermark with every event
        LOGGER.info("extractedTimestamp ", extractedTimestamp);
        return new Watermark(extractedTimestamp);
    }
}

Please help me understand what this exception means:

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
    at org.apache.flink.streaming.runtime.io.
StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
StreamOneInputProcessor.java:216)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve
.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve
.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processElement(StreamOneInputProcessor.java:169)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.
ExceptionInChainedOperatorException: Could not forward element to next
operator
    at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.
OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.runtime.tasks.
OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
    at org.apache.flink.streaming.runtime.tasks.
OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
    at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
727)
    at org.apache.flink.streaming.api.operators.
AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
705)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect
(TimestampedCollector.java:51)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
    at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.
InternalIterableProcessWindowFunction.process(
InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.
InternalIterableProcessWindowFunction.process(
InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
.onEventTime(WindowOperator.java:457)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
.advanceWatermark(InternalTimerServiceImpl.java:276)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager
.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator
.processWatermark(AbstractStreamOperator.java:784)
    at org.apache.flink.streaming.runtime.io.
StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(
StreamOneInputProcessor.java:213)
    ... 10 more

-- 
Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to