Nice catch, actually. I think we should let the timestamp-extracting operator emit its current watermark before shutting down, so that the window operator still gets a chance to fire the in-flight windows.
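
To make the idea concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class `FlushOnCloseSketch`, the nested `ToyExtractor`, and its methods are made up for illustration, and it counts elements instead of using a timer purely to keep the example deterministic. It only shows why an extractor that forwards watermarks periodically would need to flush the latest one when it closes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; none of these names are part of Flink's API. */
public class FlushOnCloseSketch {

    /** Tracks the highest timestamp seen and forwards a watermark every `interval` elements. */
    static final class ToyExtractor {
        private final int interval;
        private final List<Long> emitted = new ArrayList<>();
        private long currentWatermark = Long.MIN_VALUE;
        private long lastEmitted = Long.MIN_VALUE;
        private int seen = 0;

        ToyExtractor(int interval) {
            this.interval = interval;
        }

        /** Updates the pending watermark; only forwards it on every `interval`-th element. */
        void processElement(long timestamp) {
            currentWatermark = Math.max(currentWatermark, timestamp);
            seen++;
            if (seen % interval == 0) {
                emit();
            }
        }

        /** The proposed behavior: flush the pending watermark before shutting down. */
        void close() {
            emit();
        }

        /** Forwards the pending watermark if it has advanced since the last emission. */
        private void emit() {
            if (currentWatermark > lastEmitted) {
                emitted.add(currentWatermark);
                lastEmitted = currentWatermark;
            }
        }

        List<Long> emittedWatermarks() {
            return emitted;
        }
    }

    public static void main(String[] args) {
        // A short, finite input: the source ends long before the emission interval is reached.
        ToyExtractor extractor = new ToyExtractor(1000);
        for (long ts = 1; ts <= 5; ts++) {
            extractor.processElement(ts);
        }
        System.out.println("before close: " + extractor.emittedWatermarks()); // before close: []
        extractor.close();
        System.out.println("after close: " + extractor.emittedWatermarks());  // after close: [5]
    }
}
```

Without the `emit()` call in `close()`, nothing downstream would ever see a watermark for such a short input, which matches the empty output described below.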

On Fri, Feb 26, 2016 at 11:49 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I think the problem is that the source finished before the extractor has
> the chance to emit even a single watermark. This means that the topology
> will shut down and the window operator does not emit in-flight windows upon
> shutdown.
>
> Cheers,
> Aljoscha
>
> On 26 Feb 2016, at 11:40, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:
> >
> > Great, that did it, thanks Robert ;)
> >
> > While I'm at it:
> > Sometimes results are correctly returned, sometimes, the output of the job
> > (print or writeAsText) is plain empty, like the job finished too quickly
> > before the results are written. One way of "forcing" results to happen is
> > to insert a "delay" in the source stream, as with a FlatMap:
> >
> > @Override
> > public void flatMap(String value, Collector<String> out)
> >         throws Exception {
> >     Thread.sleep(1);
> >     out.collect(value.toLowerCase());
> > }
> >
> > Am I missing anything here?
> >
> > Best regards,
> >
> >
> > 2016-02-25 20:05 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
> >
> >> Hi,
> >>
> >> I had a similar issue recently.
> >> Instead of
> >> input.assignTimestampsAndWatermarks
> >>
> >> you have to do:
> >>
> >> input = input.assignTimestampsAndWatermarks
> >>
> >> On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.t...@euranova.eu>
> >> wrote:
> >>
> >>> Hello everyone,
> >>>
> >>> I am currently playing with streams which timestamp is defined by
> >>> EventTime.
> >>> I currently have the following code:
> >>>
> >>> final StreamExecutionEnvironment env =
> >>>         StreamExecutionEnvironment.getExecutionEnvironment();
> >>>
> >>> env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>
> >>> DataStream<String> input =
> >>>         env.readTextFile("file:///var/log/syslog");
> >>> input.assignTimestampsAndWatermarks(new
> >>>         AssignTimestampFromLogEvent());
> >>>
> >>> input.timeWindowAll(Time.minutes(5)).apply(new
> >>>         AllWindowFunction<Iterable<String>, String, TimeWindow>() {
> >>>     @Override
> >>>     public void apply(TimeWindow window, Iterable<String> values,
> >>>             Collector<String> out) throws Exception {
> >>>         for(String t:values) {
> >>>             out.collect(t);
> >>>         }
> >>>     }
> >>> }).print();
> >>>
> >>> (...)
> >>>
> >>> public static final class AssignTimestampFromLogEvent extends
> >>>         AscendingTimestampExtractor<String> {
> >>>     @Override
> >>>     public long extractAscendingTimestamp(String element, long
> >>>             previousElementTimestamp){
> >>>         String date = element.substring(0,15);
> >>>         SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
> >>>         Date ddate = null;
> >>>         try {
> >>>             ddate = sdf.parse(date);
> >>>         } catch (ParseException e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>         return ddate.getTime();
> >>>     }
> >>> }
> >>>
> >>>
> >>> What I expect it to do is to read the syslog, assign timestamp and do
> >>> 5 minutes windows *based on the syslog event time*, as I've configured
> >>> the stream to do it. It however does not do that, and does the windows
> >>> based on processing time.
> >>>
> >>> What am I missing here?
> >>>
> >>> Best regards,
> >>>
> >>> --
> >>> *Nam-Luc TRAN*
> >>> R&D Manager
> >>> EURA NOVA
> >>> (M) +32 498 37 36 23
> >>> *euranova.eu <http://euranova.eu>*
> >>>
> >>
> >
> > --
> > *Nam-Luc TRAN*
> > R&D Manager
> > EURA NOVA
> > (M) +32 498 37 36 23
> > *euranova.eu <http://euranova.eu>*
> >