Hi,

I had a similar issue recently.
Instead of
 input.assignTimestampsAndWatermarks

you have to do:

 input = input.assignTimestampsAndWatermarks

On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.t...@euranova.eu>
wrote:

> Hello everyone,
>
> I am currently playing with streams which timestamp is defined by
> EventTime. I currently have the following code:
>
>       final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig().enableTimestamps();//.setAutoWatermarkInterval(10000);
>       env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>       DataStream<String> input =
> env.readTextFile("file:///var/log/syslog");
>       input.assignTimestampsAndWatermarks(new
> AssignTimestampFromLogEvent());
>
>       input.timeWindowAll(Time.minutes(5)).apply(new
> AllWindowFunction<Iterable<String>, String, TimeWindow>() {
>          @Override
>          public void apply(TimeWindow window, Iterable<String> values,
> Collector<String> out) throws Exception {
>             for(String t:values) {
>                out.collect(t);
>             }
>          }
>       }).print();
>
> (...)
>
> public static final class AssignTimestampFromLogEvent extends
> AscendingTimestampExtractor<String> {
>    @Override
>    public long extractAscendingTimestamp(String element, long
> previousElementTimestamp){
>       String date = element.substring(0,15);
>       SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
>       Date ddate = null;
>       try {
>          ddate = sdf.parse(date);
>       } catch (ParseException e) {
>          e.printStackTrace();
>       }
>       return ddate.getTime();
>    }
> }
>
>
> What I expect it to do is to read the syslog, assign timestamp and do
> 5 minutes windows *based on the syslog event time*, as I've configured
> the stream to do it. It however does not do that, and does the windows
> based on processing time.
>
> What am I missing here?
>
> Best regards,
>
> --
>
> *Nam-Luc TRAN*
>
> R&D Manager
>
> EURA NOVA
>
> (M) +32 498 37 36 23
>
> *euranova.eu <http://euranova.eu>*
>

Reply via email to