As for your original question, the documentation states that a temporal table function can only be registered via the Table API, and I believe this is true.
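
To make the semantics under discussion concrete, here is a minimal plain-Java sketch of what a processing-time temporal inner join does, in the spirit of the CoProcessFunction quoted further down. It is only an illustration under stated assumptions: it has no Flink dependency, it is not Flink's actual operator, and the class and method names (`ProcTimeTemporalJoinSketch`, `onDimension`, `onFact`) are made up for this example. A `HashMap` stands in for keyed `ValueState`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Hypothetical, Flink-free sketch of a processing-time temporal inner join:
 * each fact is joined with the latest dimension row seen so far for its key.
 */
final class ProcTimeTemporalJoinSketch<K, F, D, OUT> {

    // Latest dimension row per key (stands in for keyed ValueState<Dimension>).
    private final Map<K, D> latestDim = new HashMap<>();
    private final BiFunction<F, D, OUT> joinFn;

    ProcTimeTemporalJoinSketch(BiFunction<F, D, OUT> joinFn) {
        this.joinFn = joinFn;
    }

    /** Dimension side: a new row replaces the previous version for that key. */
    void onDimension(K key, D dim) {
        latestDim.put(key, dim);
    }

    /** Fact side: join with the current dimension row, or emit nothing (inner join). */
    List<OUT> onFact(K key, F fact) {
        List<OUT> out = new ArrayList<>();
        D dim = latestDim.get(key);
        if (dim != null) {
            out.add(joinFn.apply(fact, dim));
        }
        return out;
    }

    public static void main(String[] args) {
        // Orders carry an amount, rates carry a conversion rate, keyed by currency.
        ProcTimeTemporalJoinSketch<String, Double, Double, Double> join =
                new ProcTimeTemporalJoinSketch<>((amount, rate) -> amount * rate);

        System.out.println(join.onFact("EUR", 10.0)); // no rate seen yet: []
        join.onDimension("EUR", 1.5);
        System.out.println(join.onFact("EUR", 10.0)); // [15.0]
        join.onDimension("EUR", 2.0);
        System.out.println(join.onFact("EUR", 10.0)); // [20.0]
    }
}
```

Note that a fact arriving before any dimension row for its key is simply dropped, which is the inner-join behaviour; a left outer join would emit the fact with a null dimension instead.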
David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson <dander...@apache.org> wrote:

> Salva,
>
> Have you tried doing an AS OF style processing time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on this comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
>> The operator to temporal join a stream on processing time.
>>
>> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
>
> On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> I've found more examples here:
>>
>> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>>
>> where a fact table is enriched using several dimension tables, but again
>> the temporal table functions are registered using Table API like so:
>>
>> ```java
>> tEnv.registerFunction(
>>     "dimension_table1",
>>     tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
>> ```
>>
>> It's not exactly the same application, since this example covers a lookup
>> join, but the SQL query is also relying on the LATERAL TABLE + temporal
>> table functions:
>>
>> ```
>> SELECT
>>   D1.col1 AS A,
>>   D1.col2 AS B
>> FROM
>>   fact_table,
>>   LATERAL TABLE (dimension_table1(f_proctime)) AS D1
>> WHERE
>>   fact_table.dim1 = D1.id
>> ```
>>
>> In particular, this produces a job which is equivalent to
>>
>> ```java
>> private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>>     extends CoProcessFunction<IN1, Dimension, OUT> {
>>   private static final long serialVersionUID = 1L;
>>
>>   // Latest dimension row for the current key.
>>   protected transient ValueState<Dimension> dimState;
>>
>>   @Override
>>   public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>>       throws Exception {
>>     Dimension dim = dimState.value();
>>     if (dim == null) {
>>       // No dimension row seen yet for this key: drop the fact (inner join).
>>       return;
>>     }
>>     out.collect(join(value, dim));
>>   }
>>
>>   abstract OUT join(IN1 value, Dimension dim);
>>
>>   @Override
>>   public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>>       throws Exception {
>>     dimState.update(value);
>>   }
>>
>>   @Override
>>   public void open(Configuration parameters) throws Exception {
>>     super.open(parameters);
>>     ValueStateDescriptor<Dimension> dimStateDesc =
>>         new ValueStateDescriptor<>("dimstate", Dimension.class);
>>     this.dimState = getRuntimeContext().getState(dimStateDesc);
>>   }
>> }
>> ```
>>
>> I'm basically interested in rewriting these types of DIY joins (based on
>> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
>> possible, otherwise I would like to know which limitations there are.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> By looking at the docs for older versions of Flink, e.g.,
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>>
>>> it seems that it's possible to rewrite this query
>>>
>>> ```
>>> SELECT
>>>   o.amount * r.rate AS amount
>>> FROM
>>>   Orders AS o,
>>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>>> WHERE r.currency = o.currency
>>> ```
>>>
>>> as
>>>
>>> ```
>>> SELECT
>>>   SUM(o.amount * r.rate) AS amount
>>> FROM Orders AS o,
>>>   RatesHistory AS r
>>> WHERE r.currency = o.currency
>>> AND r.rowtime = (
>>>   SELECT MAX(rowtime)
>>>   FROM RatesHistory AS r2
>>>   WHERE r2.currency = o.currency
>>>   AND r2.rowtime <= o.rowtime);
>>> ```
>>>
>>> This would be a way to accomplish this task in SQL without using a
>>> temporal table function.
>>>
>>> Would this rewrite be equivalent in terms of the final generated job?
>>> Obviously I very much prefer the LATERAL TABLE query but this requires
>>> using a temporal table function which can only be registered using the
>>> Table API (apparently).
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>>> It doesn't seem the case with processing time unless I'm mistaken:
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>>
>>>> This case seems to require a different syntax based on LATERAL TABLE
>>>> and a temporal table function (FOR SYSTEM_TIME is not supported). From the
>>>> docs too, it seems that temporal table functions can only be registered via
>>>> the table API. Am I missing/misunderstanding something?
>>>>
>>>> Salva
>>>>
>>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Salva,
>>>>>
>>>>> The examples for temporal table joins can be found at
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>>>> Your example is definitely possible with just using SQL.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn
>>>>>
>>>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <
>>>>> salcantara...@gmail.com> wrote:
>>>>>
>>>>>> Based on this:
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>>>
>>>>>> It seems that the only way of registering temporal table functions is
>>>>>> via the Table API.
>>>>>>
>>>>>> If that is the case, is there a way to make this example work
>>>>>>
>>>>>> ```
>>>>>> SELECT
>>>>>>   SUM(amount * rate) AS amount
>>>>>> FROM
>>>>>>   orders,
>>>>>>   LATERAL TABLE (rates(order_time))
>>>>>> WHERE
>>>>>>   rates.currency = orders.currency
>>>>>> ```
>>>>>>
>>>>>> without the Table API, just using SQL? E.g., is it possible to deploy
>>>>>> the temporal table function to the cluster (by packaging it in a jar
>>>>>> file) and then run the above query from the Flink SQL CLI?
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> Salva