Hey Sebastien et al, have you tried rewriting the rolling aggregate as a window-over query? A window-over aggregation creates an append-only stream which should preserve the timestamp/watermark of the source. You can then add a deduplication <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/> to create a versioned state that you can use in a temporal join. The partition key of the deduplication becomes the primary key to join on.
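To make the last step concrete, here is a toy model in plain Python (not Flink code) of what the deduplication gives you: the append-only rows are grouped by the partition key into a list of versions ordered by timestamp, and the temporal join picks, for each probe row, the latest version whose timestamp is not after the probe row's timestamp. Everything in it is made up for illustration (`build_versioned_state`, `temporal_lookup`, the sample rows, integer timestamps), and `None` simply stands in for "no version existed yet".

```python
from bisect import bisect_right
from collections import defaultdict


def build_versioned_state(rows):
    """Group append-only (key, event_time, value) rows into per-key versions.

    Returns {key: (sorted_times, values_in_the_same_order)}.
    """
    by_key = defaultdict(list)
    for key, event_time, value in rows:
        by_key[key].append((event_time, value))

    state = {}
    for key, versions in by_key.items():
        versions.sort(key=lambda v: v[0])  # order versions by event time
        times = [t for t, _ in versions]
        values = [v for _, v in versions]
        state[key] = (times, values)
    return state


def temporal_lookup(state, key, as_of):
    """Return the value of the latest version with event_time <= as_of."""
    if key not in state:
        return None
    times, values = state[key]
    idx = bisect_right(times, as_of)  # number of versions at or before as_of
    return values[idx - 1] if idx else None


# Hypothetical output of the window-over query: (key, event_time, aggregate).
rolling = [("EUR", 10, 1.10), ("USD", 12, 1.00), ("EUR", 20, 1.15)]
# Hypothetical probe side: (order_id, key, order_time).
orders = [("o1", "EUR", 15), ("o2", "EUR", 25), ("o3", "EUR", 5)]

state = build_versioned_state(rolling)
joined = [(oid, key, temporal_lookup(state, key, ts)) for oid, key, ts in orders]
```

With these sample rows, `o1` sees the version written at time 10, `o2` the one written at time 20, and `o3` has no version to join against.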

I think what you are running into is that Flink creates a change stream with retractions for a group-by aggregation. Temporal joins only work on "versioned state" or lookup tables.

However, I think you have a valid point about a more foundational limitation of FlinkSQL: it is currently not possible to create an append-only stream from a change stream without going through the DataStream API. It would be incredibly useful to support this natively in FlinkSQL, and Calcite has support for the STREAM <https://calcite.apache.org/docs/stream.html> keyword to do this. Materialize (i.e. differential dataflow) has a somewhat related construct in their SQL called a subscription <https://materialize.com/docs/sql/subscribe/>.

We added support for this in DataSQRL <https://www.datasqrl.com/docs/reference/sqrl/stream/#convert-state-to-stream> (which is a streaming database compiler that generates Flink jobs) using a syntax that looks like this:
STREAM ON UPDATE AS (*YOUR BASE QUERY*)
It creates an append-only stream from a change stream by essentially dropping all retractions and deletes (you can also do STREAM ON DELETE to get only deletes, etc.).

However, I think this might be a feature that should live in FlinkSQL instead, and we'd be happy to create a FLIP and donate our implementation if there is interest.

Cheers,
Matthias

On Mon, Mar 18, 2024 at 3:01 AM Sebastien <s...@erreur404.ch> wrote:

> Hi everyone,
>
> Before digging into what it would take to implement a general solution,
> I narrowed down the scope to write a fix which makes the query mentioned in
> the thread work. Here are some findings:
>
> - For the temporal join logic, it's not the watermark that matters but
> having a TimeIndicatorRelDataType column in the input relation. To address
> that, in the PR below, we introduced a call to the LAST_VALUE aggregate
> function to bring a timestamp column to the view. That makes the query
> work, but we think it is not enough. It would probably require a distinct
> aggregate function or a new syntax to be able to support more general use
> cases.
> - There is a relationship between the way the logical operators are
> reordered, the way Flink's special timestamp type is materialized, and
> the watermark assigner.
> - I also looked into the flink-sql-parser and found that Flink has
> customized the parsing of the CREATE and DROP statements (
> https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd#L627-L638)
> (and Calcite also supports customizations for ALTER statements),
> but Calcite does not seem to support changes to the SELECT syntax (see
> https://issues.apache.org/jira/browse/CALCITE-4979). I mention it because
> I think it will inform what could be done syntax-wise.
>
> Here is a PR that highlights the changes, with the complete investigation:
> https://github.com/apache/flink/pull/24506
>
> This is more of a demonstration and I am looking to get feedback from
> someone who has more experience with the codebase.
>
> Thanks,
> Seb
>
> On Tue, Mar 5, 2024, at 10:07, Gyula Fóra wrote:
>
> Hi Everyone!
>
> I have discussed this with Sébastien Chevalley; he is going to prepare and
> drive the FLIP while I will assist him along the way.
>
> Thanks
> Gyula
>
> On Tue, Mar 5, 2024 at 9:57 AM <lorenzo.affe...@ververica.com> wrote:
>
> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
>
> On Feb 26, 2024 at 02:36 +0100, Ron liu <ron9....@gmail.com>, wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in a query, which can be declared for any type of
> source, such as a table or view. Similar to what Databricks provides [1],
> this needs a FLIP.
>
> [1]
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron