Hi Jing Zhang,

Thanks for the reply! My current implementation is like this:
tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT) " +
    "WITH ('connector' = 'kafka', 'scan.startup.mode' = 'latest-offset')")

tableEnv.executeSql(
  """
    |INSERT INTO ItemDesc
    |SELECT
    |  item_id,
    |  channel_id,
    |  CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
    |  COUNT(*) AS num_select
    |FROM mytable
    |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
    |""".stripMargin)

val result = tableEnv.sqlQuery(
  """
    |SELECT item_id, window_end, channel_id, num_select, row_num
    |FROM (
    |  SELECT *,
    |    ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select DESC) AS row_num
    |  FROM ItemDesc)
    |WHERE row_num <= 20
    |""".stripMargin)

But I got this error:

org.apache.flink.table.api.ValidationException: Unable to create a sink for
writing table 'default_catalog.default_database.ItemDesc'.

The table ItemDesc is only meant to be an intermediate table. If I put
everything into a single query, that doesn't work. If I create the table
without a connector, like this:

tableEnv.executeSql(
  "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT)")

this also doesn't work.
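Following your suggestion, here is what I now plan to try: register the
windowed aggregation as a temporary view instead of a connector-backed table,
and run a plain TopN over it with window_end added to the partition keys as
the normalized time bucket. This is only a rough sketch reusing tableEnv and
mytable from above, and it assumes a temporary view avoids the sink
requirement because nothing is physically written:

// Register the aggregation as a temporary view; no connector is needed
// because the view is just a logical name for the query.
val itemDesc = tableEnv.sqlQuery(
  """
    |SELECT
    |  item_id,
    |  channel_id,
    |  CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
    |  COUNT(*) AS num_select
    |FROM mytable
    |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
    |""".stripMargin)
tableEnv.createTemporaryView("ItemDesc", itemDesc)

// Plain TopN over the view. Partitioning by window_end as well keeps a
// separate top 20 per channel for every window, as you suggested.
val result = tableEnv.sqlQuery(
  """
    |SELECT item_id, channel_id, window_end, num_select, row_num
    |FROM (
    |  SELECT *,
    |    ROW_NUMBER() OVER (
    |      PARTITION BY channel_id, window_end
    |      ORDER BY num_select DESC) AS row_num
    |  FROM ItemDesc)
    |WHERE row_num <= 20
    |""".stripMargin)

Is this roughly what you had in mind with TopN ([2] in your mail)?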
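For your point about state cleanup: since each (channel_id, window_end)
bucket stops receiving updates once its window has passed, I would also set
an idle state retention so the TopN state is eventually dropped. I believe
this is configured on TableConfig in 1.12; the retention values below are
just placeholders:

import org.apache.flink.api.common.time.Time

// Drop state for a key after it has been idle for between one and two
// hours (placeholder values).
tableEnv.getConfig.setIdleStateRetentionTime(Time.hours(1), Time.hours(2))

And since the TopN result is an update stream rather than an append stream,
I understand the final sink has to accept updates, e.g. upsert-kafka instead
of the plain kafka connector.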
Thanks,
Jing

On Thu, Dec 23, 2021 at 1:20 AM Jing Zhang <beyond1...@gmail.com> wrote:

> Hi Jing,
> In fact, I agree with using TopN [2] instead of Window TopN [1]: normalize
> the time into 5-minute buckets and add the bucket as one of the partition
> keys. Please note two points when using TopN:
> 1. The result is an update stream instead of an append stream, which means
> results that were already sent might be retracted later.
> 2. You should take care of state cleanup.
>
> However, you said you met a problem when using TopN. I didn't understand
> your question here. Would you please explain a little more?
>
> > I saw that one possibility is to create a table, insert the aggregated
> > data into the table, and then do Top-N like [1]. However, I cannot make
> > this approach work, because I need to specify a connector for this table
> > and I may also need to create another Kafka topic for it.
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>
> On Thu, Dec 23, 2021 at 17:04, Jing Zhang <beyond1...@gmail.com> wrote:
>
>> Hi Jing,
>> I'm afraid there is no way to do Window TopN in SQL on version 1.12,
>> because Window TopN was only introduced in 1.13.
>>
>> > I saw that one possibility is to create a table, insert the aggregated
>> > data into the table, and then do Top-N like [1]. However, I cannot make
>> > this approach work, because I need to specify a connector for this
>> > table and I may also need to create another Kafka topic for it.
>> I didn't understand you here.
>> Do you mean you need a sink to store the output data of the TopN?
>> However, you would still need a sink to store the output even if you
>> used Window TopN.
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>>
>> Best,
>> Jing Zhang
>>
>> On Thu, Dec 23, 2021 at 16:12, Jing <ajin...@gmail.com> wrote:
>>
>>> Hi, Flink community,
>>>
>>> Is there any existing code I can use to get the window Top-N with Flink
>>> 1.12? I saw that one possibility is to create a table, insert the
>>> aggregated data into the table, and then do Top-N like [1]. However, I
>>> cannot make this approach work, because I need to specify a connector
>>> for this table and I may also need to create another Kafka topic for
>>> it. Is there any existing way to do Window Top-N with Flink 1.12?
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>>
>>> Thanks,
>>> Jing
>>>
>>