Hi Jing,

Thanks for the advice. This is very helpful.

-Guoqin

On Wed, Dec 8, 2021 at 11:52 PM Jing Zhang <beyond1...@gmail.com> wrote:

> Hi Guoqin,
> I understand the problem you are facing.
> I'm sorry, I could not find a perfect solution for Flink 1.13.
>
> Maybe you could try the regular TopN [1] instead of Window TopN: normalize
> the time into 5-minute units and add that value as one of the partition
> keys. However, the result is an update stream rather than an append
> stream, which means a result that has already been emitted might be
> retracted later. Also, you should take care of state cleanup.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>
> Best,
> Jing Zhang
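A minimal Python sketch of why the regular TopN workaround produces an update stream rather than an append stream. It assumes `readtime` is an epoch-seconds integer and that events are plain tuples. `bucket_start` and `top1_changelog` are hypothetical helpers, and the "+I"/"-U"/"+U" labels only loosely mirror Flink's changelog row kinds. The sketch does not model Flink's actual operator, watermarks, or state TTL.

```python
from typing import Dict, Iterable, Iterator, Tuple

WINDOW_SECONDS = 5 * 60  # 5-minute buckets, as discussed in the thread

# Hypothetical event shape: (deviceId, locationId, gauge, readtime_epoch_seconds)
Event = Tuple[str, str, float, int]


def bucket_start(readtime: int) -> int:
    """Normalize an epoch-seconds timestamp to the start of its 5-minute bucket."""
    return readtime - readtime % WINDOW_SECONDS


def top1_changelog(events: Iterable[Event]) -> Iterator[Tuple[str, int, Event]]:
    """Simulate a regular (non-window) Top-1 partitioned by the time bucket.

    Yields (kind, bucket, event) tuples: "+I" for the first row of a bucket,
    then "-U" (retract the old top row) and "+U" (new top row) whenever a
    higher gauge arrives for a bucket that already emitted a result.
    """
    best: Dict[int, Event] = {}  # one entry per bucket; never cleaned up here
    for ev in events:
        b = bucket_start(ev[3])
        current = best.get(b)
        if current is None:
            best[b] = ev
            yield ("+I", b, ev)
        elif ev[2] > current[2]:  # strict '>' keeps the earlier row on ties
            best[b] = ev
            yield ("-U", b, current)  # previously emitted result is retracted
            yield ("+U", b, ev)


if __name__ == "__main__":
    sample = [
        ("d1", "l1", 10.0, 0),
        ("d2", "l2", 7.0, 60),
        ("d3", "l1", 12.0, 120),
        ("d1", "l1", 3.0, 300),
    ]
    for change in top1_changelog(sample):
        print(change)
```

The `best` dictionary keeps one entry per bucket indefinitely. The equivalent keyed state in Flink would likewise keep growing unless some form of state cleanup, such as a state TTL, is configured.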
>
> On Thu, Dec 9, 2021 at 2:16 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>
>> Hi Jing,
>>
>> Just verified that it worked with Flink 1.14. But as you said, Flink 1.13
>> does not yet support it.
>> Other than waiting for KDA to upgrade the Flink version, is there any
>> workaround for Flink 1.13?
>>
>> Thanks,
>> -Guoqin
>>
>> On Wed, Dec 8, 2021 at 10:00 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>>
>>> Hi Jing,
>>>
>>> Thanks for chiming in. This sounds great. Any chance this will also work
>>> with Flink 1.13? I am using AWS KDA (Kinesis Data Analytics).
>>>
>>> Thanks,
>>> -Guoqin
>>>
>>> On Wed, Dec 8, 2021 at 7:47 PM Jing Zhang <beyond1...@gmail.com> wrote:
>>>
>>>> Hi Guoqin,
>>>> I think you may have misunderstood Martijn's response.
>>>> Martijn suggested that you use Window TopN. Moreover, since Flink 1.14,
>>>> Window TopN does not need to follow a Window Aggregate; it can directly
>>>> follow a Windowing TVF. Please see the documentation [1].
>>>> You could try the following SQL. It returns the record with the max
>>>> gauge because it orders by gauge DESC and keeps only the first row of
>>>> each window.
>>>>
>>>> SELECT deviceId, locationId, gauge, window_start, window_end
>>>>   FROM (
>>>>     SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end
>>>>                                  ORDER BY gauge DESC) AS rownum
>>>>     FROM TABLE(
>>>>       TUMBLE(TABLE MyTable, DESCRIPTOR(readtime), INTERVAL '5' MINUTES))
>>>>   ) WHERE rownum <= 1;
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-topn/#window-top-n-follows-after-windowing-tvf
>>>>
>>>> Best,
>>>> Jing Zhang
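For local experimentation, the same ROW_NUMBER() pattern can be run against SQLite through Python's standard library. This is a batch analogue under stated assumptions, not Flink. SQLite has no TUMBLE table function, so `window_start` and `window_end` are computed arithmetically from an integer epoch-seconds `readtime`, and the sample rows are made up. Window functions require SQLite 3.25 or newer.

```python
import sqlite3

# In-memory table mirroring the event shape from the thread; storing readtime
# as integer epoch seconds is an assumption made for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE MyTable (deviceId TEXT, locationId TEXT, gauge REAL, readtime INTEGER)"
)
conn.executemany(
    "INSERT INTO MyTable VALUES (?, ?, ?, ?)",
    [
        ("d1", "l1", 10.0, 0),
        ("d2", "l2", 7.0, 60),
        ("d3", "l1", 12.0, 120),
        ("d1", "l1", 3.0, 300),
        ("d2", "l2", 9.0, 420),
    ],
)

# Same ROW_NUMBER() shape as the Window TopN query. Integer division assigns
# each row to a 5-minute (300-second) tumbling window.
QUERY = """
SELECT deviceId, locationId, gauge, window_start, window_end
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end
                               ORDER BY gauge DESC) AS rownum
  FROM (
    SELECT *, (readtime / 300) * 300 AS window_start,
              (readtime / 300) * 300 + 300 AS window_end
    FROM MyTable
  )
) WHERE rownum <= 1
ORDER BY window_start
"""

rows = conn.execute(QUERY).fetchall()
for row in rows:
    print(row)
```

Unlike the regular TopN workaround, Window TopN in Flink 1.14 emits only the final result for each window once the window ends, so its output is append-only. This batch query likewise returns one final row per window.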
>>>>
>>>> On Thu, Dec 9, 2021 at 10:30 AM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>>>>
>>>>> Hi Martijn,
>>>>>
>>>>> Thanks for your quick response. I tried it, but it does not seem to
>>>>> work.
>>>>>
>>>>> The problem is that I want to select fields that are not in the `GROUP
>>>>> BY`. In my example, I can use a tumbling window on `readtime` and select
>>>>> max(gauge), but I also want the `deviceId` and `locationId` of the max
>>>>> record included in the result. Top-N does not seem to allow that.
>>>>>
>>>>> -Guoqin
>>>>>
>>>>> On Wed, Dec 8, 2021 at 1:22 PM Martijn Visser <mart...@ververica.com> wrote:
>>>>>
>>>>>> Hi Guoqin,
>>>>>>
>>>>>> I think you could use Window Top-N. There's a recipe for it in the Flink
>>>>>> SQL Cookbook [1]. The example uses SUM, which you would change to MAX,
>>>>>> and of course you would change the rownum limit from 3 to 1.
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Martijn
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md
>>>>>>
>>>>>> On Wed, Dec 8, 2021 at 7:54 PM Guoqin Zheng <lanson.zh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flink Community,
>>>>>>>
>>>>>>> I am curious what the recommended way is to select the event with the
>>>>>>> max value of an attribute using the SQL API.
>>>>>>>
>>>>>>> For example, I have an event stream like:
>>>>>>>
>>>>>>> {
>>>>>>>    deviceId,
>>>>>>>    locationId,
>>>>>>>    gauge,
>>>>>>>    readtime,  <-- eventTime
>>>>>>> }
>>>>>>>
>>>>>>> I want to figure out which device and location has the max gauge
>>>>>>> over a 5-minute window.
>>>>>>>
>>>>>>> Any advice would be greatly appreciated!
>>>>>>>
>>>>>>> Thanks!
>>>>>>> -Guoqin
>>>>>>>
>>>>>> --
>>>>>>
>>>>>> Martijn Visser | Product Manager
>>>>>>
>>>>>> mart...@ververica.com
>>>>>>
