Hello everyone, I am currently experimenting with streams whose timestamps are defined by event time. I currently have the following code:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> input = env.readTextFile("file:///var/log/syslog");
    input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

    input.timeWindowAll(Time.minutes(5))
         .apply(new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
             @Override
             public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) throws Exception {
                 for (String t : values) {
                     out.collect(t);
                 }
             }
         })
         .print();

    (...)

    public static final class AssignTimestampFromLogEvent extends AscendingTimestampExtractor<String> {
        @Override
        public long extractAscendingTimestamp(String element, long previousElementTimestamp) {
            // Syslog lines start with a 15-character timestamp such as "Feb 10 13:04:05"
            String date = element.substring(0, 15);
            SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
            Date ddate = null;
            try {
                ddate = sdf.parse(date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return ddate.getTime();
        }
    }

What I expect is for it to read the syslog, assign each line a timestamp, and build 5-minute windows *based on the syslog event time*, as I have configured the stream to do. However, it does not: the windows are built based on processing time instead. What am I missing here?

Best regards,
--
*Nam-Luc TRAN*
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
*euranova.eu <http://euranova.eu>*
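For reference, here is a minimal plain-Java sketch (no Flink dependency) of the behaviour expected above: each syslog line is assigned to the 5-minute tumbling window determined by its own parsed timestamp, regardless of when the line is read. The class and method names (`EventTimeBuckets`, `parseSyslogTimestamp`, `windowStart`, `bucketByEventTime`) are hypothetical, and it assumes UTC and English month names. Because syslog omits the year, `SimpleDateFormat` defaults it to 1970. Unlike the extractor above, it throws on an unparseable line instead of dereferencing a null `Date`.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;

public class EventTimeBuckets {

    // Tumbling window size from the post: 5 minutes, in milliseconds.
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    // Parses the 15-character syslog prefix, e.g. "Feb 10 13:04:05".
    // Assumption: UTC and English month names; the missing year defaults to 1970.
    static long parseSyslogTimestamp(String line) {
        SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss", Locale.ENGLISH);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return sdf.parse(line.substring(0, 15)).getTime();
        } catch (ParseException e) {
            // Fail loudly rather than returning null and hitting an NPE later.
            throw new IllegalArgumentException("Unparseable syslog line: " + line, e);
        }
    }

    // Start of the tumbling window containing ts: ts rounded down to a multiple of size.
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Groups lines by the event-time window of their own timestamp
    // (not by the time at which they happen to be processed).
    static Map<Long, List<String>> bucketByEventTime(List<String> lines, long size) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (String line : lines) {
            long start = windowStart(parseSyslogTimestamp(line), size);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(line);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Feb 10 13:04:05 host sshd[1]: a",
                "Feb 10 13:04:59 host sshd[1]: b",
                "Feb 10 13:05:00 host sshd[1]: c");
        for (Map.Entry<Long, List<String>> w : bucketByEventTime(lines, WINDOW_SIZE_MS).entrySet()) {
            long start = w.getKey();
            System.out.println("[" + start + ", " + (start + WINDOW_SIZE_MS) + ") -> " + w.getValue());
        }
    }
}
```

With these three sample lines, the first two fall into the 13:00 to 13:05 window and the third opens the 13:05 to 13:10 window, purely because of the timestamps they carry.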