Hi Pavel,

There's a Flink SQL recipe in the Flink SQL Cookbook for a Window TopN, see
https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md.
I think that could help you with your use case too.
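
If only the earliest visit per user per day is needed, a plain window
aggregation over the same TUMBLE TVF might also be enough, as an alternative
to Top-N. This is a minimal, untested sketch that assumes the visits table
defined further down in this thread; first_visit is just an illustrative
column name:

```sql
-- Assumes the `visits` table (user_id, ts with a watermark) from this thread.
-- One row per user and 24-hour tumbling window, carrying the earliest ts.
SELECT
  user_id,
  window_start,
  window_end,
  MIN(ts) AS first_visit  -- hypothetical alias, not from the original query
FROM TABLE(
  TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
GROUP BY user_id, window_start, window_end;
```

Unlike the ROW_NUMBER() variant, this returns only the grouping columns and
the aggregate, so Top-N is still the better fit if other columns of the
first-visit row are needed.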

Best regards,

Martijn

On Thu, 4 Nov 2021 at 12:42, Pavel Penkov <ebonfortr...@gmail.com> wrote:

> When the query is changed to
>
> SELECT user_id, ts, rownum
> FROM (
>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
> window_end, user_id ORDER BY ts ASC) as rownum
>   FROM TABLE(
>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
> )
> WHERE rownum = 1
>
> it runs but doesn't produce any results. I've tried different window sizes,
> and the source file is sorted by timestamp.
>
> On Thu, Nov 4, 2021 at 1:42 PM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> I think the issue here is that the nested select is selecting all the
>> fields produced by the TVF, including window_time (which is implicitly
>> added by the TVF as described here
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
>> Because of that, the planner cannot resolve the timestamp to use as
>> event-time in the result stream. Try to select only the fields you need in
>> the nested SELECT, e.g.:
>>
>> SELECT *
>> FROM (
>>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>> window_end, user_id ORDER BY ts ASC) as rownum
>>   FROM TABLE(
>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>> )
>> WHERE rownum = 1
>>
>> On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com>
>> wrote:
>>
>>> I'm trying to express a supposedly simple query with Flink SQL: log the
>>> first visit of each day for each user. The source table is defined as
>>>
>>> CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS
>>> ts)
>>> WITH ('connector' = 'filesystem',
>>> 'path' = 'file:///visits.csv',
>>> 'format' = 'csv')
>>>
>>> The query I came up with is
>>>
>>> SELECT *
>>> FROM (
>>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
>>> user_id ORDER BY ts ASC) as rownum
>>>   FROM TABLE(
>>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>> )
>>> WHERE rownum = 1
>>>
>>> But it fails with the error
>>> [ERROR] Could not execute SQL statement. Reason:
>>> org.apache.flink.table.api.TableException: Found more than one rowtime
>>> field: [ts, window_time] in the query when insert into
>>> 'default_catalog.default_database.Unregistered_Collect_Sink_6'.
>>> Please select the rowtime field that should be used as event-time
>>> timestamp for the DataStream by casting all other fields to TIMESTAMP.
>>>
>>> Any ideas on how to fix this?
>>>
>>
