Hi Lu,

@vino yang <yanghua1...@gmail.com> I think what he means is that the semantics of "max" differ between the windowed and non-windowed cases. In the windowed case, the non-aggregated fields change unpredictably.
That's a really interesting question. I took a look at the relevant implementation. From the code's perspective, "max" always keeps the non-aggregated fields of the first record to arrive, which should be (0, 0, x) in this case. However, when a window is purged, the state (which holds the non-aggregated fields of the first record together with the current maximum) is cleared. That means the "first record" is reset every time a window is purged, which is why the second column increases unpredictably. I find the semantics here quite confusing.

Thanks,
Biao /'bɪ.aʊ/

On Thu, 19 Dec 2019 at 17:50, vino yang <yanghua1...@gmail.com> wrote:

> Hi weizheng,
>
> IMHO, I am not sure which part is unclear to you. Is the result not correct?
> Can you share the result you would expect, based on your understanding?
>
> "keyBy" specifies the grouping field, and min/max aggregate the other
> field at the position you specify.
>
> Best,
> Vino
>
> Lu Weizheng <luweizhen...@hotmail.com> wrote on Thu, 19 Dec 2019 at 17:00:
>
>> Hi all,
>>
>> On a KeyedStream, when I use maxBy or minBy, I get the max or min
>> element, which means the other fields are taken from that max or min
>> element. This is quite clear. However, when I use max or min, how does
>> Flink handle the other fields?
>>
>> val tupleStream = senv.fromElements(
>>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
>> )
>> // Printed output:
>> // (0,0,0)
>> // (0,0,1)
>> // (0,0,2)
>> // (1,0,6)
>> // (1,0,7)
>> // (1,0,8)
>> val maxByStream = tupleStream.keyBy(0).max(2).print()
>>
>> In this case, the second field uses the first element's 0.
>>
>> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)]{
>>
>>   var isRunning: Boolean = true
>>   var i = 0
>>
>>   val rand = new Random()
>>
>>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>>
>>     while (isRunning) {
>>
>>       // Emit the record into the SourceContext
>>       srcCtx.collect((0, i, i))
>>       i += 1
>>       Thread.sleep(1000)
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>> }
>>
>> // Printed output:
>> //(0,0,0)
>> //(0,1,2)
>> //(0,3,4)
>> //(0,5,6)
>> //(0,7,8)
>> //(0,9,10)
>>
>> val maxWindowStream = senv.addSource(new IntTupleSource)
>>   .keyBy(0)
>>   .timeWindow(Time.milliseconds(2000))
>>   .max(2).print()
>>
>> In this case, the result is not so clear...
>>
>> So, for max and min, do the two operators give no guarantee about the
>> values of the other fields?
>>
>> Thank you so much if anyone can reply.
>>
>> Weizheng
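
To make the behaviour described in the reply at the top of this thread concrete, here is a minimal plain-Scala sketch with no Flink dependency. It is not Flink's implementation; it only mimics the semantics as described: per key, "max" keeps the first record's non-aggregated fields and updates only the aggregated field, and a windowed "max" starts from empty state in every window. The names MaxSemanticsSketch, rollingMax and windowedMax are hypothetical, and the window contents are assumed (one record in the first window, then two per window) to match the printed output rather than derived from real timers.

```scala
object MaxSemanticsSketch {
  type Rec = (Int, Int, Int)

  // Rolling "max" on the third field, keyed by the first field.
  // The first record seen for a key fixes the non-aggregated fields;
  // later records can only raise the third field.
  def rollingMax(records: Seq[Rec]): Seq[Rec] = {
    val state = scala.collection.mutable.Map.empty[Int, Rec]
    records.map { r =>
      val updated = state.get(r._1) match {
        case None      => r
        case Some(acc) => (acc._1, acc._2, math.max(acc._3, r._3))
      }
      state(r._1) = updated
      updated
    }
  }

  // Windowed "max": every window starts with empty state (the purge),
  // and only the final aggregate of each window is emitted.
  // Assumption: each window holds records of a single key.
  def windowedMax(windows: Seq[Seq[Rec]]): Seq[Rec] =
    windows.filter(_.nonEmpty).map(w => rollingMax(w).last)

  def main(args: Array[String]): Unit = {
    val elements = List(
      (0, 0, 0), (0, 1, 1), (0, 2, 2),
      (1, 0, 6), (1, 1, 7), (1, 2, 8)
    )
    // Prints (0,0,0) (0,0,1) (0,0,2) (1,0,6) (1,0,7) (1,0,8), one per line
    rollingMax(elements).foreach(println)

    // Assumed window contents for the source that emits (0, i, i)
    val windows = List(
      List((0, 0, 0)),
      List((0, 1, 1), (0, 2, 2)),
      List((0, 3, 3), (0, 4, 4))
    )
    // Prints (0,0,0) (0,1,2) (0,3,4), one per line
    windowedMax(windows).foreach(println)
  }
}
```

Under these assumptions the second field of each windowed result is simply that of whichever record opened the window, so it depends on how records happen to fall into windows. When the whole record holding the maximum is needed, maxBy (mentioned in the original question) is the operator that returns it.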