Nice catch, actually.

I think we should let the timestamp-extracting operator emit its current
watermark before shutting down.
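A toy model of this proposal, written in plain Java rather than Flink, may help. It has a tumbling event-time window operator that emits a window only once a watermark reaches the window's last timestamp. That operator is fed by an ascending-timestamp extractor whose watermark is assumed to trail the latest timestamp by 1 ms. The classes ToyEventTimeWindows and ToyAscendingExtractor and the flushOnClose switch are hypothetical. The model also assumes, as in this thread, that the input ends before any periodic watermark has been emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (not Flink's implementation): tumbling event-time windows that
// are emitted solely when a watermark reaches the window's last timestamp.
class ToyEventTimeWindows {
    private final long size;
    private final TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> element count
    final List<String> emitted = new ArrayList<>();

    ToyEventTimeWindows(long size) { this.size = size; }

    void onElement(long timestamp) {
        long start = timestamp - (timestamp % size); // assumes non-negative timestamps
        pending.merge(start, 1, Integer::sum);
    }

    void onWatermark(long watermark) {
        // A window [start, start + size) fires once watermark >= start + size - 1.
        while (!pending.isEmpty() && pending.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Integer> w = pending.pollFirstEntry();
            emitted.add("[" + w.getKey() + ", " + (w.getKey() + size) + "): " + w.getValue());
        }
    }
}

// Toy ascending-timestamp extractor: its watermark trails the latest timestamp by 1 ms.
class ToyAscendingExtractor {
    private long latest = Long.MIN_VALUE;
    private final ToyEventTimeWindows downstream;
    private final boolean flushOnClose; // hypothetical switch for the proposed behaviour

    ToyAscendingExtractor(ToyEventTimeWindows downstream, boolean flushOnClose) {
        this.downstream = downstream;
        this.flushOnClose = flushOnClose;
    }

    void onElement(long timestamp) {
        latest = timestamp;
        downstream.onElement(timestamp);
    }

    long currentWatermark() {
        return latest == Long.MIN_VALUE ? Long.MIN_VALUE : latest - 1;
    }

    // Called once the finite input is exhausted, just before shutdown.
    void close() {
        if (flushOnClose) {
            downstream.onWatermark(currentWatermark());
        }
    }

    static List<String> run(long[] timestamps, long windowSize, boolean flushOnClose) {
        ToyEventTimeWindows windows = new ToyEventTimeWindows(windowSize);
        ToyAscendingExtractor extractor = new ToyAscendingExtractor(windows, flushOnClose);
        for (long t : timestamps) {
            extractor.onElement(t);
        }
        extractor.close();
        return windows.emitted;
    }
}
```

With timestamps 1000, 2000, 6000 and 7000 and 5000 ms windows, run(..., false) returns an empty list, matching the empty output reported further down. run(..., true) returns only "[0, 5000): 2". The flushed watermark (6999) fires the first window. The window [5000, 10000) stays pending because its last timestamp is 9999. In this model, only a final watermark at or beyond 9999, such as Long.MAX_VALUE, would also flush the last window.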

On Fri, Feb 26, 2016 at 11:49 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I think the problem is that the source finishes before the extractor has had
> the chance to emit even a single watermark. The topology then shuts down,
> and the window operator does not emit in-flight windows on shutdown.
>
> Cheers,
> Aljoscha
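A toy simulation can illustrate this explanation, and also why the Thread.sleep(1) workaround reported below sometimes helps. It assumes the extractor emits watermarks on a periodic processing-time timer, which is the interval that setAutoWatermarkInterval configures in the question's commented-out code. It also assumes the job shuts down as soon as the last element has been read, so later timer ticks never happen. Processing time is simulated. The 200 ms interval and the 1 ms per-element delay are illustrative values, and ToyPeriodicWatermarks is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (not Flink code).
class ToyPeriodicWatermarks {
    /**
     * Returns the watermarks emitted before the source is exhausted. Each element advances a
     * simulated processing clock by perElementDelayMs. Every intervalMs of that clock, a
     * watermark equal to (latest event timestamp - 1) is emitted, provided an element was seen.
     */
    static List<Long> watermarksBeforeShutdown(long[] eventTimestamps, long perElementDelayMs, long intervalMs) {
        if (intervalMs <= 0 || perElementDelayMs < 0) {
            throw new IllegalArgumentException("interval must be positive, delay non-negative");
        }
        List<Long> watermarks = new ArrayList<>();
        long processingTime = 0;
        long nextTick = intervalMs;
        long latest = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            processingTime += perElementDelayMs; // e.g. the Thread.sleep(1) in the flatMap
            // Fire every timer tick that elapsed before this element arrives.
            while (nextTick <= processingTime) {
                if (latest != Long.MIN_VALUE) {
                    watermarks.add(latest - 1);
                }
                nextTick += intervalMs;
            }
            latest = Math.max(latest, ts);
        }
        // No further ticks: the job shuts down once the input is exhausted.
        return watermarks;
    }
}
```

Consider 1,000 elements whose event timestamps are 1 s apart. With no delay, the source finishes before the first 200 ms tick and produces no watermark at all. With 1 ms per element, five watermarks are emitted (197999 through 997999). Even then, the final element's timestamp is never reflected in a watermark, so the last window would still not fire. Real timing varies from run to run, which would be consistent with the output being empty only sometimes.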
> > On 26 Feb 2016, at 11:40, Nam-Luc Tran <namluc.t...@euranova.eu> wrote:
> >
> > Great, that did it, thanks Robert ;)
> >
> > While I'm at it:
> > Sometimes the results are returned correctly; other times the output of
> > the job (print or writeAsText) is simply empty, as if the job finished too
> > quickly for the results to be written. One way of "forcing" results to
> > appear is to insert a "delay" into the source stream, for example with a
> > FlatMap:
> >
> >      @Override
> >      public void flatMap(String value, Collector<String> out)
> >            throws Exception {
> >         Thread.sleep(1); // artificial 1 ms delay per element
> >         out.collect(value.toLowerCase());
> >      }
> >
> > Am I missing anything here?
> >
> > Best regards,
> >
> >
> > 2016-02-25 20:05 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
> >
> >> Hi,
> >>
> >> I had a similar issue recently.
> >> Instead of
> >> input.assignTimestampsAndWatermarks(...)
> >>
> >> you have to do:
> >>
> >> input = input.assignTimestampsAndWatermarks(...)
> >>
> >> since the method returns a new stream instead of modifying input in place.
> >>
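Robert's fix works because assignTimestampsAndWatermarks returns a new DataStream and leaves input unchanged, so the result of the original call was simply discarded. The same pattern in miniature uses a hypothetical ToyStream class rather than Flink's API:

```java
// Toy illustration only (not Flink's DataStream): transformations return a NEW
// object and never modify the receiver.
final class ToyStream {
    private final boolean hasEventTimestamps;

    ToyStream(boolean hasEventTimestamps) {
        this.hasEventTimestamps = hasEventTimestamps;
    }

    // Stand-in for assignTimestampsAndWatermarks: builds a new stream, leaves `this` untouched.
    ToyStream assignTimestamps() {
        return new ToyStream(true);
    }

    boolean hasEventTimestamps() {
        return hasEventTimestamps;
    }

    // Replays the two variants from the thread; returns {after ignored call, after reassignment}.
    static boolean[] demo() {
        ToyStream input = new ToyStream(false);
        input.assignTimestamps();             // result discarded, as in the question
        boolean afterIgnoredCall = input.hasEventTimestamps();
        input = input.assignTimestamps();     // Robert's fix: keep the returned stream
        return new boolean[] {afterIgnoredCall, input.hasEventTimestamps()};
    }
}
```

demo() returns {false, true}. The ignored call leaves the stream without timestamps, and only the reassigned variable sees them. java.lang.String behaves the same way, which is why calling s.toLowerCase() on its own does not change s.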
> >> On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran <namluc.t...@euranova.eu>
> >> wrote:
> >>
> >>> Hello everyone,
> >>>
> >>> I am currently experimenting with streams whose timestamps are defined by
> >>> event time. I have the following code:
> >>>
> >>>      final StreamExecutionEnvironment env =
> >>>            StreamExecutionEnvironment.getExecutionEnvironment();
> >>>
> >>>      env.getConfig().enableTimestamps(); //.setAutoWatermarkInterval(10000);
> >>>      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>>
> >>>      DataStream<String> input =
> >>>            env.readTextFile("file:///var/log/syslog");
> >>>      input.assignTimestampsAndWatermarks(
> >>>            new AssignTimestampFromLogEvent());
> >>>
> >>>      input.timeWindowAll(Time.minutes(5)).apply(
> >>>            new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
> >>>         @Override
> >>>         public void apply(TimeWindow window, Iterable<String> values,
> >>>               Collector<String> out) throws Exception {
> >>>            for (String t : values) {
> >>>               out.collect(t);
> >>>            }
> >>>         }
> >>>      }).print();
> >>>
> >>> (...)
> >>>
> >>> public static final class AssignTimestampFromLogEvent
> >>>       extends AscendingTimestampExtractor<String> {
> >>>   @Override
> >>>   public long extractAscendingTimestamp(String element,
> >>>         long previousElementTimestamp) {
> >>>      String date = element.substring(0,15);
> >>>      SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
> >>>      Date ddate = null;
> >>>      try {
> >>>         ddate = sdf.parse(date);
> >>>      } catch (ParseException e) {
> >>>         e.printStackTrace();
> >>>      }
> >>>      return ddate.getTime();
> >>>   }
> >>> }
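The extractor above has latent problems beyond the one this thread resolves. Syslog's "MMM dd HH:mm:ss" prefix carries no year, so SimpleDateFormat fills in 1970. Month names are parsed in the JVM's default locale. And when parsing fails, the caught ParseException leaves ddate null, so ddate.getTime() throws a NullPointerException. Below is a sketch of a more defensive parser using java.time. The helpers formatter and extractMillis are hypothetical, not Flink API. The caller supplies the year and time zone, which are assumptions. Lines without a parseable timestamp yield an empty result, and how to handle those (skip them, or reuse the previous timestamp) is left open.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.util.Locale;
import java.util.OptionalLong;

class SyslogTimestamps {
    // Matches prefixes like "Feb 26 11:49:03" and "Feb  6 11:49:03" (day padded with a space).
    // Syslog omits the year, so the caller must supply one (an assumption, e.g. the current year).
    // DateTimeFormatter is immutable and thread-safe, so it can be built once and reused.
    static DateTimeFormatter formatter(int assumedYear) {
        return new DateTimeFormatterBuilder()
                .appendPattern("MMM ppd HH:mm:ss")
                .parseDefaulting(ChronoField.YEAR, assumedYear)
                .toFormatter(Locale.ENGLISH);
    }

    // Returns epoch milliseconds, or empty for lines without a parseable 15-character prefix
    // (instead of dereferencing a null Date after the exception is swallowed).
    static OptionalLong extractMillis(String line, DateTimeFormatter fmt, ZoneId zone) {
        if (line.length() < 15) {
            return OptionalLong.empty();
        }
        try {
            LocalDateTime time = LocalDateTime.parse(line.substring(0, 15), fmt);
            return OptionalLong.of(time.atZone(zone).toInstant().toEpochMilli());
        } catch (DateTimeParseException e) {
            return OptionalLong.empty();
        }
    }
}
```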
> >>>
> >>>
> >>> What I expect it to do is read the syslog, assign timestamps, and compute
> >>> 5-minute windows *based on the syslog event time*, as I've configured
> >>> the stream to do. However, it does not: the windows are computed based
> >>> on processing time instead.
> >>>
> >>> What am I missing here?
> >>>
> >>> Best regards,
> >>>
> >>> --
> >>>
> >>> *Nam-Luc TRAN*
> >>>
> >>> R&D Manager
> >>>
> >>> EURA NOVA
> >>>
> >>> (M) +32 498 37 36 23
> >>>
> >>> *euranova.eu <http://euranova.eu>*
> >>>
> >>
> >
>
>
