Hi, the problem might be that your timestamp/watermark assigner runs in parallel and only one of its parallel instances emits the watermark, because only that instance sees the element with _3 == 9000. For the watermark to advance at an operator, it has to advance on all of that operator's upstream parallel instances; the operator's watermark is the minimum of the watermarks it has received from them.
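
To illustrate the rule above, here is a minimal sketch in plain Scala. It is not Flink code: the object name, `combinedWatermark`, and the parallelism of 4 are made up for illustration, and it assumes that an instance which has not emitted a watermark yet counts as Long.MinValue.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical model of how a downstream operator combines the watermarks
// it has received from its upstream parallel instances. Not Flink API.
object WatermarkMinSketch {
  // Assumption: "no watermark emitted yet" is represented as Long.MinValue.
  val NoWatermark: Long = Long.MinValue

  // The operator's event-time clock is the minimum over all upstream instances.
  def combinedWatermark(upstream: Seq[Long]): Long = upstream.min

  def main(args: Array[String]): Unit = {
    val eventTs = 1000000L // stand-in for the single timestamp all events share
    val wm = eventTs + TimeUnit.MINUTES.toMillis(6)

    // Four assigner instances; only one of them saw the element with _3 == 9000.
    val onlyOneEmitted = Seq(NoWatermark, NoWatermark, wm, NoWatermark)
    // The window operator's watermark has not advanced, so the window stays open.
    println(combinedWatermark(onlyOneEmitted) == NoWatermark)

    // Only when every instance has emitted does the watermark advance downstream.
    val allEmitted = Seq.fill(4)(wm)
    println(combinedWatermark(allEmitted) == wm)
  }
}
```

If this is what is happening in your job, one way to check would be to run the map and the assigner with a parallelism of 1 (for example by calling setParallelism(1) on them), so that a single instance sees every element.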

Cheers,
Aljoscha

On Fri, 9 Sep 2016 at 18:29 Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Hi,
>
> I have a streaming (event time) application where I am receiving events
> with the same assigned timestamp. I receive 10000 events in total on a
> window of 5 minutes, but I emit water mark when 9000 elements have been
> received. This watermark is 6 minutes after the assigned timestamps. My
> question is: why the function that is associated with the window reads
> 10000 elements and not 9000? All elements that have a timestamp lower than
> the watermark should be ignored (1000), but it's not happening.
>
> Here is part of the code:
> «
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val rawStream = env.socketTextStream("localhost", 4321)
>
> val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String, Int, Long)] {
>   val timestamp = System.currentTimeMillis();
>
>   override def extractTimestamp(element: (String, Int, Long), previousElementTimestamp: Long): Long =
>     timestamp
>
>   override def checkAndGetNextWatermark(lastElement: (String, Int, Long), extractedTimestamp: Long): Watermark = {
>     if(lastElement._3 == 9000) {
>       val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
>       new watermark.Watermark(ts)
>     } else null
>   }
> }
>
> val stream = rawStream.map(line => {
>   val Array(p1, p2, p3) = line.split(" ")
>   (p1, p2.toInt, p3.toLong)
> })
> .assignTimestampsAndWatermarks(punctuatedAssigner)
>
> stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
> »
>
> Thanks!