Great, that did it, thanks Robert ;) While I'm at it: sometimes the results are returned correctly, but sometimes the output of the job (print or writeAsText) is simply empty, as if the job finished before the results were written. One way of "forcing" results to appear is to insert a "delay" in the source stream, for example with a FlatMap:

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Thread.sleep(1);
        out.collect(value.toLowerCase());
    }

Am I missing anything here?

Best regards,

2016-02-25 20:05 GMT+01:00 Robert Metzger <rmetz...@apache.org>:

> Hi,
>
> I had a similar issue recently.
> Instead of
> input.assignTimestampsAndWatermarks
>
> you have to do:
>
> input = input.assignTimestampsAndWatermarks
>
> On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.t...@euranova.eu>
> wrote:
>
> > Hello everyone,
> >
> > I am currently playing with streams which timestamp is defined by
> > EventTime. I currently have the following code:
> >
> > final StreamExecutionEnvironment env =
> >         StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
> > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> > DataStream<String> input =
> >         env.readTextFile("file:///var/log/syslog");
> > input.assignTimestampsAndWatermarks(new
> >         AssignTimestampFromLogEvent());
> >
> > input.timeWindowAll(Time.minutes(5)).apply(new
> >         AllWindowFunction<Iterable<String>, String, TimeWindow>() {
> >     @Override
> >     public void apply(TimeWindow window, Iterable<String> values,
> >             Collector<String> out) throws Exception {
> >         for(String t:values) {
> >             out.collect(t);
> >         }
> >     }
> > }).print();
> >
> > (...)
> >
> > public static final class AssignTimestampFromLogEvent extends
> >         AscendingTimestampExtractor<String> {
> >     @Override
> >     public long extractAscendingTimestamp(String element, long
> >             previousElementTimestamp){
> >         String date = element.substring(0,15);
> >         SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
> >         Date ddate = null;
> >         try {
> >             ddate = sdf.parse(date);
> >         } catch (ParseException e) {
> >             e.printStackTrace();
> >         }
> >         return ddate.getTime();
> >     }
> > }
> >
> >
> > What I expect it to do is to read the syslog, assign timestamp and do
> > 5 minutes windows *based on the syslog event time*, as I've configured
> > the stream to do it. It however does not do that, and does the windows
> > based on processing time.
> >
> > What am I missing here?
> >
> > Best regards,
> >
> > --
> >
> > *Nam-Luc TRAN*
> >
> > R&D Manager
> >
> > EURA NOVA
> >
> > (M) +32 498 37 36 23
> >
> > *euranova.eu <http://euranova.eu>*
> >
>

--

*Nam-Luc TRAN*

R&D Manager

EURA NOVA

(M) +32 498 37 36 23

*euranova.eu <http://euranova.eu>*
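
P.S. A side note on the timestamp extraction quoted above, offered as a sketch and not as a fix confirmed anywhere in this thread. The pattern "MMM dd HH:mm:ss" carries no year, so SimpleDateFormat places every parsed date in 1970, and when parsing fails ddate stays null, so ddate.getTime() throws a NullPointerException. The JDK-only helper below shows one possible way to handle both. The names SyslogTimestamps and parseSyslogTimestamp and the assumedYear parameter are hypothetical, and English month names and UTC are assumptions about the log, not facts from the original code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, not part of the original thread or of Flink.
final class SyslogTimestamps {

    private SyslogTimestamps() {}

    /**
     * Parses the leading "MMM dd HH:mm:ss" prefix of a syslog line and
     * returns epoch milliseconds. The year is supplied by the caller
     * because the syslog prefix does not contain one.
     */
    static long parseSyslogTimestamp(String line, int assumedYear) {
        if (line == null || line.length() < 15) {
            throw new IllegalArgumentException("Line too short for a syslog timestamp: " + line);
        }
        // SimpleDateFormat is not thread-safe, so create one per call.
        // Assumption: month names are English and times are in UTC.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss", Locale.ENGLISH);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            // Prepend the assumed year so the result is not placed in 1970.
            return sdf.parse(assumedYear + " " + line.substring(0, 15)).getTime();
        } catch (ParseException e) {
            // Fail loudly instead of continuing with a null Date.
            throw new IllegalArgumentException("Unparseable syslog timestamp: " + line, e);
        }
    }

    public static void main(String[] args) {
        String line = "Feb 25 18:14:03 myhost sshd[42]: example entry";
        System.out.println(parseSyslogTimestamp(line, 2016));
    }
}
```

Inside the extractor, the body of extractAscendingTimestamp could then delegate to such a helper. Whether an unparseable line should fail the job or be skipped is a design choice this sketch leaves open.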