I'm trying to express a supposedly simple query with Flink SQL - log the
first visit a day for each user. Source table is defined like

CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS ts)
WITH ('connector' = 'filesystem',
'path' = 'file:///visits.csv',
'format' = 'csv')

The query I came with up is

SELECT *
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
user_id ORDER BY ts ASC) as rownum
  FROM TABLE(
    TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
)
WHERE rownum = 1

But it fails with error
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Found more than one rowtime
field: [ts, window_time] in the query when insert into
'default_catalog.default_database.Unregistered_Collect_Sink_6'.
Please select the rowtime field that should be used as event-time timestamp
for the DataStream by casting all other fields to TIMESTAMP.

Any ideas on how to fix this?

Reply via email to