Hello everyone,
I am currently experimenting with streams whose timestamps are defined by
event time (TimeCharacteristic.EventTime). I currently have the following code:
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableTimestamps(); //.setAutoWatermarkInterval(10000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<String> input = env.readTextFile("file:///var/log/syslog");
    input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());
    input.timeWindowAll(Time.minutes(5)).apply(
        new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<String> values,
                    Collector<String> out) throws Exception {
                for (String t : values) {
                    out.collect(t);
                }
            }
        }).print();
(...)
    public static final class AssignTimestampFromLogEvent extends
            AscendingTimestampExtractor<String> {
        @Override
        public long extractAscendingTimestamp(String element,
                long previousElementTimestamp) {
            String date = element.substring(0, 15);
            SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
            Date ddate = null;
            try {
                ddate = sdf.parse(date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return ddate.getTime();
        }
    }
What I expect is that the job reads the syslog, assigns a timestamp to each
line, and builds 5-minute windows *based on the syslog event time*, since I
have configured the stream for event time. Instead, the windows appear to be
based on processing time.
What am I missing here?
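To make the expected behaviour concrete, here is a minimal plain-Java sketch (no Flink involved) of the bucketing I have in mind. The class and method names are made up for illustration. It assumes 5-minute windows aligned to the epoch and timestamps interpreted as UTC; because the pattern has no year, the parsed dates fall in 1970.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper, not part of Flink: shows which 5-minute bucket
// a syslog line should land in when windowing by event time.
public class EventTimeWindowSketch {

    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    // Parses the 15-character syslog prefix, e.g. "Jan 01 00:07:30".
    // Assumption: English month names and UTC; the year defaults to 1970
    // because the pattern does not contain one.
    static long parseSyslogTimestamp(String line) {
        SimpleDateFormat sdf =
            new SimpleDateFormat("MMM dd HH:mm:ss", Locale.ENGLISH);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return sdf.parse(line.substring(0, 15)).getTime();
        } catch (ParseException e) {
            // Fail loudly instead of continuing with a null Date.
            throw new IllegalArgumentException("Unparseable line: " + line, e);
        }
    }

    // Start of the epoch-aligned tumbling window containing the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        String[] lines = {
            "Jan 01 00:04:59 host app: first",
            "Jan 01 00:07:30 host app: second",
            "Jan 01 00:09:59 host app: third"
        };
        for (String line : lines) {
            long ts = parseSyslogTimestamp(line);
            System.out.println(windowStart(ts) + " <- " + line);
        }
    }
}
```

With event-time windowing I would expect the second and third lines to end up in the same window and the first line in the previous one, regardless of when the job actually reads them.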
Best regards,
--
*Nam-Luc TRAN*
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
*euranova.eu <http://euranova.eu>*