Hi,

I had a similar issue recently. assignTimestampsAndWatermarks does not
modify the stream it is called on; it returns a new stream, and the
window has to be applied to that returned stream. Instead of

    input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

you have to do:

    input = input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:
> Hello everyone,
>
> I am currently playing with streams which timestamp is defined by
> EventTime. I currently have the following code:
>
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> DataStream<String> input =
>     env.readTextFile("file:///var/log/syslog");
> input.assignTimestampsAndWatermarks(new
>     AssignTimestampFromLogEvent());
>
> input.timeWindowAll(Time.minutes(5)).apply(new
>     AllWindowFunction<Iterable<String>, String, TimeWindow>() {
>   @Override
>   public void apply(TimeWindow window, Iterable<String> values,
>       Collector<String> out) throws Exception {
>     for(String t:values) {
>       out.collect(t);
>     }
>   }
> }).print();
>
> (...)
>
> public static final class AssignTimestampFromLogEvent extends
>     AscendingTimestampExtractor<String> {
>   @Override
>   public long extractAscendingTimestamp(String element, long
>       previousElementTimestamp){
>     String date = element.substring(0,15);
>     SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
>     Date ddate = null;
>     try {
>       ddate = sdf.parse(date);
>     } catch (ParseException e) {
>       e.printStackTrace();
>     }
>     return ddate.getTime();
>   }
> }
>
>
> What I expect it to do is to read the syslog, assign timestamp and do
> 5 minutes windows *based on the syslog event time*, as I've configured
> the stream to do it. It however does not do that, and does the windows
> based on processing time.
>
> What am I missing here?
>
> Best regards,
>
> --
>
> *Nam-Luc TRAN*
>
> R&D Manager
>
> EURA NOVA
>
> (M) +32 498 37 36 23
>
> *euranova.eu <http://euranova.eu>*
>
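
PS: In case it helps to see why the reassignment matters, here is a small plain-Java sketch of the pattern. It does not use Flink at all: FakeStream, assignTimestamps and hasTimestamps are hypothetical names I made up for illustration, and the only thing they are meant to mimic is that a transformation returns a new object and leaves the one it was called on unchanged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ReassignDemo {

    /** Hypothetical stand-in for a stream (NOT a Flink class): each transformation returns a new object. */
    static final class FakeStream {
        private final List<String> steps;

        FakeStream(List<String> steps) {
            this.steps = Collections.unmodifiableList(new ArrayList<>(steps));
        }

        /** Hypothetical transformation: returns a new stream and leaves "this" untouched. */
        FakeStream assignTimestamps() {
            List<String> next = new ArrayList<>(steps);
            next.add("timestamps");
            return new FakeStream(next);
        }

        boolean hasTimestamps() {
            return steps.contains("timestamps");
        }
    }

    /** Calls the transformation but throws the returned stream away. */
    static boolean withoutReassignment() {
        FakeStream input = new FakeStream(new ArrayList<>());
        input.assignTimestamps();          // result discarded, "input" is unchanged
        return input.hasTimestamps();
    }

    /** Keeps the returned stream, so later operations see the timestamps. */
    static boolean withReassignment() {
        FakeStream input = new FakeStream(new ArrayList<>());
        input = input.assignTimestamps();  // keep the new stream
        return input.hasTimestamps();
    }

    public static void main(String[] args) {
        System.out.println("without reassignment: " + withoutReassignment());
        System.out.println("with reassignment:    " + withReassignment());
    }
}
```

The first call reports false and the second true. That matches what happens in the quoted code: the window is applied to the original input, which never got the timestamp assigner attached.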