Hi,
the problem might be that your timestamp/watermark assigner is run in
parallel and that only one parallel instance of those operators emits the
watermark because only one of those parallel instances sees the element
with _3 == 9000. For the watermark to advance at an operator it needs to
advance in all upstream operations.

Cheers,
Aljoscha

On Fri, 9 Sep 2016 at 18:29 Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Hi,
>
> I have a streaming (event time) application where I am receiving events
> with the same assigned timestamp. I receive 10000 events in total on a
> window of 5 minutes, but I emit water mark when 9000 elements have been
> received. This watermark is 6 minutes after the assigned timestamps. My
> question is: why the function that is associated with the window reads
> 10000 elements and not 9000? All elements that have a timestamp lower than
> the watermark should be ignored (1000), but it's not happening.
>
> Here is part of the code:
> «
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val rawStream = env.socketTextStream("localhost", 4321)
>
> val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String,
> Int, Long)] {
>       val timestamp = System.currentTimeMillis();
>
>       override def extractTimestamp(element: (String, Int, Long),
> previousElementTimestamp: Long): Long =
>         timestamp
>
>       override def checkAndGetNextWatermark(lastElement: (String, Int,
> Long), extractedTimestamp: Long): Watermark = {
>         if(lastElement._3 == 9000) {
>           val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
>           new watermark.Watermark(ts)
>         } else null
>       }
>     }
>
> val stream = rawStream.map(line => {
>       val Array(p1, p2, p3) = line.split(" ")
>       (p1, p2.toInt, p3.toLong)
>     })
>       .assignTimestampsAndWatermarks(punctuatedAssigner)
>
> stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
> »
>
> Thanks!
>

Reply via email to