+1 for supporting this feature. Time window aggregation currently has many
limitations, and if we could declare a watermark and time attribute on a
view, time windows would be much easier to use. Similarly, it would be very
useful if a primary key could be declared on a view.

Therefore, I believe we need a FLIP to detail the design of this feature.


Best,
Feng

On Fri, Feb 23, 2024 at 2:39 PM <mayaming1...@gmail.com> wrote:

> +1 for supporting defining time attributes on views.
>
> I once ran into the same problem. I did some regular joins and lost the
> time attribute, so I could no longer do window operations in the
> subsequent logic. I had to output the joined view to Kafka, read it back,
> and define a watermark on the new source - a cumbersome workaround.
>
> It would be more flexible if we could control the time attribute /
> watermark on views, as if a view were a special kind of source.
>
> Thanks,
> Yaming
> On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra <gyula.f...@gmail.com> wrote:
> > Posting this to dev as well as it potentially has some implications on
> development effort.
> >
> > What seems to be the problem here is that we cannot control/override
> > Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
> > cannot create a PRIMARY KEY on the view, but I think the temporal join
> > should not require the PK either. Should we remove this limitation?
> >
> > The general problem is the inflexibility of timestamp/watermark handling
> > on query outputs, which again makes this impossible.
> >
> > The workaround here can be to write the rolling aggregate to Kafka, read
> it back again and join with that. The fact that this workaround is possible
> actually highlights the need for more flexibility on the query/view side in
> my opinion.
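> >
> > For reference, a minimal sketch of that workaround against the tables
> > from the original report below. It assumes a Kafka broker at
> > localhost:9092, a topic named max_rates, and the upsert-kafka connector
> > on the classpath. The names max_rates_kafka and update_time are
> > hypothetical, and using the Kafka record timestamp as event time is an
> > assumption that may not suit every use case:
> >
> > ```sql
> > -- Hypothetical Kafka-backed table: written by the aggregate, then read
> > -- back as a versioned table with a primary key and a watermark.
> > CREATE TABLE max_rates_kafka (
> >     currency_id INT,
> >     max_rate DECIMAL(4, 3),
> >     -- Kafka record timestamp exposed as event time (read-only column)
> >     update_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
> >     WATERMARK FOR update_time AS update_time - INTERVAL '2' SECOND,
> >     PRIMARY KEY (currency_id) NOT ENFORCED
> > ) WITH (
> >     'connector' = 'upsert-kafka',
> >     'topic' = 'max_rates',                                -- assumed topic
> >     'properties.bootstrap.servers' = 'localhost:9092',    -- assumed broker
> >     'key.format' = 'json',
> >     'value.format' = 'json'
> > );
> >
> > -- Write the rolling aggregate out instead of keeping it in a view.
> > INSERT INTO max_rates_kafka
> > SELECT currency_id, MAX(conversion_rate) AS max_rate
> > FROM currency_rates
> > GROUP BY currency_id;
> >
> > -- Join against the re-read table, which now has a row time attribute.
> > SELECT o.order_id, r.max_rate
> > FROM orders AS o
> >     LEFT JOIN max_rates_kafka FOR SYSTEM_TIME AS OF o.order_time AS r
> >     ON o.currency_id = r.currency_id;
> > ```
> >
> > The INSERT and the join would typically be submitted as separate jobs.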
> >
> > Has anyone else run into this issue and considered the proper solution
> to the problem? Feels like it must be pretty common :)
> >
> > Cheers,
> > Gyula
> >
> >
> >
> >
> > > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley <s...@erreur404.ch>
> wrote:
> > > > Hi,
> > > >
> > > > I have been trying to write a temporal join in SQL on a rolling
> > > > aggregate view. However, it does not work and throws:
> > > >
> > > > org.apache.flink.table.api.ValidationException: Event-Time Temporal
> Table Join requires both primary key and row time attribute in versioned
> table, but no row time attribute can be found.
> > > >
> > > > It seems that after the aggregation the table loses its watermark,
> > > > and it's not possible to add one with the SQL API because it's a view.
> > > >
> > > > CREATE TABLE orders (
> > > >     order_id INT,
> > > >     price DECIMAL(6, 2),
> > > >     currency_id INT,
> > > >     order_time AS NOW(),
> > > >     WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > > )
> > > >     WITH (
> > > >         'connector' = 'datagen',
> > > >         'rows-per-second' = '10',
> > > >         'fields.order_id.kind' = 'sequence',
> > > >         'fields.order_id.start' = '1',
> > > >         'fields.order_id.end' = '100000',
> > > >         'fields.currency_id.min' = '1',
> > > >         'fields.currency_id.max' = '20'
> > > >     );
> > > >
> > > > CREATE TABLE currency_rates (
> > > >     currency_id INT,
> > > >     conversion_rate DECIMAL(4, 3),
> > > >     PRIMARY KEY (currency_id) NOT ENFORCED
> > > > )
> > > >     WITH (
> > > >         'connector' = 'datagen',
> > > >         'rows-per-second' = '10',
> > > >         'fields.currency_id.min' = '1',
> > > >         'fields.currency_id.max' = '20'
> > > >     );
> > > >
> > > > CREATE TEMPORARY VIEW max_rates AS (
> > > >     SELECT
> > > >         currency_id,
> > > >         MAX(conversion_rate) AS max_rate
> > > >     FROM currency_rates
> > > >     GROUP BY currency_id
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW temporal_join AS (
> > > >     SELECT
> > > >         order_id,
> > > >         max_rates.max_rate
> > > >     FROM orders
> > > >          LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > > >          ON orders.currency_id = max_rates.currency_id
> > > > );
> > > >
> > > > SELECT * FROM temporal_join;
> > > >
> > > > Am I missing something? What would be a good starting point to
> address this?
> > > >
> > > > Thanks in advance,
> > > > Sébastien Chevalley
>
