Hi, the problem here is that the system needs to be aware that Watermarks will be flowing through the system. You can either do this via:
env.setStreamTimeCharacteristic(EventTime); or: env.getConfig().enableTimestamps(); I know, not very intuitive. Cheers, Aljoscha > On 30 Nov 2015, at 14:47, Niels Basjes <ni...@basjes.nl> wrote: > > Hi, > > I'm experimenting with a custom Windowing setup over clickstream data. > I want the timestamps of this clickstream data to be the timestamps 'when the > event occurred' and in the Windows I need to trigger on these times. > > For testing I created a source roughly like this: > public class ManualTimeEventSource extends > RichEventTimeSourceFunction<Long> { > ctx.collectWithTimestamp(event, event.timestamp); > > But none of the triggers were called so I started digging through the code. > Then I figured I apparently needed to add the watermarks myself, so I added a > line: > ctx.emitWatermark(new Watermark(event.timestamp)); > > But now I get: > > Caused by: java.lang.ClassCastException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88) > ... 9 more > > This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug in > Flink or in my code? > > What is the right way to trigger the events in my Windowing setup? > > > > P.S. I'm binding my Java application against Flink version 0.10.1 > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes