Sorry for late response, Martijn and Francesco have already give good advises to find out the problem. I only have one minor supplementary information, window rank/join/aggregate would emit results after the end of the window. Now the window size is 24 hour, is there any possible the first window is not triggered yet?
Best, JING ZHANG Francesco Guardiani <france...@ververica.com> 于2021年11月5日周五 上午12:57写道: > As a rule of thumb, I would first try to check that Flink ingests > correctly your csv. Perhaps try to run just a select on your input and see > if the input is parsed as expected and is ordered. > > On Thu, Nov 4, 2021 at 12:47 PM Martijn Visser <mart...@ververica.com> > wrote: > >> Hi Pavel, >> >> There's a Flink SQL recipe in the Flink SQL Cookbook for a Window TopN, >> see >> https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md. >> I think that could help you with your use case too. >> >> Best regards, >> >> Martijn >> >> On Thu, 4 Nov 2021 at 12:42, Pavel Penkov <ebonfortr...@gmail.com> wrote: >> >>> When the query changed to >>> >>> SELECT user_id, ts, rownum >>> FROM ( >>> SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start, >>> window_end, user_id ORDER BY ts ASC) as rownum >>> FROM TABLE( >>> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS)) >>> ) >>> WHERE rownum = 1 >>> >>> runs but doesn't produce any results. I've tried different window sizes, >>> the source file is sorted by timestamp. >>> >>> On Thu, Nov 4, 2021 at 1:42 PM Francesco Guardiani < >>> france...@ververica.com> wrote: >>> >>>> I think the issue here is that the nested select is selecting all the >>>> fields produced by the TVF, including window_time (which is implicitly >>>> added by the TVF as described here >>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>). >>>> Because of that, the planner cannot resolve the timestamp to use as >>>> event-time in the result stream. Try to select only the fields you need in >>>> the nested SELECT, e.g.: >>>> >>>> SELECT * >>>> FROM ( >>>> SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start, >>>> window_end, user_id ORDER BY ts ASC) as rownum >>>> FROM TABLE( >>>> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS)) >>>> ) >>>> WHERE rownum = 1 >>>> >>>> On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com> >>>> wrote: >>>> >>>>> I'm trying to express a supposedly simple query with Flink SQL - log >>>>> the first visit a day for each user. Source table is defined like >>>>> >>>>> CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS >>>>> ts) >>>>> WITH ('connector' = 'filesystem', >>>>> 'path' = 'file:///visits.csv', >>>>> 'format' = 'csv') >>>>> >>>>> The query I came with up is >>>>> >>>>> SELECT * >>>>> FROM ( >>>>> SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end, >>>>> user_id ORDER BY ts ASC) as rownum >>>>> FROM TABLE( >>>>> TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS)) >>>>> ) >>>>> WHERE rownum = 1 >>>>> >>>>> But it fails with error >>>>> [ERROR] Could not execute SQL statement. Reason: >>>>> org.apache.flink.table.api.TableException: Found more than one rowtime >>>>> field: [ts, window_time] in the query when insert into >>>>> 'default_catalog.default_database.Unregistered_Collect_Sink_6'. >>>>> Please select the rowtime field that should be used as event-time >>>>> timestamp for the DataStream by casting all other fields to TIMESTAMP. >>>>> >>>>> Any ideas on how to fix this? >>>>> >>>>