Konstantin Knauf created FLINK-3688:
---------------------------------------

             Summary: ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is called and TimeCharacteristic = ProcessingTime
                 Key: FLINK-3688
                 URL: https://issues.apache.org/jira/browse/FLINK-3688
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.0.0
            Reporter: Konstantin Knauf
            Priority: Critical


Hi,

when using {{TimeCharacteristic.ProcessingTime}}, a ClassCastException is thrown in {{StreamRecordSerializer}} when {{WindowOperator.processWatermark()}} is called from {{WindowOperator.trigger()}}, i.e. whenever a ProcessingTimeTimer is triggered.

The problem seems to be that {{processWatermark()}} is called in {{trigger()}} even when the time characteristic is ProcessingTime. In that case {{enableWatermarkMultiplexing}} in {{RecordWriterOutput}} is {{false}} and the {{TypeSerializer}} is a {{StreamRecordSerializer}}, which ultimately leads to the ClassCastException. Do you agree?

If this is indeed a bug, there are several possible solutions.

# Only calling {{processWatermark()}} in {{trigger()}} when the TimeCharacteristic is EventTime
# Not calling {{processWatermark()}} in {{trigger()}} at all, and instead waiting for the next watermark to trigger the EventTimeTimers with a timestamp behind the current watermark. This is, of course, a trade-off.
# Using {{MultiplexingStreamRecordSerializer}} all the time, but I have no idea what the side effects of this change would be.
I assume there is a reason for the existence of the StreamRecordSerializer ;)

StackTrace:
{quote}
TimerException\{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord\}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
	... 7 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
	at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
	... 11 more
{quote}
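
To illustrate the suspected mechanism, here is a minimal, self-contained sketch. It does not use Flink's classes: {{DataRecord}}, {{Mark}}, {{ElementSerializer}}, {{RecordOnlySerializer}}, {{MultiplexingSerializer}} and {{emitWatermark}} are hypothetical stand-ins, and the real serializers may well work differently internally. It only shows how a serializer that assumes every element is a record fails with a ClassCastException when handed a watermark, how a tagging serializer avoids that, and what a guard along the lines of the first proposal could look like.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the failure; none of these types are Flink's real classes. */
final class WatermarkCastSketch {

    /** Hypothetical stand-in for a stream record. */
    static final class DataRecord {
        final String value;
        DataRecord(String value) { this.value = value; }
    }

    /** Hypothetical stand-in for a watermark. */
    static final class Mark {
        final long timestamp;
        Mark(long timestamp) { this.timestamp = timestamp; }
    }

    /** Untyped serializer interface: the caller hands over a plain Object. */
    interface ElementSerializer {
        String serialize(Object element);
    }

    /** Assumes every element is a record, so a watermark fails the cast. */
    static final class RecordOnlySerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            DataRecord record = (DataRecord) element; // ClassCastException for a Mark
            return "R:" + record.value;
        }
    }

    /** Tags each element with its kind, so records and watermarks can share a channel. */
    static final class MultiplexingSerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            if (element instanceof Mark) {
                return "W:" + ((Mark) element).timestamp;
            }
            return "R:" + ((DataRecord) element).value;
        }
    }

    /**
     * Guard in the spirit of the first proposal: only write a watermark when
     * watermarks are enabled. Returns true if something was written.
     */
    static boolean emitWatermark(ElementSerializer serializer, boolean watermarksEnabled,
                                 long timestamp, List<String> out) {
        if (!watermarksEnabled) {
            return false; // processing time: skip the watermark entirely
        }
        out.add(serializer.serialize(new Mark(timestamp)));
        return true;
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        try {
            new RecordOnlySerializer().serialize(new Mark(42L));
        } catch (ClassCastException e) {
            System.out.println("record-only serializer rejected the watermark");
        }
        emitWatermark(new RecordOnlySerializer(), false, 42L, out);  // guarded, writes nothing
        emitWatermark(new MultiplexingSerializer(), true, 42L, out); // writes "W:42"
        System.out.println(out);
    }
}
```

In this model the guard corresponds to the first proposal and the tagging serializer to the third. Whether either maps cleanly onto the actual {{WindowOperator}} and {{RecordWriterOutput}} code would need to be checked against the Flink sources.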