I think the issue is that the nested SELECT * picks up all the fields
produced by the TVF, including window_time (which the TVF adds implicitly,
as described here
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/#window-functions>).
The result then contains two rowtime attributes, ts and window_time, so the
planner cannot decide which one to use as the event-time timestamp of the
result stream. Try selecting only the fields you need in the nested SELECT,
e.g.:

SELECT *
FROM (
  SELECT user_id, ts,
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end, user_id
                       ORDER BY ts ASC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
)
WHERE rownum = 1
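
If you do need the window columns in the output, another option is to keep
them and cast window_time to a plain TIMESTAMP, which is what the error
message itself suggests. This is an untested sketch: it assumes the cast
turns window_time into a regular timestamp column so that ts stays the only
rowtime field, and the alias window_time_ts is just an illustrative name.

```sql
-- Sketch only: keep the window columns, but leave ts as the single rowtime field.
SELECT *
FROM (
  SELECT user_id, ts, window_start, window_end,
    -- Assumption: casting makes window_time a regular TIMESTAMP(3) column.
    CAST(window_time AS TIMESTAMP(3)) AS window_time_ts,
    ROW_NUMBER() OVER (PARTITION BY window_start, window_end, user_id
                       ORDER BY ts ASC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
)
WHERE rownum = 1
```

If you would rather have window_time be the event-time field of the result,
the same idea should work the other way around by casting ts instead.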

On Thu, Nov 4, 2021 at 11:18 AM Pavel Penkov <ebonfortr...@gmail.com> wrote:

> I'm trying to express a supposedly simple query with Flink SQL: log the
> first visit of each day for each user. The source table is defined like
>
> CREATE TABLE visits (user_id int, ts timestamp(3), WATERMARK FOR ts AS ts)
> WITH ('connector' = 'filesystem',
> 'path' = 'file:///visits.csv',
> 'format' = 'csv')
>
> The query I came up with is
>
> SELECT *
> FROM (
>   SELECT *,
>     ROW_NUMBER() OVER (PARTITION BY window_start, window_end, user_id
>                        ORDER BY ts ASC) AS rownum
>   FROM TABLE(
>     TUMBLE(TABLE visits, DESCRIPTOR(ts), INTERVAL '24' HOURS))
> )
> WHERE rownum = 1
>
> But it fails with the error
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Found more than one rowtime
> field: [ts, window_time] in the query when insert into
> 'default_catalog.default_database.Unregistered_Collect_Sink_6'.
> Please select the rowtime field that should be used as event-time
> timestamp for the DataStream by casting all other fields to TIMESTAMP.
>
> Any ideas on how to fix this?
>
