Hi,

I think the problem is that the source finishes before the timestamp extractor has a chance to emit even a single watermark. The topology then shuts down, and the window operator does not emit in-flight windows upon shutdown.
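The failure mode can be made concrete with a toy model in plain Java. This is not Flink's window operator, and the class `ToyEventTimeWindows` and its methods are hypothetical. The model makes two assumptions. First, tumbling event-time windows fire only once a watermark reaches the window's last timestamp. Second, a periodic watermark is emitted every `interval` elements, standing in for the auto-watermark timer. With a short input and no final watermark, nothing is emitted at all. Sending a final `Long.MAX_VALUE` watermark at the end of input flushes every open window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an event-time tumbling-window operator; NOT Flink's implementation.
class ToyEventTimeWindows {
    private final long size;                                     // window length in ms
    private final TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> buffered element count
    private final List<String> emitted = new ArrayList<>();

    ToyEventTimeWindows(long size) {
        this.size = size;
    }

    // Assign the element to the tumbling window [start, start + size) that contains it.
    void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        open.merge(start, 1, Integer::sum);
    }

    // Emit and remove every window whose last timestamp (end - 1) is <= the watermark.
    void onWatermark(long watermark) {
        while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Integer> w = open.pollFirstEntry();
            emitted.add("[" + w.getKey() + "," + (w.getKey() + size) + "): " + w.getValue());
        }
    }

    // Feeds ascending timestamps. After every `interval` elements, emits a watermark equal to
    // the largest timestamp seen (a stand-in for a periodic watermark timer).
    // Windows still open at the end are dropped unless `finalWatermark` is true.
    static List<String> run(long[] timestamps, long size, int interval, boolean finalWatermark) {
        ToyEventTimeWindows op = new ToyEventTimeWindows(size);
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            op.onElement(timestamps[i]);
            maxSeen = Math.max(maxSeen, timestamps[i]);
            if ((i + 1) % interval == 0) {
                op.onWatermark(maxSeen);
            }
        }
        if (finalWatermark) {
            op.onWatermark(Long.MAX_VALUE); // "end of input": flushes every open window
        }
        return op.emitted;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 2_000L, 301_000L, 302_000L}; // ms; a 5-minute window is 300_000 ms
        System.out.println(run(ts, 300_000L, 100, false)); // prints []
        System.out.println(run(ts, 300_000L, 2, false));   // prints [[0,300000): 2]
        System.out.println(run(ts, 300_000L, 100, true));  // prints [[0,300000): 2, [300000,600000): 2]
    }
}
```

The middle case shows the partial outcome. The watermark fires often enough to close the first window, but the last window is still open when the input ends and is lost. This plausibly explains why a `Thread.sleep(1)` in a FlatMap sometimes makes results appear. Slowing the job down gives the periodic watermark time to fire, but it does not guarantee that the final window is emitted.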
Cheers,
Aljoscha

> On 26 Feb 2016, at 11:40, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:
>
> Great, that did it, thanks Robert ;)
>
> While I'm at it:
> Sometimes results are returned correctly, but sometimes the output of the job
> (print or writeAsText) is completely empty, as if the job finished too quickly
> for the results to be written. One way of "forcing" results to appear is
> to insert a "delay" in the source stream, for example with a FlatMap:
>
> @Override
> public void flatMap(String value, Collector<String> out)
>         throws Exception {
>     Thread.sleep(1);
>     out.collect(value.toLowerCase());
> }
>
> Am I missing anything here?
>
> Best regards,
>
>
> 2016-02-25 20:05 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>
>> Hi,
>>
>> I had a similar issue recently.
>> Instead of
>>
>>     input.assignTimestampsAndWatermarks
>>
>> you have to do:
>>
>>     input = input.assignTimestampsAndWatermarks
>>
>> On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.t...@euranova.eu>
>> wrote:
>>
>>> Hello everyone,
>>>
>>> I am currently playing with streams whose timestamps are defined by
>>> EventTime. I currently have the following code:
>>>
>>> final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> DataStream<String> input =
>>>         env.readTextFile("file:///var/log/syslog");
>>> input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());
>>>
>>> input.timeWindowAll(Time.minutes(5)).apply(new
>>>         AllWindowFunction<Iterable<String>, String, TimeWindow>() {
>>>     @Override
>>>     public void apply(TimeWindow window, Iterable<String> values,
>>>             Collector<String> out) throws Exception {
>>>         for (String t : values) {
>>>             out.collect(t);
>>>         }
>>>     }
>>> }).print();
>>>
>>> (...)
>>>
>>> public static final class AssignTimestampFromLogEvent extends
>>>         AscendingTimestampExtractor<String> {
>>>     @Override
>>>     public long extractAscendingTimestamp(String element,
>>>             long previousElementTimestamp) {
>>>         String date = element.substring(0, 15);
>>>         SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
>>>         Date ddate = null;
>>>         try {
>>>             ddate = sdf.parse(date);
>>>         } catch (ParseException e) {
>>>             e.printStackTrace();
>>>         }
>>>         return ddate.getTime();
>>>     }
>>> }
>>>
>>>
>>> What I expect it to do is read the syslog, assign timestamps, and build
>>> 5-minute windows *based on the syslog event time*, as I have configured
>>> the stream to do. However, it does not do that: it builds the windows
>>> based on processing time.
>>>
>>> What am I missing here?
>>>
>>> Best regards,
>>>
>>> --
>>>
>>> *Nam-Luc TRAN*
>>>
>>> R&D Manager
>>>
>>> EURA NOVA
>>>
>>> (M) +32 498 37 36 23
>>>
>>> *euranova.eu <http://euranova.eu>*
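The extractor in the original post parses the first 15 characters with `new SimpleDateFormat("MMM dd HH:mm:ss")`. That pattern has no year, so every parsed timestamp falls in 1970. It also relies on the JVM's default locale for month names. When parsing fails, `ddate` stays null and `ddate.getTime()` throws a NullPointerException. A java.time alternative is sketched below. The helper class `SyslogTimestamps` is hypothetical, not part of Flink. The sketch assumes English month names, a caller-supplied year, and log times in UTC. Its `toEpochMillis` could be called from `extractAscendingTimestamp` in place of the `SimpleDateFormat` code.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Locale;

// Hypothetical helper (not part of Flink) for the BSD-syslog timestamp prefix.
class SyslogTimestamps {
    // "ppd" pads the day to width 2 with spaces, so both "Feb 26 11:40:00" and the
    // space-padded "Feb  6 11:40:00" parse. The syslog line has no year, so the caller
    // supplies one (an assumption of this sketch).
    static DateTimeFormatter formatter(int year) {
        return new DateTimeFormatterBuilder()
                .appendPattern("MMM ppd HH:mm:ss")
                .parseDefaulting(ChronoField.YEAR, year)
                .toFormatter(Locale.ENGLISH); // English month names regardless of JVM locale
    }

    // Returns epoch milliseconds, assuming the log times are UTC. Malformed input throws
    // DateTimeParseException here instead of a later NullPointerException.
    static long toEpochMillis(String line, int year) {
        String prefix = line.substring(0, 15); // assumes the line has at least 15 characters
        return LocalDateTime.parse(prefix, formatter(year))
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }
}
```

Unlike `SimpleDateFormat`, a `DateTimeFormatter` is immutable and thread-safe, so it could also be built once and kept in a field rather than recreated for every element.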