As a rule of thumb, I would first check that Flink is ingesting your CSV
correctly. Perhaps try running a plain SELECT on your input and verify that
the rows are parsed as expected and arrive in order.
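
For example, a minimal sanity check (assuming the visits table definition
from your original message) could be:

SELECT user_id, ts
FROM visits

If the rows print with correctly parsed timestamps and in the order you
expect, the CSV ingestion side is likely fine and the problem is in the
windowing.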

On Thu, Nov 4, 2021 at 12:47 PM Martijn Visser <mart...@ververica.com>
wrote:

> Hi Pavel,
>
> There's a Flink SQL recipe in the Flink SQL Cookbook for a Window TopN,
> see
> https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/11_window_top_n/11_window_top_n.md.
> I think that could help you with your use case too.
>
> Best regards,
>
> Martijn
>
> On Thu, 4 Nov 2021 at 12:42, Pavel Penkov <ebonfortr...@gmail.com> wrote:
>
>> When the query changed to
>>
>> SELECT user_id, ts, rownum
>> FROM (
>>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>> window_end, user_id ORDER BY ts ASC) as rownum
>>   FROM TABLE(
>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>> )
>> WHERE rownum = 1
>>
>> runs but doesn't produce any results. I've tried different window sizes,
>> and the source file is sorted by timestamp.
>>
>> On Thu, Nov 4, 2021 at 1:42 PM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> I think the issue here is that the nested SELECT is selecting all the
>>> fields produced by the TVF, including window_time (which is implicitly
>>> added by the TVF, as described here
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
>>> Because of that, the planner cannot resolve which timestamp to use as
>>> event-time in the result stream. Try selecting only the fields you need
>>> in the nested SELECT, e.g.:
>>>
>>> SELECT *
>>> FROM (
>>>   SELECT user_id, ts, ROW_NUMBER() OVER (PARTITION BY window_start,
>>> window_end, user_id ORDER BY ts ASC) as rownum
>>>   FROM TABLE(
>>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>> )
>>> WHERE rownum = 1
>>>
>>> On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com>
>>> wrote:
>>>
>>>> I'm trying to express a supposedly simple query with Flink SQL: log
>>>> the first visit per day for each user. The source table is defined as
>>>>
>>>> CREATE TABLE visits (
>>>>   user_id INT,
>>>>   ts TIMESTAMP(3),
>>>>   WATERMARK FOR ts AS ts
>>>> ) WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = 'file:///visits.csv',
>>>>   'format' = 'csv'
>>>> )
>>>>
>>>> The query I came up with is
>>>>
>>>> SELECT *
>>>> FROM (
>>>>   SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end,
>>>> user_id ORDER BY ts ASC) as rownum
>>>>   FROM TABLE(
>>>>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
>>>> )
>>>> WHERE rownum = 1
>>>>
>>>> But it fails with error
>>>> [ERROR] Could not execute SQL statement. Reason:
>>>> org.apache.flink.table.api.TableException: Found more than one rowtime
>>>> field: [ts, window_time] in the query when insert into
>>>> 'default_catalog.default_database.Unregistered_Collect_Sink_6'.
>>>> Please select the rowtime field that should be used as event-time
>>>> timestamp for the DataStream by casting all other fields to TIMESTAMP.
>>>>
>>>> Any ideas on how to fix this?
>>>>
>>>
