Salva,

Have you tried doing an AS OF-style processing-time temporal join? I know the documentation leads one to believe this isn't supported, but I think it actually works. I'm basing this on the following comment [1] in the code for the TemporalProcessTimeJoinOperator:
> The operator to temporal join a stream on processing time. For temporal
> TableFunction join (LATERAL TemporalTableFunction(o.proctime)) and temporal
> table join (FOR SYSTEM_TIME AS OF), they can reuse same processing-time
> operator implementation, the differences between them are:
> (1) The temporal TableFunction join only supports single column in primary
> key but temporal table join supports arbitrary columns in primary key.
> (2) The temporal TableFunction join only supports inner join, temporal table
> join supports both inner join and left outer join.

[1] https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38

Regards,
David

On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <salcantara...@gmail.com> wrote:

> I've found more examples here:
>
> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>
> where a fact table is enriched using several dimension tables, but again
> the temporal table functions are registered using the Table API, like so:
>
> ```java
> tEnv.registerFunction(
>     "dimension_table1",
>     tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
> ```
>
> It's not exactly the same application, since this example covers a lookup
> join, but the SQL query also relies on LATERAL TABLE and temporal table
> functions:
>
> ```sql
> SELECT
>   D1.col1 AS A,
>   D1.col2 AS B
> FROM
>   fact_table,
>   LATERAL TABLE (dimension_table1(f_proctime)) AS D1
> WHERE
>   fact_table.dim1 = D1.id
> ```
>
> In particular, this produces a job that is equivalent to:
>
> ```java
> private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>         extends CoProcessFunction<IN1, Dimension, OUT> {
>     private static final long serialVersionUID = 1L;
>
>     protected transient ValueState<Dimension> dimState;
>
>     @Override
>     public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>             throws Exception {
>         Dimension dim = dimState.value();
>         if (dim == null) {
>             return;
>         }
>         out.collect(join(value, dim));
>     }
>
>     abstract OUT join(IN1 value, Dimension dim);
>
>     @Override
>     public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>             throws Exception {
>         dimState.update(value);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ValueStateDescriptor<Dimension> dimStateDesc =
>                 new ValueStateDescriptor<>("dimstate", Dimension.class);
>         this.dimState = getRuntimeContext().getState(dimStateDesc);
>     }
> }
> ```
>
> I'm basically interested in rewriting these kinds of DIY joins (based on
> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
> possible; otherwise, I would like to know what the limitations are.
>
> Regards,
>
> Salva
>
> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> By looking at the docs for older versions of Flink, e.g.,
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>
>> it seems that it's possible to rewrite this query
>>
>> ```sql
>> SELECT
>>   o.amount * r.rate AS amount
>> FROM
>>   Orders AS o,
>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>> WHERE r.currency = o.currency
>> ```
>>
>> as
>>
>> ```sql
>> SELECT
>>   SUM(o.amount * r.rate) AS amount
>> FROM Orders AS o,
>>   RatesHistory AS r
>> WHERE r.currency = o.currency
>> AND r.rowtime = (
>>   SELECT MAX(rowtime)
>>   FROM RatesHistory AS r2
>>   WHERE r2.currency = o.currency
>>   AND r2.rowtime <= o.rowtime);
>> ```
>>
>> This would be a way to accomplish this task in SQL without using a
>> temporal table function.
>>
>> Would this rewrite be equivalent in terms of the final generated job?
>> Obviously I very much prefer the LATERAL TABLE query, but it requires
>> a temporal table function, which (apparently) can only be registered
>> using the Table API.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> That doesn't seem to be the case with processing time, unless I'm mistaken:
>>>
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>
>>> This case seems to require a different syntax based on LATERAL TABLE and
>>> a temporal table function (FOR SYSTEM_TIME is not supported). From the
>>> docs, it also seems that temporal table functions can only be registered
>>> via the Table API. Am I missing or misunderstanding something?
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi Salva,
>>>>
>>>> The examples for temporal table joins can be found at
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>>> Your example is definitely possible using just SQL.
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <
>>>> salcantara...@gmail.com> wrote:
>>>>
>>>>> Based on this:
>>>>>
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>>
>>>>> it seems that the only way of registering temporal table functions is
>>>>> via the Table API.
>>>>>
>>>>> If that is the case, is there a way to make this example work
>>>>>
>>>>> ```sql
>>>>> SELECT
>>>>>   SUM(amount * rate) AS amount
>>>>> FROM
>>>>>   orders,
>>>>>   LATERAL TABLE (rates(order_time))
>>>>> WHERE
>>>>>   rates.currency = orders.currency
>>>>> ```
>>>>>
>>>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>>>> the temporal table function to the cluster (by packaging it in a jar
>>>>> file) and then run the above query from the Flink SQL CLI?
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Salva
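
For anyone comparing these options, here is a minimal plain-Java sketch of the join semantics discussed in this thread. It is not Flink code and makes no claim about how TemporalProcessTimeJoinOperator is actually implemented. It models a processing-time temporal join the way the CoProcessFunction quoted in this thread does: keep only the latest build-side row per primary key, and join each probe-side row against whatever version is current when it is processed. The class `ProcTimeTemporalJoinSketch`, its `onRate`/`onOrder` methods, and the `leftOuter` flag are hypothetical names chosen for illustration; the flag mirrors point (2) of the operator comment, and the single-column String key corresponds to the restriction in point (1).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of processing-time temporal join
// semantics. This is NOT Flink's TemporalProcessTimeJoinOperator: there is
// no keyed state backend, no state TTL, and no time attributes.
final class ProcTimeTemporalJoinSketch {

    // Latest build-side version per primary key (here: currency -> rate).
    private final Map<String, Double> latestRateByCurrency = new HashMap<>();

    // false = inner join (supported by both temporal join flavours);
    // true  = left outer join (FOR SYSTEM_TIME AS OF only, per the operator comment).
    private final boolean leftOuter;

    ProcTimeTemporalJoinSketch(boolean leftOuter) {
        this.leftOuter = leftOuter;
    }

    // Build side: a new version simply replaces the previous one,
    // like dimState.update(value) in the quoted CoProcessFunction.
    void onRate(String currency, double rate) {
        latestRateByCurrency.put(currency, rate);
    }

    // Probe side: join against whatever version is current right now.
    // Results returned earlier are never revisited when a later rate arrives.
    List<String> onOrder(String currency, double amount) {
        List<String> out = new ArrayList<>();
        Double rate = latestRateByCurrency.get(currency);
        if (rate != null) {
            out.add(currency + ":" + (amount * rate));
        } else if (leftOuter) {
            // Left outer join: keep the order and pad the missing side with null.
            out.add(currency + ":null");
        }
        // Inner join with no current version: emit nothing, like the
        // early return on a null dimState in the quoted code.
        return out;
    }
}
```

With `leftOuter` set to false, an order for a currency that has no rate yet produces no output. After `onRate("EUR", 2.0)`, calling `onOrder("EUR", 10.0)` returns `[EUR:20.0]`; a later `onRate("EUR", 3.0)` only affects orders processed afterwards, and the earlier result is left as it was.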