Hi,

I'm seeing something weird, probably a user error :)

I'm running a keyBy on multiple elements:

val keyedStream = nonKeyedStream
  .keyBy(m => (m._1, m._2, m._3.getOrElse(-1), m._4))

Then I apply a window function:

val appliedWindow = keyedStream
  .timeWindow(minutes(WindowTimeMinutes))
  .allowedLateness(minutes(WindowDelayMinutes))
  .apply(new windowFunc1)


This is the first action in my apply function:

override def apply(key: (Int, Long, Int, Int), window: TimeWindow,
                   input: Iterable[T4], out: Collector[T4]): Unit = {
  myClass.addKey(key,window)


And in this object I have a function:

object myClass{
  val keyHash = new mutable.HashMap[(Int,Long,Int,Int),TimeWindow]()


  def addKey(key: (Int,Long,Int,Int), window : TimeWindow) : Unit = {
    if(keyHash.contains(key)){
      // println avoids treating the message as a printf format string
      println("Multiple key found for: " + key)
      println(keyHash(key))
      println(window)
    }
    keyHash.put(key,window)
  }
}


This outputs
Multiple key found for: (1,2,3,4)
TimeWindow{start=1518268800000, end=1518270000000}
TimeWindow{start=1518268800000, end=1518270000000}
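
For reference, the check itself can be reproduced outside Flink. This is only a sketch under assumptions: `Win` is a hypothetical stand-in for `TimeWindow`, and `KeyCheck` mirrors `myClass` but returns a Boolean so the result is easy to test. It reports a duplicate whenever `addKey` is called a second time with an equal key in the same JVM.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's TimeWindow, so the sketch runs without Flink.
final case class Win(start: Long, end: Long)

// Mirrors myClass from the post; the name and the Boolean return value are assumptions.
object KeyCheck {
  type Key = (Int, Long, Int, Int)

  private val seen = mutable.HashMap.empty[Key, Win]

  // Returns true if the key was already registered.
  // The most recent window for the key is always stored.
  def addKey(key: Key, window: Win): Boolean = {
    val duplicate = seen.contains(key)
    if (duplicate) {
      println(s"Multiple key found for: $key")
      println(seen(key))
      println(window)
    }
    seen.put(key, window)
    duplicate
  }
}
```

Calling `KeyCheck.addKey` twice with the same key and window reports a duplicate on the second call, while a key that differs in any field does not.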


So it seems that either the keyBy is not doing what it is supposed to, or I am
messing things up somewhere, but I cannot seem to find where.

Regards
Björn
