Hi,

I am running a streaming job that generates watermarks like this:

    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord> {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            LOGGER.info("timestamp---- {}", timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event
            LOGGER.info("extractedTimestamp {}", extractedTimestamp);
            return new Watermark(extractedTimestamp);
        }
    }

Please help me understand what this exception means:

    java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:137)
        at com.bounce.test.SessionProcessor$2.process(SessionProcessor.java:116)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
        ... 10 more

--
Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>
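A note on reading the trace above: the outer RuntimeException is raised while a watermark is being processed, and the "Caused by" section shows that the watermark fired a window (WindowOperator.onEventTime, emitWindowContents), after which SessionProcessor$2.process called collect and the element could not be forwarded to the next chained operator. The trace as posted stops at "... 10 more", so whatever the innermost cause is does not appear here. As a rough sketch for digging it out, the plain-Java helper below walks Throwable.getCause() down to the last exception in the chain. The class name RootCauseFinder and the place where it would be called (for example, a temporary try/catch around the collect call) are assumptions for illustration, not part of the original job or of Flink.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical debugging helper (not a Flink API): finds the innermost cause
// of a wrapped exception such as the RuntimeException in the trace above.
class RootCauseFinder {

    // Follows getCause() until there is no further cause.
    // The identity set guards against a cause chain that loops back on itself.
    // Expects a non-null argument.
    static Throwable rootCause(Throwable t) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for the nesting in the posted trace; the innermost
        // exception here is invented purely for the demonstration.
        Throwable inner = new IllegalStateException("innermost problem");
        Throwable chained = new RuntimeException("Could not forward element to next operator", inner);
        Throwable outer = new RuntimeException("Exception occurred while processing valve output watermark:", chained);

        System.out.println(rootCause(outer));
    }
}
```

Logging the result of rootCause(e) next to the full stack trace is usually enough to see which operator or serializer actually failed; the complete TaskManager log should also contain the remaining "Caused by" sections if any exist.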