Hi Jing,

Please try it this way: create only one sink, for the final output. Write the window aggregate and the TopN in a single query, and write the TopN result into that final sink.
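As a rough illustration of the single-query approach, here is a minimal sketch rather than a tested solution. It assumes a Flink 1.12-style Table API setup. The `datagen` source standing in for `mytable`, the sink name `TopItems`, and the `print` connector are placeholders chosen for illustration; they are not from your job. The sketch also keeps `window_end` as `TIMESTAMP(3)` instead of casting it to `BIGINT`, and adds `window_end` to the partition keys so that the ranking is computed per window.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SingleQueryTopN {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Placeholder source (assumption): random rows with a processing-time attribute.
    tableEnv.executeSql(
      """
        |CREATE TABLE mytable (
        |  item_id STRING,
        |  channel_id STRING,
        |  proctime AS PROCTIME()
        |) WITH (
        |  'connector' = 'datagen',
        |  'rows-per-second' = '10',
        |  'fields.item_id.length' = '1',
        |  'fields.channel_id.length' = '1'
        |)
        |""".stripMargin)

    // The only sink in the job. 'print' is used here because it accepts an update stream.
    tableEnv.executeSql(
      """
        |CREATE TABLE TopItems (
        |  item_id STRING,
        |  channel_id STRING,
        |  window_end TIMESTAMP(3),
        |  num_select BIGINT,
        |  row_num BIGINT
        |) WITH ('connector' = 'print')
        |""".stripMargin)

    // Window aggregate (innermost subquery) and TopN (outer query) in one statement,
    // so no intermediate table or extra Kafka topic is needed.
    tableEnv.executeSql(
      """
        |INSERT INTO TopItems
        |SELECT item_id, channel_id, window_end, num_select, row_num
        |FROM (
        |  SELECT *,
        |    ROW_NUMBER() OVER (
        |      PARTITION BY channel_id, window_end
        |      ORDER BY num_select DESC) AS row_num
        |  FROM (
        |    SELECT
        |      item_id,
        |      channel_id,
        |      HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS window_end,
        |      COUNT(*) AS num_select
        |    FROM mytable
        |    GROUP BY item_id, channel_id,
        |      HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
        |  )
        |)
        |WHERE row_num <= 20
        |""".stripMargin).await() // block so a local run keeps the job alive
  }
}
```

Because the TopN result is an update stream, a real sink would need to accept updates; a plain append-only `kafka` sink is likely to be rejected for this query. State cleanup is not configured in this sketch.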
Best,
Jing Zhang

Jing <ajin...@gmail.com> wrote on Fri, Dec 24, 2021 at 03:13:

> Hi Jing Zhang,
>
> Thanks for the reply! My current implementation is like this:
>
> tableEnv.executeSql(
>   "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT) WITH ('connector' = 'kafka', 'scan.startup.mode' = 'latest-offset')"
> )
>
> tableEnv.executeSql("""
>   |INSERT INTO ItemDesc
>   |SELECT
>   |  item_id,
>   |  channel_id,
>   |  CAST(HOP_END(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND) AS BIGINT) AS window_end,
>   |  COUNT(*) as num_select
>   |FROM mytable
>   |GROUP BY item_id, channel_id, HOP(proctime, INTERVAL '15' SECOND, INTERVAL '60' SECOND)
> """.stripMargin)
>
> val result = tableEnv.sqlQuery("""
>   |SELECT roku_content_id, window_end, channel_id, num_select, row_num
>   |FROM (
>   |  SELECT *
>   |    ROW_NUMBER() OVER (PARTITION BY channel_id ORDER BY num_select DESC) as row_num
>   |  FROM ItemDesc)
>   |WHERE row_num <= 20
>   |""".stripMargin)
>
> But I got the error:
>
> org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.ItemDesc'.
>
> The table ItemDesc is an intermediate table. If I put everything in a single query, that doesn't work. If I create a table like this:
>
> tableEnv.executeSql(
>   "CREATE TABLE ItemDesc (item_id STRING, channel_id STRING, window_end BIGINT, num_select BIGINT) "
> )
>
> This also doesn't work.
>
> Thanks,
> Jing
>
> On Thu, Dec 23, 2021 at 1:20 AM Jing Zhang <beyond1...@gmail.com> wrote:
>
>> Hi Jing,
>> In fact, I agree with you: use TopN [2] instead of Window TopN [1], by normalizing the time into 5-minute units and adding it as one of the partition keys.
>> Please note two points when using TopN:
>> 1. The result is an update stream instead of an append stream, which means a result that has been sent might be retracted later.
>> 2. You should take care of state cleanup.
>>
>> However, you said you ran into a problem when using TopN. I didn't understand your question here. Would you please explain a little more?
>>
>> > One possibility I saw is to create a table, insert the aggregated data into it, and then do Top N like [1]. However, I cannot make this approach work because I need to specify a connector for this table, and I may also need to create another Kafka topic for it.
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>> [2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/topn/
>>
>> Jing Zhang <beyond1...@gmail.com> wrote on Thu, Dec 23, 2021 at 17:04:
>>
>>> Hi Jing,
>>> I'm afraid it is not possible to use Window TopN in SQL on version 1.12, because Window TopN was introduced in 1.13.
>>>
>>> > One possibility I saw is to create a table, insert the aggregated data into it, and then do Top N like [1]. However, I cannot make this approach work because I need to specify a connector for this table, and I may also need to create another Kafka topic for it.
>>>
>>> I didn't understand you here.
>>> Do you mean you need a sink to store the output data of TopN? However, you still need a sink to store the output even if you use Window TopN.
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-topn/
>>>
>>> Best,
>>> Jing Zhang
>>>
>>> Jing <ajin...@gmail.com> wrote on Thu, Dec 23, 2021 at 16:12:
>>>
>>>> Hi, Flink community,
>>>>
>>>> Is there any existing code I can use to get the window Top N with Flink 1.12? One possibility I saw is to create a table, insert the aggregated data into it, and then do Top N like [1]. However, I cannot make this approach work because I need to specify a connector for this table, and I may also need to create another Kafka topic for it. Is there any existing way to do Window Top N with Flink 1.12?
>>>>
>>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
>>>>
>>>> Thanks,
>>>> Jing