Hi Pavel,

There's a Flink SQL recipe in the Flink SQL Cookbook for a Window TopN, see
https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md

I think that could help you with your use case too.

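For the specific case of "first visit per user per day", a plain window aggregation might also be enough. Below is a rough, untested sketch against your `visits` table; the `first_visit` alias is only illustrative, and whether rows are actually emitted still depends on the watermark advancing past the end of each window.

```sql
-- Sketch only: one row per user per 24-hour tumbling window,
-- carrying the earliest ts seen for that user in that window.
SELECT
  window_start,
  window_end,
  user_id,
  MIN(ts) AS first_visit  -- illustrative alias, not taken from the thread
FROM TABLE(
  TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
GROUP BY window_start, window_end, user_id;
```

This selects only the window bounds, the user and the aggregated timestamp, so it does not carry `window_time` along with `ts` into the result.
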
Best regards,

Martijn

On Thu, 4 Nov 2021 at 12:42, Pavel Penkov <ebonfortr...@gmail.com> wrote:

> When the query is changed to
>
> SELECT user_id, ts, rownum
> FROM (
>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>     window_end, user_id ORDER BY ts ASC) as rownum
>   FROM TABLE(
>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
> )
> WHERE rownum = 1
>
> it runs but doesn't produce any results. I've tried different window sizes,
> and the source file is sorted by timestamp.
>
> On Thu, Nov 4, 2021 at 1:42 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> I think the issue here is that the nested select is selecting all the
>> fields produced by the TVF, including window_time (which is implicitly
>> added by the TVF as described here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
>> Because of that, the planner cannot resolve the timestamp to use as
>> event-time in the result stream. Try to select only the fields you need in
>> the nested SELECT, e.g.:
>>
>> SELECT *
>> FROM (
>>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>>     window_end, user_id ORDER BY ts ASC) as rownum
>>   FROM TABLE(
>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>> )
>> WHERE rownum = 1
>>
>> On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com>
>> wrote:
>>
>>> I'm trying to express a supposedly simple query with Flink SQL - log the
>>> first visit a day for each user.
>>> The source table is defined like
>>>
>>> CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS
>>>   ts)
>>> WITH ('connector' = 'filesystem',
>>>   'path' = 'file:///visits.csv',
>>>   'format' = 'csv')
>>>
>>> The query I came up with is
>>>
>>> SELECT *
>>> FROM (
>>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
>>>     user_id ORDER BY ts ASC) as rownum
>>>   FROM TABLE(
>>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>> )
>>> WHERE rownum = 1
>>>
>>> But it fails with error
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: Found more than one rowtime
>>> field: [ts, window_time] in the query when insert into
>>> 'default_catalog.default_database.Unregistered_Collect_Sink_6'.
>>> Please select the rowtime field that should be used as event-time
>>> timestamp for the DataStream by casting all other fields to TIMESTAMP.
>>>
>>> Any ideas on how to fix this?