Hey Sebastien et al,

have you tried rewriting the rolling aggregate as an OVER window
aggregation? An OVER aggregation produces an append-only stream, which
should preserve the timestamp/watermark of the source. You can then add a
deduplication
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/>
to create versioned state that you can use in a temporal join. The
partition key of the deduplication becomes the primary key to join on.
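To make the shape of this pipeline concrete, here is a minimal sketch in plain Python (not Flink code, and not how Flink implements it). It assumes a hypothetical in-memory list of rows already ordered by event time and models three steps: a RANGE OVER aggregation that emits one row per input row and keeps the input timestamp, the versioned state that a "keep last row per key" deduplication exposes, and an event-time temporal join that picks the version valid at the probe row's time. `Row`, `over_sum`, `versioned_state` and `temporal_join` are illustrative names only, not Flink APIs.

```python
from __future__ import annotations

from bisect import bisect_right
from collections import defaultdict
from typing import NamedTuple, Optional


class Row(NamedTuple):
    key: str
    ts: int  # event time, e.g. epoch millis
    value: float


def over_sum(rows: list[Row], range_ms: int) -> list[Row]:
    """Toy RANGE OVER aggregation (SUM per key over the preceding range_ms).

    Emits exactly one output row per input row and copies the input event
    time unchanged, so the result is append-only. Assumes rows arrive in
    event-time order and ignores how Flink groups rows with equal timestamps.
    """
    history: dict[str, list[Row]] = defaultdict(list)
    out: list[Row] = []
    for r in rows:
        history[r.key].append(r)
        total = sum(p.value for p in history[r.key] if r.ts - p.ts <= range_ms)
        out.append(Row(r.key, r.ts, total))
    return out


def versioned_state(rows: list[Row]) -> dict[str, list[tuple[int, float]]]:
    """What a 'keep last row per key' deduplication exposes to a temporal
    join: for each key (the dedup partition key, i.e. the join key), the
    time-ordered list of (valid_from, value) versions."""
    versions: dict[str, list[tuple[int, float]]] = defaultdict(list)
    for r in sorted(rows, key=lambda r: r.ts):
        versions[r.key].append((r.ts, r.value))
    return dict(versions)


def temporal_join(
    probe: list[Row], versions: dict[str, list[tuple[int, float]]]
) -> list[tuple[Row, Optional[float]]]:
    """Event-time temporal join (FOR SYSTEM_TIME AS OF probe.ts): pick the
    latest version of the same key with valid_from <= the probe row's time,
    or None if no version existed yet (LEFT JOIN behaviour)."""
    out: list[tuple[Row, Optional[float]]] = []
    for p in probe:
        hist = versions.get(p.key, [])
        i = bisect_right([t for t, _ in hist], p.ts)
        out.append((p, hist[i - 1][1] if i > 0 else None))
    return out
```

For example, with `range_ms=2000` the rows EUR@1000=1.0, EUR@2000=2.0 and EUR@5000=4.0 produce running sums 1.0, 3.0 and 4.0 with their timestamps unchanged, and a probe row for EUR at 2500 joins against the version 3.0 that became valid at 2000.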

I think what you are running into is that Flink turns a GROUP BY
aggregation into a change stream that contains retractions. Temporal joins
only work on "versioned state" (a versioned table) or on lookup tables.

However, I think you have identified a more fundamental limitation of
Flink SQL: it is currently not possible to create an append-only stream
from a change stream without going through the DataStream API. It would be
incredibly useful to support this natively in Flink SQL. Calcite supports
this with the STREAM <https://calcite.apache.org/docs/stream.html>
keyword, and Materialize (which is built on differential dataflow) has a
somewhat related construct in its SQL called a subscription
<https://materialize.com/docs/sql/subscribe/>.

We added support for this in DataSQRL
<https://www.datasqrl.com/docs/reference/sqrl/stream/#convert-state-to-stream>
(a streaming database compiler that generates Flink jobs) using this
syntax:

    STREAM ON UPDATE AS (*YOUR BASE QUERY*)

It creates an append-only stream from a change stream by essentially
dropping all retractions and deletes (you can also use STREAM ON DELETE to
get only deletes, etc.). That said, I think this feature belongs in Flink
SQL itself, and we'd be happy to create a FLIP and donate our
implementation if there is interest.
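The filtering semantics described above fit in a few lines. This is a hedged plain-Python sketch of that description only, not DataSQRL's or Flink's implementation: `stream_on` and the (row_kind, key, value) tuple layout are hypothetical, and the row kinds reuse the short strings of Flink's RowKind. Since only retractions and deletes are said to be dropped, the UPDATE mode here keeps both inserts and update-afters.

```python
from __future__ import annotations

# Changelog row kinds, written like Flink's RowKind short strings:
# insert, update-before (retraction), update-after, delete.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def stream_on(changelog: list[tuple[str, str, int]], mode: str) -> list[tuple[str, int]]:
    """Turn a change stream into an append-only stream.

    mode == "UPDATE": keep inserts and update-afters, dropping retractions
    and deletes. mode == "DELETE": keep only deletes. The row kind is not
    part of the output because every emitted record is a plain append.
    """
    keep = {"UPDATE": {INSERT, UPDATE_AFTER}, "DELETE": {DELETE}}[mode]
    return [(key, value) for kind, key, value in changelog if kind in keep]
```

Applied to the changelog +I a 1, -U a 1, +U a 3, -D a 3, the UPDATE mode yields (a, 1), (a, 3) and the DELETE mode yields (a, 3).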

Cheers,
Matthias

On Mon, Mar 18, 2024 at 3:01 AM Sebastien <s...@erreur404.ch> wrote:

> Hi everyone,
>
> Before digging into what it would take to implement a general solution,
> I narrowed down the scope to write a fix that makes the query mentioned in
> the thread work. Here are some findings:
>
> - For the temporal join logic, it's not the watermark that matters but
> having a TimeIndicatorRelDataType column in the input relation. To address
> that, in the PR below, we introduced a call to the LAST_VALUE aggregate
> function to bring a timestamp column into the view. That makes the query
> work, but we think it is not enough: supporting more general use cases
> would probably require a dedicated aggregate function or new syntax.
> - There is a relationship between the way the logical operators are
> reordered, the way Flink's special time-indicator timestamp is
> materialized, and the watermark assigner.
> - I also looked into the flink-sql-parser and found that Flink has
> customized the parsing of the CREATE and DROP statements
> (https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd#L627-L638),
> and Calcite also supports customizations for ALTER statements, but
> Calcite does not seem to support changes to the SELECT syntax (see
> https://issues.apache.org/jira/browse/CALCITE-4979). I mention it because
> I think it will inform what could be done syntax-wise.
>
> Here is a PR that highlights the changes, along with the complete
> investigation: https://github.com/apache/flink/pull/24506
>
> This is more of a demonstration and I am looking to get feedback from
> someone who has more experience with the codebase.
>
> Thanks,
> Seb
>
> On Tue, Mar 5, 2024, at 10:07, Gyula Fóra wrote:
>
> Hi Everyone!
>
> I have discussed this with Sébastien Chevalley, he is going to prepare and
> drive the FLIP while I will assist him along the way.
>
> Thanks
> Gyula
>
> On Tue, Mar 5, 2024 at 9:57 AM <lorenzo.affe...@ververica.com> wrote:
>
> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there an existing JIRA ticket for this, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu <ron9....@gmail.com>, wrote:
>
> +1.
> But I think this should be a more general requirement: support for
> declaring watermarks in a query, for any type of source, such as a table
> or a view, similar to what Databricks provides [1]. This needs a FLIP.
>
> [1] https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron