Hi Lu,

@vino yang <yanghua1...@gmail.com> I think what he means is that the
semantics of "max" differ between windowed and non-windowed streams: in the
windowed case, it changes the non-aggregated fields unpredictably.

That's really an interesting question.

I took a look at the relevant implementation. From the code's perspective,
"max" always keeps the non-aggregated fields from the first arrived record,
which should be (0, 0, x) in this case. However, when the window is purged,
the state (which holds the non-aggregated fields of the first arrived record
plus the maximum field) is cleared. That means the "first arrived record" is
reset every time a window is purged, which is why the second column
increases unpredictably.
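To make this concrete, the following is a minimal plain-Scala sketch (no Flink dependency) that imitates the behaviour described above; it is not Flink's actual code. maxReduce, runningMax and windowedMax are hypothetical names, and the window contents in main are an assumption chosen to match the output Lu posted.

```scala
// Hypothetical sketch of positional max(2) semantics; NOT Flink's implementation.
object MaxSemanticsSketch {
  type Rec = (Int, Int, Int)

  // Assumed reduce step: keep the accumulated record's non-aggregated fields
  // and only replace field 2 when a larger value arrives.
  def maxReduce(acc: Rec, in: Rec): Rec =
    if (in._3 > acc._3) acc.copy(_3 = in._3) else acc

  // Non-windowed keyBy(0).max(2): one state per key that is never cleared,
  // so fields 0 and 1 always come from the first record seen for that key.
  def runningMax(records: Seq[Rec]): Seq[Rec] = {
    val state = scala.collection.mutable.Map.empty[Int, Rec]
    records.map { r =>
      val updated = state.get(r._1).map(maxReduce(_, r)).getOrElse(r)
      state(r._1) = updated
      updated
    }
  }

  // Windowed max(2): every window starts with empty state, so the first record
  // of EACH window supplies the non-aggregated fields; one result per window.
  // Assumption: records are already split into windows (single key here).
  def windowedMax(windows: Seq[Seq[Rec]]): Seq[Rec] =
    windows.filter(_.nonEmpty).map(_.reduce(maxReduce))

  def main(args: Array[String]): Unit = {
    val keyed = Seq((0, 0, 0), (0, 1, 1), (0, 2, 2), (1, 0, 6), (1, 1, 7), (1, 2, 8))
    runningMax(keyed).foreach(println)
    // (0,0,0) (0,0,1) (0,0,2) (1,0,6) (1,0,7) (1,0,8)

    // Assumed window assignment: the first 2 s window contained only (0,0,0).
    val windows = Seq(Seq((0, 0, 0)), Seq((0, 1, 1), (0, 2, 2)), Seq((0, 3, 3), (0, 4, 4)))
    windowedMax(windows).foreach(println)
    // (0,0,0) (0,1,2) (0,3,4)
  }
}
```

With one state per key that is never cleared, fields 0 and 1 come from the first record for that key; with a fresh state per window, they come from the first record of each window, which matches (0,1,2), (0,3,4) and so on.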

The semantics here are quite confusing to me.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 17:50, vino yang <yanghua1...@gmail.com> wrote:

> Hi weizheng,
>
> IMHO, I'm not sure which part is unclear to you. Is the result incorrect?
> Could you share what you think the correct result should be?
>
> The "keyBy" specifies the grouping field, and min/max aggregate the other
> field at the position you specify.
>
> Best,
> Vino
>
> Lu Weizheng <luweizhen...@hotmail.com> wrote on Thu, 19 Dec 2019 at 17:00:
>
>> Hi all,
>>
>> On a KeyedStream, when I use maxBy or minBy, I get the max or min
>> element, which means the other fields are taken from that max or min
>> element. This is quite clear. However, when I use max or min, what does
>> Flink do with the other fields?
>>
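>> import org.apache.flink.streaming.api.scala._
>>
>> // senv: a StreamExecutionEnvironment (e.g. the one predefined in the Flink Scala shell)
>> val tupleStream = senv.fromElements(
>>   (0, 0, 0), (0, 1, 1), (0, 2, 2),
>>   (1, 0, 6), (1, 1, 7), (1, 2, 8)
>> )
>> // Printed output:
>> //  (0,0,0)
>> //  (0,0,1)
>> //  (0,0,2)
>> //  (1,0,6)
>> //  (1,0,7)
>> //  (1,0,8)
>> tupleStream.keyBy(0).max(2).print()
>> senv.execute()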
>>
>> In this case, the second field keeps the first element's value, 0.
>>
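>> import org.apache.flink.streaming.api.functions.source.RichSourceFunction
>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
>>
>> // Emits (0, i, i) once per second, with i = 0, 1, 2, ...
>> class IntTupleSource extends RichSourceFunction[(Int, Int, Int)] {
>>
>>   // volatile: cancel() is called from a different thread than run()
>>   @volatile var isRunning: Boolean = true
>>   var i = 0
>>
>>   override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {
>>     while (isRunning) {
>>       // Emit the record into the SourceContext
>>       srcCtx.collect((0, i, i))
>>       i += 1
>>       Thread.sleep(1000)
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>> }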
>>
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> // Printed output:
>> // (0,0,0)
>> // (0,1,2)
>> // (0,3,4)
>> // (0,5,6)
>> // (0,7,8)
>> // (0,9,10)
>> senv.addSource(new IntTupleSource)
>>   .keyBy(0)
>>   .timeWindow(Time.milliseconds(2000))
>>   .max(2)
>>   .print()
>> senv.execute()
>>
>>
>>
>> In this case, the result is not so clear: the second field no longer
>> stays at the first element's 0.
>>
>> So, for max and min, do the two operators make no guarantee about the
>> values of the other fields?
>>
>> Thank you so much if anyone can reply.
>>
>> Weizheng
>>
>
