Konstantin Knauf created FLINK-3688:
---------------------------------------

             Summary: ClassCastException in StreamRecordSerializer when 
WindowOperator.trigger() is called and TimeCharacteristic = ProcessingTime
                 Key: FLINK-3688
                 URL: https://issues.apache.org/jira/browse/FLINK-3688
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.0.0
            Reporter: Konstantin Knauf
            Priority: Critical


Hi,

when using {{TimeCharacteristic.ProcessingTime}}, a ClassCastException is 
thrown in {{StreamRecordSerializer}} when {{WindowOperator.processWatermark()}} 
is called from {{WindowOperator.trigger()}}, i.e. whenever a 
ProcessingTimeTimer is triggered. 

The problem seems to be that {{processWatermark()}} is also called in 
{{trigger()}} when the time characteristic is ProcessingTime. In that case, 
however, {{enableWatermarkMultiplexing}} in {{RecordWriterOutput}} is {{false}} 
and the {{TypeSerializer}} is a {{StreamRecordSerializer}}, which cannot 
serialize a {{Watermark}} and ultimately throws the ClassCastException. Do you 
agree?
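
To make the suspected mechanism concrete, here is a minimal, self-contained sketch. It is a toy model, not Flink code: {{Record}}, {{Mark}}, {{RecordOnlySerializer}} and {{MultiplexingSerializer}} are hypothetical stand-ins for {{StreamRecord}}, {{Watermark}}, {{StreamRecordSerializer}} and {{MultiplexingStreamRecordSerializer}}, and their behaviour is only assumed from the stack trace below.

```java
// Toy model only: simplified stand-ins, not Flink's real classes.
final class WatermarkCastSketch {

    // Hypothetical stand-in for StreamRecord.
    static final class Record {
        final String value;
        Record(String value) { this.value = value; }
    }

    // Hypothetical stand-in for Watermark.
    static final class Mark {
        final long timestamp;
        Mark(long timestamp) { this.timestamp = timestamp; }
    }

    // Serializer typed on Object, so a wrong element type is only caught at runtime.
    interface ElementSerializer {
        String serialize(Object element);
    }

    // Assumed behaviour of a record-only serializer: it casts unconditionally.
    static final class RecordOnlySerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            Record record = (Record) element; // ClassCastException if element is a Mark
            return "R:" + record.value;
        }
    }

    // Assumed behaviour of a multiplexing serializer: it handles both element types.
    static final class MultiplexingSerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            if (element instanceof Mark) {
                return "W:" + ((Mark) element).timestamp;
            }
            return "R:" + ((Record) element).value;
        }
    }

    // Models emitting a watermark through whichever serializer the output was given.
    static String emitWatermark(ElementSerializer serializer, long timestamp) {
        return serializer.serialize(new Mark(timestamp));
    }

    // Returns true if emitting a watermark through the record-only serializer fails.
    static boolean failsWithRecordOnlySerializer() {
        try {
            emitWatermark(new RecordOnlySerializer(), 42L);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("record-only serializer fails: " + failsWithRecordOnlySerializer());
        System.out.println("multiplexing serializer output: "
                + emitWatermark(new MultiplexingSerializer(), 42L));
    }
}
```

In this model the failure depends only on which serializer the output was configured with, which would match the observation that {{enableWatermarkMultiplexing}} is {{false}}.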

If this is indeed a bug, there are several possible solutions.

# Only calling {{processWatermark()}} in {{trigger()}} when TimeCharacteristic 
is EventTime.
# Not calling {{processWatermark()}} in {{trigger()}} at all, and instead waiting 
for the next watermark to trigger the EventTimeTimers with a timestamp behind 
the current watermark. This is, of course, a trade-off. 
# Using {{MultiplexingStreamRecordSerializer}} all the time, but I have no idea 
what the side effects of this change would be. I assume there is a reason for 
the existence of the StreamRecordSerializer ;)
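
The first option could look roughly like the sketch below. This is again a toy model under stated assumptions, not a patch against {{WindowOperator}}: {{ToyWindowOperator}}, its fields, and the local {{TimeCharacteristic}} enum are hypothetical, and the value passed to {{processWatermark()}} is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of option 1: forward a watermark from trigger() only in event time.
final class TriggerGuardSketch {

    // Local stand-in enum; it only mirrors the names used in this issue.
    enum TimeCharacteristic { ProcessingTime, EventTime }

    // Hypothetical, heavily simplified operator.
    static final class ToyWindowOperator {
        private final TimeCharacteristic characteristic;
        private long currentWatermark = Long.MIN_VALUE;

        // Recorded so the behaviour can be inspected from outside.
        final List<Long> firedProcessingTimers = new ArrayList<>();
        final List<Long> emittedWatermarks = new ArrayList<>();

        ToyWindowOperator(TimeCharacteristic characteristic) {
            this.characteristic = characteristic;
        }

        // Called when a processing-time timer fires.
        void trigger(long time) {
            firedProcessingTimers.add(time);
            // The guard: skip the watermark path unless running in event time.
            if (characteristic == TimeCharacteristic.EventTime) {
                processWatermark(currentWatermark);
            }
        }

        // Records the watermark and models forwarding it downstream.
        void processWatermark(long timestamp) {
            currentWatermark = timestamp;
            emittedWatermarks.add(timestamp);
        }
    }

    public static void main(String[] args) {
        ToyWindowOperator processingTime = new ToyWindowOperator(TimeCharacteristic.ProcessingTime);
        processingTime.trigger(100L);
        System.out.println("processing time, watermarks emitted: "
                + processingTime.emittedWatermarks.size());

        ToyWindowOperator eventTime = new ToyWindowOperator(TimeCharacteristic.EventTime);
        eventTime.processWatermark(50L);
        eventTime.trigger(100L);
        System.out.println("event time, watermarks emitted: "
                + eventTime.emittedWatermarks.size());
    }
}
```

With such a guard, the processing-time timer still fires, but no watermark reaches an output that was not set up to serialize one.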

StackTrace: 
{quote}
TimerException\{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord\}
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
        ... 7 more
Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
        ... 11 more
{quote}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
