Hi, I'm seeing something weird, probably a user error :)
I'm running a keyBy on multiple elements:

    val keyedStream = nonKeyedStream
      .keyBy(m => (m.1, m.2, m.3.getOrElse(-1), m.4))

and then applying a window function:

    val appliedWindow = keyedStream
      .timeWindow(minutes(WindowTimeMinutes))
      .allowedLateness(minutes(WindowDelayMinutes))
      .apply(new windowFunc1)

This is the first thing I do in the apply function:

    override def apply(key: (Int,Long,Int,Int), window: TimeWindow, input: Iterable[T4], out: Collector[T4]): Unit = {
      myClass.addKey(key,window)

And in this class I have a function:

    object myClass{
      val keyHash = new mutable.HashMap[(Int,Long,Int,Int),TimeWindow]()

      def addKey(key: (Int,Long,Int,Int), window : TimeWindow) : Unit = {
        if(keyHash.contains(key)){
          printf("Multiple key found for: " + key + "\n")
          printf(keyHash(key) + "\n")
          printf(window + "\n")
        }
        keyHash.put(key,window)
      }
    }

This outputs:

    Multiple key found for: (1,2,3,4)
    TimeWindow{start=1518268800000, end=1518270000000}
    TimeWindow{start=1518268800000, end=1518270000000}

So it seems that either keyBy is not doing what it is supposed to, or I am messing things up somewhere, but I cannot find where.

Regards
Björn