I think the issue here is that the nested select is selecting all the fields produced by the TVF, including window_time (which is implicitly added by the TVF as described here <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>). Because of that, the planner cannot resolve the timestamp to use as event-time in the result stream. Try to select only the fields you need in the nested SELECT, e.g.:
SELECT * FROM ( SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, user_id ORDER BY ts ASC) as rownum FROM TABLE( TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS)) ) WHERE rownum = 1 On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com> wrote: > I'm trying to express a supposedly simple query with Flink SQL - log the > first visit a day for each user. Source table is defined like > > CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS ts) > WITH ('connector' = 'filesystem', > 'path' = 'file:///visits.csv', > 'format' = 'csv') > > The query I came with up is > > SELECT * > FROM ( > SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, > user_id ORDER BY ts ASC) as rownum > FROM TABLE( > TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS)) > ) > WHERE rownum = 1 > > But it fails with error > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: Found more than one rowtime > field: [ts, window_time] in the query when insert into > 'default_catalog.default_database.Unregistered_Collect_Sink_6'. > Please select the rowtime field that should be used as event-time > timestamp for the DataStream by casting all other fields to TIMESTAMP. > > Any ideas on how to fix this? >