As a rule of thumb, I would first check that Flink ingests your CSV correctly. Perhaps try running just a SELECT on your input to see whether it is parsed as expected and is ordered.
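A minimal, untested sketch of that check in the SQL client, assuming the `visits` table from the DDL further down the thread. The `tableau` result mode only prints rows inline so the parsed values and their order are easy to eyeball:

```sql
-- Print results directly in the terminal instead of the interactive table view.
SET 'sql-client.execution.result-mode' = 'tableau';

-- Inspect the raw rows: is user_id an INT, is ts a proper TIMESTAMP(3),
-- and do the timestamps come out in ascending order? With
-- WATERMARK FOR ts AS ts there is no tolerance for disorder, so a row whose
-- ts is older than an earlier row would likely be treated as late by the
-- windowed query.
SELECT user_id, ts FROM visits;

-- By default a CSV parse error fails the job; if 'csv.ignore-parse-errors'
-- is enabled, unparsable fields become NULL instead, so count those.
SELECT COUNT(*) AS rows_without_ts FROM visits WHERE ts IS NULL;
```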
On Thu, Nov 4, 2021 at 12:47 PM Martijn Visser <mart...@ververica.com> wrote:

> Hi Pavel,
>
> There's a Flink SQL recipe in the Flink SQL Cookbook for a Window TopN, see
> https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md.
> I think that could help you with your use case too.
>
> Best regards,
>
> Martijn
>
> On Thu, 4 Nov 2021 at 12:42, Pavel Penkov <ebonfortr...@gmail.com> wrote:
>
>> When the query is changed to
>>
>>   SELECT user_id, ts, rownum
>>   FROM (
>>     SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>>       window_end, user_id ORDER BY ts ASC) as rownum
>>     FROM TABLE(
>>       TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>   )
>>   WHERE rownum = 1
>>
>> it runs but doesn't produce any results. I've tried different window sizes,
>> and the source file is sorted by timestamp.
>>
>> On Thu, Nov 4, 2021 at 1:42 PM Francesco Guardiani <france...@ververica.com> wrote:
>>
>>> I think the issue here is that the nested SELECT is selecting all the
>>> fields produced by the TVF, including window_time (which is implicitly
>>> added by the TVF as described here
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
>>> Because of that, the planner cannot resolve the timestamp to use as
>>> event-time in the result stream. Try to select only the fields you need in
>>> the nested SELECT, e.g.:
>>>
>>>   SELECT *
>>>   FROM (
>>>     SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>>>       window_end, user_id ORDER BY ts ASC) as rownum
>>>     FROM TABLE(
>>>       TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>>   )
>>>   WHERE rownum = 1
>>>
>>> On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com> wrote:
>>>
>>>> I'm trying to express a supposedly simple query with Flink SQL: log
>>>> the first visit a day for each user. The source table is defined like
>>>>
>>>>   CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS ts)
>>>>   WITH ('connector' = 'filesystem',
>>>>         'path' = 'file:///visits.csv',
>>>>         'format' = 'csv')
>>>>
>>>> The query I came up with is
>>>>
>>>>   SELECT *
>>>>   FROM (
>>>>     SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
>>>>       user_id ORDER BY ts ASC) as rownum
>>>>     FROM TABLE(
>>>>       TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>>>   )
>>>>   WHERE rownum = 1
>>>>
>>>> But it fails with the error
>>>>
>>>>   [ERROR] Could not execute SQL statement. Reason:
>>>>   org.apache.flink.table.api.TableException: Found more than one rowtime
>>>>   field: [ts, window_time] in the query when insert into
>>>>   'default_catalog.default_database.Unregistered_Collect_Sink_6'.
>>>>   Please select the rowtime field that should be used as event-time
>>>>   timestamp for the DataStream by casting all other fields to TIMESTAMP.
>>>>
>>>> Any ideas on how to fix this?
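The error message itself suggests another way out: keep exactly one rowtime column in the final result and cast the others to TIMESTAMP. A minimal, untested sketch of that, reusing the original inner query unchanged and only narrowing the outer projection; the alias `visit_ts` is a hypothetical name chosen for illustration:

```sql
-- Untested sketch: the inner query is the original one; only the outer
-- projection changes, so that window_time is the single rowtime column.
SELECT
  user_id,
  CAST(ts AS TIMESTAMP(3)) AS visit_ts,  -- hypothetical alias; the cast turns ts into a plain TIMESTAMP
  window_start,
  window_end,
  window_time,                           -- left as the event-time attribute of the result
  rownum
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
    user_id ORDER BY ts ASC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
)
WHERE rownum = 1;
```

This only addresses the "more than one rowtime field" error; it says nothing about the empty-result problem, which is where checking the parsed input first comes in.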