Hi Konstantin,

This exception is thrown if you assign timestamps and watermarks without
setting the time characteristic to event time.
Please try adding

> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

right after you obtain the StreamExecutionEnvironment.
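
To illustrate why the setting matters, here is a rough, self-contained sketch of the mechanism as I understand it. It is a toy model, not Flink's actual code: all class and method names below (Record, Watermark, RecordOnlySerializer, MultiplexingSerializer, serializerFor, emit) are hypothetical stand-ins. The assumption is that, without event time, the runtime picks a serializer that expects only records, so a watermark emitted by the timestamp assigner fails the cast.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkCastSketch {
    // Hypothetical stand-ins for stream elements; not the real Flink classes.
    static class Record {
        final String value;
        Record(String value) { this.value = value; }
    }

    static class Watermark {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    interface ElementSerializer {
        String serialize(Object element);
    }

    // Assumed behavior when timestamps are disabled: every element is treated as a record.
    static class RecordOnlySerializer implements ElementSerializer {
        public String serialize(Object element) {
            Record r = (Record) element; // a Watermark fails here with ClassCastException
            return "R:" + r.value;
        }
    }

    // Assumed behavior when event time is enabled: records and watermarks are both handled.
    static class MultiplexingSerializer implements ElementSerializer {
        public String serialize(Object element) {
            if (element instanceof Watermark) {
                return "W:" + ((Watermark) element).timestamp;
            }
            return "R:" + ((Record) element).value;
        }
    }

    // The time characteristic decides which serializer the job uses.
    static ElementSerializer serializerFor(boolean eventTimeEnabled) {
        return eventTimeEnabled ? new MultiplexingSerializer() : new RecordOnlySerializer();
    }

    // Emits one record followed by one watermark, as a timestamp assigner would.
    static List<String> emit(boolean eventTimeEnabled) {
        ElementSerializer serializer = serializerFor(eventTimeEnabled);
        List<String> out = new ArrayList<>();
        out.add(serializer.serialize(new Record("This is 0")));
        out.add(serializer.serialize(new Watermark(42L)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(true));
        try {
            emit(false);
        } catch (ClassCastException e) {
            System.out.println("Without event time: " + e.getClass().getSimpleName());
        }
    }
}
```

In this model, emit(true) serializes both elements, while emit(false) fails on the watermark, which mirrors the stack trace in your mail.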

Best, Fabian

2016-04-22 15:47 GMT+02:00 Konstantin Kulagin <kkula...@gmail.com>:

> Hi guys,
>
> trying to run this example:
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
>       @Override
>       public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
>         LongStream.range(0, 33).forEach(l -> {
>           ctx.collect(Tuple2.of(0L, "This is " + l));
>         });
>       }
>
>       @Override
>       public void cancel() {
>       }
>     });
>
>
>     source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
> //    source.
>         keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))).
>         apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
>           @Override
>           public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
>             System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
>           }
>         });
>
>     env.execute("yoyoyo");
>
> Getting:
>
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>       at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>       at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>       at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
>
>
> - After googling, I found this: https://issues.apache.org/jira/browse/FLINK-3688
>
> - I went to GitHub and downloaded branch 1.0.2, which contains the specified
> change, but I get the same result.
>
> What am I missing here?
>
> Thanks!
>
> Konstantin
>
>
>
