Hi Björn,

You configured an allowed lateness, so this might be caused by late
arriving data.
In case a late record is received within the allowed lateness, the window
function will fire again for the same window.
Does that also happen if you remove the allowedLateness() call?

Best, Fabian


2018-02-16 12:05 GMT+01:00 Björn Zachrisson <bjo...@gmail.com>:

> Hi,
>
> I have something weird probably a user error :)
>
> I'm running a keyby on multiple elements
>
> val keyedStream = nonKeyedStream
>   .keyBy(m => (m.1, m.2, m.3.getOrElse(-1), m.4))
>
> then apply a window function
>
> val appliedWindow = keyedStream
>   .timeWindow(minutes(WindowTimeMinutes))
>   .allowedLateness(minutes(WindowDelayMinutes))
>   .apply(new windowFunc1)
>
>
> This is my first action in apply function
>
> override def apply(key: (Int,Long,Int,Int), window: TimeWindow, input: 
> Iterable[T4], out: Collector[T4]): Unit = {
>   myClass.addKey(key,window)
>
>
> And in this class i have a function
>
> object myClass{
>   val keyHash = new mutable.HashMap[(Int,Long,Int,Int),TimeWindow]()
>
>
>   def addKey(key: (Int,Long,Int,Int), window : TimeWindow) : Unit = {
>     if(keyHash.contains(key)){
>       printf("Multiple key found for: " + key + "\n")
>       printf(keyHash(key) + "\n")
>       printf(window + "\n")
>     }
>     keyHash.put(key,window)
>   }
> }
>
>
> This outputs
> Multiple key found for: (1,2,3,4)
> TimeWindow{start=1518268800000, end=1518270000000}
> TimeWindow{start=1518268800000, end=1518270000000}
>
>
> So it seems that the keyby is not doing what is is supposed to or I am
> messing things up somewhere but I cannot seem to find it.
>
> Regards
> Björn
>
>
>

Reply via email to