Hi Jing Zhang,

Thanks for the reply! My current implementation is like this:


tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT) " +
  "WITH ('connector' = 'kafka', 'scan.startup.mode' = 'latest-offset')"
)

tableEnv.executeSql("""
        |INSERT INTO ItemDesc
        |SELECT
        |   item_id,
        |   channel_id,
        |   CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60'
SECOND) AS BIGINT) AS window_end,
        |   COUNT(*) as num_select
        |FROM mytable
        |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND,
INTERVAL '60' SECOND)
          """.stripMargin)

val result = tableEnv.sqlQuery("""
    |SELECT roku_content_id, window_end, channel_id, num_select, row_num
    |FROM (
    |   SELECT *
    |      ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select
DESC) as row_num
    |   FROM ItemDesc)
    |WHERE row_num <= 20
    |""".stripMargin)

But I got the error:

org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.ItemDesc'.

The table ItemDesc is only an intermediate table, so I'd rather not back it
with a Kafka topic. Putting everything into a single query doesn't work
either. If I create the table without a connector, like this:

tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT)"
)

This also doesn't work.
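
Is registering the aggregation as a temporary view the intended way around
this? A minimal sketch of what I have in mind (same mytable source as above,
so no connector or Kafka topic is needed for the intermediate result):

// Sketch: keep the windowed aggregation as a temporary view instead of a
// connector-backed table, so no sink is involved for the intermediate result.
// Assumes mytable is registered with a processing-time attribute `proctime`.
val itemDesc = tableEnv.sqlQuery("""
    |SELECT
    |  item_id,
    |  channel_id,
    |  CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
    |  COUNT(*) AS num_select
    |FROM mytable
    |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
    |""".stripMargin)
tableEnv.createTemporaryView("ItemDesc", itemDesc)
// The TopN query above could then select FROM ItemDesc unchanged.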


Thanks,
Jing




On Thu, Dec 23, 2021 at 1:20 AM Jing Zhang <beyond1...@gmail.com> wrote:

> Hi Jing,
> In fact, I agree with your idea of using regular TopN [2] instead of
> Window TopN [1]: normalize the time into 5-minute buckets and add the
> bucket as one of the partition keys (see the sketch after these points).
> Please note two points when using TopN:
> 1. The result is an update stream instead of an append stream, which means
> results that were already sent might be retracted later.
> 2. You should take care of state cleanup (e.g., by configuring idle state
> retention).
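>
> A minimal sketch of what I mean (assuming the source exposes an
> epoch-seconds BIGINT column `ts`, so 300 seconds = one 5-minute bucket;
> the other column names follow your snippets):
>
> SELECT *
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (
>       PARTITION BY channel_id, time_bucket
>       ORDER BY num_select DESC) AS row_num
>   FROM (
>     SELECT channel_id, item_id, ts / 300 AS time_bucket,
>       COUNT(*) AS num_select
>     FROM mytable
>     GROUP BY channel_id, item_id, ts / 300))
> WHERE row_num <= 20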
>
> However, you said you ran into a problem when using TopN. I didn't
> understand your question there. Would you please explain a little more?
> > I saw that one possibility is to create a table and insert the
> > aggregated data into the table, then do TopN like [1]. However, I cannot
> > make this approach work because I need to specify the connector for this
> > table and I may also need to create another Kafka topic for this.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>
> Jing Zhang <beyond1...@gmail.com> wrote on Thu, Dec 23, 2021 at 17:04:
>
>> Hi Jing,
>> I'm afraid it is not possible to do Window TopN in SQL on 1.12,
>> because Window TopN was only introduced in 1.13.
>>
>> > I saw that one possibility is to create a table and insert the
>> > aggregated data into the table, then do TopN like [1]. However, I cannot
>> > make this approach work because I need to specify the connector for this
>> > table and I may also need to create another Kafka topic for this.
>> I didn't understand you here.
>> Do you mean you need a sink to store the output data of TopN? Note that
>> you still need a sink to store the output even if you use Window TopN
>> (see the sketch below).
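>>
>> For illustration, a minimal sketch of such a sink table using the
>> built-in print connector (column names and types are assumed to match
>> the TopN output):
>>
>> CREATE TABLE TopNSink (
>>   item_id STRING,
>>   channel_id STRING,
>>   window_end BIGINT,
>>   num_select BIGINT,
>>   row_num BIGINT
>> ) WITH (
>>   'connector' = 'print'
>> );
>>
>> Then INSERT INTO TopNSink SELECT ... with the TopN query; any real
>> connector that accepts updating results (e.g. upsert-kafka, jdbc)
>> works the same way.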
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>>
>> Best,
>> Jing Zhang
>>
>>
>> Jing <ajin...@gmail.com> wrote on Thu, Dec 23, 2021 at 16:12:
>>
>>> Hi, Flink community,
>>>
>>> Is there any existing code I can use to get the window top N with Flink
>>> 1.12? I saw that one possibility is to create a table and insert the
>>> aggregated data into the table, then do top N like [1]. However, I cannot
>>> make this approach work because I need to specify the connector for this
>>> table and I may also need to create another Kafka topic for this. Is there
>>> any existing way to do the Window Top N with Flink 1.12?
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>>
>>>
>>> Thanks,
>>> Jing
>>>
>>
