Hi Konstantin, this exception is thrown if you do not set the time characteristic to event time and assign timestamps. Please try to add
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) after you obtained the StreamExecutionEnvironment. Best, Fabian 2016-04-22 15:47 GMT+02:00 Konstantin Kulagin <kkula...@gmail.com>: > Hi guys, > > trying to run this example: > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStreamSource<Tuple2<Long, String>> source = env.addSource(new > SourceFunction<Tuple2<Long, String>>() { > @Override > public void run(SourceContext<Tuple2<Long, String>> ctx) throws > Exception { > LongStream.range(0, 33).forEach(l -> { > ctx.collect(Tuple2.of(0L, "This is " + l)); > }); > } > > @Override > public void cancel() { > } > }); > > > source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()). > // source. > > keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))). > > apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, > GlobalWindow>() { > @Override > public void apply(Tuple tuple, GlobalWindow window, > Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception { > System.out.println("!!!!!!!!! " + Joiner.on(",").join(input)); > } > }); > > env.execute("yoyoyo"); > > Getting Caused by: java.lang.ClassCastException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90) > > > - After googling I've found this: > https://issues.apache.org/jira/browse/FLINK-3688 > > - went to github, downloaded branch 1.0.2 which contains specified change but > having the same results. > > What am I missing here? > > Thanks! > > Konstantin > > >