Hi all,

On a KeyedStream, maxBy and minBy return the whole element that holds the 
max or min value, so the other fields come from that same element. That is 
clear. However, when I use max or min, what does Flink do with the other 
fields?


val tupleStream = senv.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
// Output printed by the job below:
//  (0,0,0)
//  (0,0,1)
//  (0,0,2)
//  (1,0,6)
//  (1,0,7)
//  (1,0,8)
val maxStream = tupleStream.keyBy(0).max(2).print()

In this case, the second field keeps the value 0 from the first element of 
each key, while only the third field is updated.
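
To make the observation concrete, here is a minimal plain-Scala sketch with 
no Flink dependency. It assumes that max(2) keeps the first element seen for 
a key and only overwrites the aggregated field, while maxBy(2) keeps the 
whole winning element. rollingMax and rollingMaxBy are hypothetical helpers 
written for this illustration; they are not Flink APIs, and this is only my 
reading of the output above.

```scala
// Plain-Scala model of the behaviour observed above (an assumption,
// not Flink's actual implementation).
object MaxVsMaxBy {
  type T3 = (Int, Int, Int)

  // Assumed max(2): keep the first element of the key and overwrite
  // only its third field with the running maximum.
  def rollingMax(in: Seq[T3]): Seq[T3] =
    in.tail.scanLeft(in.head)((acc, cur) => acc.copy(_3 = math.max(acc._3, cur._3)))

  // Assumed maxBy(2): keep the whole element whose third field is largest.
  def rollingMaxBy(in: Seq[T3]): Seq[T3] =
    in.tail.scanLeft(in.head)((acc, cur) => if (cur._3 > acc._3) cur else acc)

  def main(args: Array[String]): Unit = {
    val key0 = Seq((0, 0, 0), (0, 1, 1), (0, 2, 2))
    println(rollingMax(key0))   // List((0,0,0), (0,0,1), (0,0,2))
    println(rollingMaxBy(key0)) // List((0,0,0), (0,1,1), (0,2,2))
  }
}
```

The first line matches what I see for key 0 with max(2); the second is what 
I would expect from maxBy(2).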


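import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import scala.util.Random

class IntTupleSource extends RichSourceFunction[(Int, Int, Int)]{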

  var isRunning: Boolean = true
  var i = 0

  val rand = new Random()

  override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {

    while (isRunning) {

      // emit the record to the SourceContext
      srcCtx.collect((0, i, i))
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

// Output printed by the job below:
//(0,0,0)
//(0,1,2)
//(0,3,4)
//(0,5,6)
//(0,7,8)
//(0,9,10)

val maxWindowStream = senv.addSource(new IntTupleSource)
  .keyBy(0)
  .timeWindow(Time.milliseconds(2000))
  .max(2).print()


In this case, the result is less clear: the second field is neither the 
max element's value nor a fixed value.
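
One possible reading, sketched below in plain Scala without Flink: if each 
2-second window happens to hold the elements listed here, and max(2) keeps 
the first element of the window while only updating the third field, the 
output above is reproduced. Both the window contents and the windowMax 
helper are my assumptions for illustration, not something Flink documents 
or exposes.

```scala
// Plain-Scala model of one possible explanation (an assumption,
// not Flink's actual implementation).
object WindowMaxModel {
  type T3 = (Int, Int, Int)

  // Hypothetical helper: reduce one window so that the first element is
  // kept and only its third field is replaced by the window maximum.
  def windowMax(window: Seq[T3]): T3 =
    window.reduce((acc, cur) => acc.copy(_3 = math.max(acc._3, cur._3)))

  def main(args: Array[String]): Unit = {
    // Assumed window contents: the first window fires with a single
    // element, later windows hold two elements each.
    val windows = Seq(
      Seq((0, 0, 0)),
      Seq((0, 1, 1), (0, 2, 2)),
      Seq((0, 3, 3), (0, 4, 4))
    )
    windows.map(windowMax).foreach(println)
    // (0,0,0)
    // (0,1,2)
    // (0,3,4)
  }
}
```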

So, for max and min, is the value of the other fields simply not 
guaranteed?

Thanks in advance to anyone who can reply.

Weizheng
