Hi guys, trying to run this example:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() { @Override public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception { LongStream.range(0, 33).forEach(l -> { ctx.collect(Tuple2.of(0L, "This is " + l)); }); } @Override public void cancel() { } }); source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()). // source. keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))). apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() { @Override public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception { System.out.println("!!!!!!!!! " + Joiner.on(",").join(input)); } }); env.execute("yoyoyo"); Getting Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90) - After googling I've found this: https://issues.apache.org/jira/browse/FLINK-3688 - went to github, downloaded branch 1.0.2 which contains specified change but having the same results. What am I missing here? Thanks! Konstantin