Yes, you can do that, but you would have to define a custom trigger. Alternatively, you can generate more conservative watermarks, i.e., watermarks that lag further behind the event timestamps. That would have the same effect.
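Both options might look roughly like this with the Flink 1.4-era Scala DataStream API. This is a sketch and has not been checked against a specific Flink version. `FireAfterLatenessTrigger`, `DelayedTimestampExtractor` and the `Event` case class (standing in for `T4`) are hypothetical names. The trigger assumes event-time tumbling windows, as created by the original `timeWindow(...)` call.

```scala
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical event type standing in for T4 from the original code.
case class Event(key: (Int, Long, Int, Int), timestamp: Long)

// Option 1 (hypothetical name): fire each window once, when the watermark reaches
// window end + allowed lateness, instead of at window end and again per late element.
class FireAfterLatenessTrigger[T](latenessMs: Long) extends Trigger[T, TimeWindow] {

  // Event time at which this window should fire.
  def fireTimestamp(window: TimeWindow): Long = window.maxTimestamp() + latenessMs

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Registering the same timestamp on every element is what Flink's own
    // EventTimeTrigger does too; one timer per key/window/timestamp is kept.
    ctx.registerEventTimeTimer(fireTimestamp(window))
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time == fireTimestamp(window)) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE

  // Processing time plays no role in this trigger.
  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(fireTimestamp(window))
}

// Option 2 (hypothetical name): hold the watermark back by the expected
// out-of-orderness, so the default trigger fires only after delayed events arrived.
class DelayedTimestampExtractor(maxDelay: Time)
    extends BoundedOutOfOrdernessTimestampExtractor[Event](maxDelay) {
  override def extractTimestamp(event: Event): Long = event.timestamp
}
```

In the original pipeline, the trigger would go after `.allowedLateness(minutes(WindowDelayMinutes))` as `.trigger(new FireAfterLatenessTrigger[T4](minutes(WindowDelayMinutes).toMilliseconds))`. The allowed lateness has to stay; otherwise elements arriving after the window end are dropped before the trigger sees them. With the extractor (adapted to `T4`), `allowedLateness` would be removed and `nonKeyedStream.assignTimestampsAndWatermarks(...)` called instead. Either way, each window's result is emitted roughly `WindowDelayMinutes` later than with the default trigger.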
Best, Fabian

2018-02-16 12:25 GMT+01:00 Björn Zachrisson <bjo...@gmail.com>:

> Hi Fabian,
>
> It does not, since my events are out of order within a certain interval, and
> removing allowedLateness reduces the number of elements processed by 99.5%.
> Is it possible to trigger the window only once the allowed lateness
> has passed?
>
> Regards
> Björn Zachrisson
>
> On 16 February 2018 at 12:17, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Björn,
>>
>> You configured an allowed lateness, so this might be caused by late
>> arriving data.
>> If a late record is received within the allowed lateness, the window
>> function will fire again for the same window.
>> Does that also happen if you remove the allowedLateness() call?
>>
>> Best, Fabian
>>
>> 2018-02-16 12:05 GMT+01:00 Björn Zachrisson <bjo...@gmail.com>:
>>
>>> Hi,
>>>
>>> I have something weird, probably a user error :)
>>>
>>> I'm running a keyBy on multiple fields
>>>
>>> val keyedStream = nonKeyedStream
>>>   .keyBy(m => (m._1, m._2, m._3.getOrElse(-1), m._4))
>>>
>>> then applying a window function
>>>
>>> val appliedWindow = keyedStream
>>>   .timeWindow(minutes(WindowTimeMinutes))
>>>   .allowedLateness(minutes(WindowDelayMinutes))
>>>   .apply(new windowFunc1)
>>>
>>> This is the first thing my apply function does:
>>>
>>> override def apply(key: (Int, Long, Int, Int), window: TimeWindow, input: Iterable[T4], out: Collector[T4]): Unit = {
>>>   myClass.addKey(key, window)
>>>
>>> And in this class I have a function
>>>
>>> object myClass {
>>>   val keyHash = new mutable.HashMap[(Int, Long, Int, Int), TimeWindow]()
>>>
>>>   def addKey(key: (Int, Long, Int, Int), window: TimeWindow): Unit = {
>>>     if (keyHash.contains(key)) {
>>>       printf("Multiple key found for: " + key + "\n")
>>>       printf(keyHash(key) + "\n")
>>>       printf(window + "\n")
>>>     }
>>>     keyHash.put(key, window)
>>>   }
>>> }
>>>
>>> This outputs
>>> Multiple key found for: (1,2,3,4)
>>> TimeWindow{start=1518268800000, end=1518270000000}
>>> TimeWindow{start=1518268800000, end=1518270000000}
>>>
>>> So it seems that the keyBy is not doing what it is supposed to, or I am
>>> messing things up somewhere, but I cannot find where.
>>>
>>> Regards
>>> Björn
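To see why `apply()` received the same (key, window) pair twice, here is a small self-contained Scala model of the behaviour Fabian describes. It is not Flink code: `LateFiringModel`, `Event` and `Watermark` are made-up names, and the model covers a single key, assumes non-negative timestamps and non-decreasing watermarks. It mimics the default event-time trigger, which fires when the watermark passes the window end and then fires again for every late element accepted within the allowed lateness.

```scala
import scala.collection.mutable

// Toy model, NOT Flink code: one key, tumbling event-time windows,
// default event-time trigger behaviour, plus an allowed lateness.
object LateFiringModel {
  sealed trait Input
  final case class Event(timestamp: Long) extends Input
  final case class Watermark(timestamp: Long) extends Input

  /** Returns (windowStart, windowEnd, elementsInWindow) for every firing, in order. */
  def simulate(inputs: Seq[Input], size: Long, lateness: Long): Vector[(Long, Long, Int)] = {
    var watermark = Long.MinValue
    val counts = mutable.Map.empty[Long, Int] // window start -> buffered element count
    val fired = mutable.Set.empty[Long]       // windows that have fired at least once
    val out = Vector.newBuilder[(Long, Long, Int)]

    inputs.foreach {
      case Event(ts) =>
        val start = ts - ts % size
        val maxTs = start + size - 1
        // An element is dropped only once the watermark reached window end + lateness.
        if (maxTs + lateness > watermark) {
          counts(start) = counts.getOrElse(start, 0) + 1
          // Watermark already past the window end: this late element fires the window again.
          if (maxTs <= watermark) {
            fired += start
            out += ((start, start + size, counts(start)))
          }
        }
      case Watermark(wm) =>
        watermark = wm
        for (start <- counts.keys.toList.sorted) {
          val maxTs = start + size - 1
          // Regular on-time firing when the watermark passes the window end.
          if (maxTs <= wm && !fired(start)) {
            fired += start
            out += ((start, start + size, counts(start)))
          }
          // Window state is cleaned up once window end + lateness is reached.
          if (maxTs + lateness <= wm) {
            counts -= start
            fired -= start
          }
        }
    }
    out.result()
  }
}
```

With `size = 10`, `lateness = 5` and the inputs `Event(3), Event(7), Watermark(12), Event(8)`, the window [0, 10) fires once with two elements and then again with three, because `Event(8)` is late but still within the allowed lateness. With `lateness = 0` the late element is dropped and the window fires only once, consistent with the duplicates disappearing (together with most of the data) when `allowedLateness()` is removed.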