Thanks.
That works great.

Niels

On Mon, Nov 30, 2015 at 3:32 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> the problem here is that the system needs to be aware that Watermarks will
> be flowing through the system. You can either do this via:
>
> env.setStreamTimeCharacteristic(EventTime);
>
> or:
>
> env.getConfig().enableTimestamps();
>
> I know, not very intuitive.
>
> Cheers,
> Aljoscha
>
> > On 30 Nov 2015, at 14:47, Niels Basjes <ni...@basjes.nl> wrote:
> >
> > Hi,
> >
> > I'm experimenting with a custom Windowing setup over clickstream data.
> > I want the timestamps of this clickstream data to be the timestamps
> 'when the event occurred' and in the Windows I need to trigger on these
> times.
> >
> > For testing I created a source roughly like this:
> >     public class ManualTimeEventSource extends
> RichEventTimeSourceFunction<Long> {
> >                     ctx.collectWithTimestamp(event, event.timestamp);
> >
> > But none of the triggers were called so I started digging through the
> code.
> > Then I figured I apparently needed to add the watermarks myself, so I
> added a line:
> >                     ctx.emitWatermark(new Watermark(event.timestamp));
> >
> > But now I get:
> >
> > Caused by: java.lang.ClassCastException:
> org.apache.flink.streaming.api.watermark.Watermark cannot be cast to
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> >       at
> org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:41)
> >       at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> >       at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
> >       at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
> >       at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:93)
> >       at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:88)
> >       ... 9 more
> >
> > This seems like a bug to me (StreamElement vs StreamRecord). Is it a bug
> in Flink or in my code?
> >
> > What is the right way to trigger the events in my Windowing setup?
> >
> >
> >
> > P.S. I'm binding my Java application against Flink version 0.10.1
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Reply via email to