Forgot to mention, I am using Flink 1.12.

On Fri, Jan 29, 2021 at 10:11 AM Patrick Angeles <patrickange...@gmail.com> wrote:
> Fairly new to Flink here so this might be a newbie mistake, but here's the
> problem. I created the following table and view:
>
>> CREATE TABLE test (
>>   event_time TIMESTAMP(3),
>>   symbol STRING,
>>   price DOUBLE,
>>   WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
>> ) WITH (
>>   ...
>> ) ;
>>
>> CREATE VIEW test_view AS (
>>   SELECT
>>     symbol,
>>     TUMBLE_START (event_time, INTERVAL '1' MINUTE) AS t_start,
>>     AVG (price) AS avg_price,
>>     MIN (price) AS min_price,
>>     MAX (price) AS max_price
>>   FROM
>>     test
>>   WHERE event_time IS NOT NULL
>>   GROUP BY
>>     TUMBLE(event_time, INTERVAL '1' MINUTE), symbol
>> ) ;
>
> Describe shows:
>
>> Flink SQL> describe test ;
>> +------------+------------------------+------+-----+--------+------------------------------------+
>> | name       | type                   | null | key | extras | watermark                          |
>> +------------+------------------------+------+-----+--------+------------------------------------+
>> | event_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `event_time` - INTERVAL '1' MINUTE |
>> | symbol     | STRING                 | true |     |        |                                    |
>> | price      | DOUBLE                 | true |     |        |                                    |
>> +------------+------------------------+------+-----+--------+------------------------------------+
>> 3 rows in set
>>
>> Flink SQL> describe test_view ;
>> +-----------+------------------------+------+-----+--------+-----------+
>> | name      | type                   | null | key | extras | watermark |
>> +-----------+------------------------+------+-----+--------+-----------+
>> | symbol    | STRING                 | true |     |        |           |
>> | t_start   | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
>> | avg_price | DOUBLE                 | true |     |        |           |
>> | min_price | DOUBLE                 | true |     |        |           |
>> | max_price | DOUBLE                 | true |     |        |           |
>> +-----------+------------------------+------+-----+--------+-----------+
>> 5 rows in set
>
> When I run a query over the view, I get the following error:
>
>> Flink SQL> SELECT
>> >   symbol,
>> >   t_start,
>> >   avg_price,
>> >   min_price,
>> >   max_price,
>> >   FIRST_VALUE (avg_price) OVER x AS prev_avg_price
>> > FROM test_view
>> > WINDOW x AS (
>> >   PARTITION BY symbol
>> >   ORDER BY t_start
>> >   ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
>> > ) ;
>>
>> *[ERROR] Could not execute SQL statement. Reason:*
>> *org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.*
>
> Right now, to get around this, I need to materialize the results off test
> table into a new table that matches the view. Seems that this ought to be
> doable doing everything in one job instead of the intermediate
> materialization step. Am I missing something?
>
> Thanks in advance.
>
> Patrick