Hello everyone,

I am currently playing with streams which timestamp is defined by
EventTime. I currently have the following code:

      final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

      DataStream<String> input = env.readTextFile("file:///var/log/syslog");
      input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

      input.timeWindowAll(Time.minutes(5)).apply(new
AllWindowFunction<Iterable<String>, String, TimeWindow>() {
         @Override
         public void apply(TimeWindow window, Iterable<String> values,
Collector<String> out) throws Exception {
            for(String t:values) {
               out.collect(t);
            }
         }
      }).print();

(...)

public static final class AssignTimestampFromLogEvent extends
AscendingTimestampExtractor<String> {
   @Override
   public long extractAscendingTimestamp(String element, long
previousElementTimestamp){
      String date = element.substring(0,15);
      SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
      Date ddate = null;
      try {
         ddate = sdf.parse(date);
      } catch (ParseException e) {
         e.printStackTrace();
      }
      return ddate.getTime();
   }
}


What I expect it to do is to read the syslog, assign timestamp and do
5 minutes windows *based on the syslog event time*, as I've configured
the stream to do it. It however does not do that, and does the windows
based on processing time.

What am I missing here?

Best regards,

-- 

*Nam-Luc TRAN*

R&D Manager

EURA NOVA

(M) +32 498 37 36 23

*euranova.eu <http://euranova.eu>*

Reply via email to