Hi guys,

I'm trying to run this example:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Tuple2<Long, String>> source = env.addSource(
        new SourceFunction<Tuple2<Long, String>>() {
      @Override
      public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
        LongStream.range(0, 33).forEach(l -> {
          ctx.collect(Tuple2.of(0L, "This is " + l));
        });
      }

      @Override
      public void cancel() {
      }
    });


    source.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()).
//    source.
        keyBy(0).window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(5))).
        apply(new WindowFunction<Tuple2<Long, String>, Void, Tuple, GlobalWindow>() {
          @Override
          public void apply(Tuple tuple, GlobalWindow window,
              Iterable<Tuple2<Long, String>> input, Collector<Void> out) throws Exception {
            System.out.println("!!!!!!!!! " + Joiner.on(",").join(input));
          }
        });

    env.execute("yoyoyo");

I'm getting:

Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
        at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)


- After googling, I found this issue:
https://issues.apache.org/jira/browse/FLINK-3688

- I went to GitHub and downloaded branch 1.0.2, which contains the
specified change, but I'm getting the same result.
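
One thing that could be checked here is whether the class from the stack trace is really loaded from the freshly downloaded build and not from an older jar still on the classpath. This is only a guess at a possible cause, not a confirmed one. A minimal sketch using only the JDK; `WhichJar` and `locationOf` are hypothetical names made up for illustration:

```java
import java.net.URL;
import java.security.CodeSource;

public class WhichJar {

    /** Returns the jar or directory a class was loaded from, or null for JDK bootstrap classes. */
    static URL locationOf(Class<?> clazz) {
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        return src == null ? null : src.getLocation();
    }

    public static void main(String[] args) throws Exception {
        // Pass a fully qualified class name as the first argument;
        // without arguments the tool just reports on itself.
        String name = args.length > 0 ? args[0] : WhichJar.class.getName();
        Class<?> clazz = Class.forName(name);
        System.out.println(name + " loaded from " + locationOf(clazz));
    }
}
```

Run with the job's classpath and `org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer` as the argument, this should print the jar that class is actually loaded from.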

What am I missing here?

Thanks!

Konstantin
