I was wrong about this. The AS OF-style processing-time temporal join has been
disabled at a higher level, in
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator

David

On Thu, Oct 6, 2022 at 9:59 AM David Anderson <dander...@apache.org> wrote:

> Salva,
>
> Have you tried doing an AS OF-style processing-time temporal join? I know
> the documentation leads one to believe this isn't supported, but I think it
> actually works. I'm basing this on the following comment [1] in the code for
> the TemporalProcessTimeJoinOperator:
>
>> The operator to temporal join a stream on processing time.
>>
>> For temporal TableFunction join (LATERAL
>> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
>> AS OF), they can reuse same processing-time operator implementation, the
>> differences between them are: (1) The temporal TableFunction join only
>> supports single column in primary key but temporal table join supports
>> arbitrary columns in primary key. (2) The temporal TableFunction join only
>> supports inner join, temporal table join supports both inner join and left
>> outer join.
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
>
> Regards,
> David
>
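The comment quoted above describes a processing-time temporal join: each left-side row is matched against whichever version of the right-side row is current when it arrives. The following is a minimal, hypothetical Java model of those semantics, not Flink's operator: a HashMap stands in for keyed state, arrival order stands in for processing time, and the class, record, and field names are illustrative assumptions. It uses a two-column key and a flag for inner vs. left outer join, the two differences the comment lists. Requires Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink code: the HashMap plays the role of keyed state,
// and "processing time" is simply the order in which events are handed in.
public class ProcTimeTemporalJoinSketch {

    // Composite (two-column) primary key; field names are illustrative only.
    record Key(String region, String id) {}

    record Dim(Key key, String name) {}

    record Fact(Key key, int amount) {}

    private final Map<Key, Dim> latest = new HashMap<>();
    private final boolean leftOuter;
    private final List<String> out = new ArrayList<>();

    public ProcTimeTemporalJoinSketch(boolean leftOuter) {
        this.leftOuter = leftOuter;
    }

    // Right (dimension) side: a new version overwrites the previous one; no history is kept.
    public void onDimension(Dim d) {
        latest.put(d.key(), d);
    }

    // Left (fact) side: join against whatever version is current when the fact arrives.
    public void onFact(Fact f) {
        Dim d = latest.get(f.key());
        if (d != null) {
            out.add(f.amount() + ":" + d.name());
        } else if (leftOuter) {
            out.add(f.amount() + ":null"); // left outer join pads the missing side with null
        }
        // Inner join: a fact with no matching dimension row is dropped.
    }

    public List<String> results() {
        return out;
    }

    public static void main(String[] args) {
        for (boolean leftOuter : new boolean[] {false, true}) {
            ProcTimeTemporalJoinSketch join = new ProcTimeTemporalJoinSketch(leftOuter);
            Key k = new Key("eu", "42");
            join.onFact(new Fact(k, 1));        // arrives before any dimension row
            join.onDimension(new Dim(k, "v1"));
            join.onFact(new Fact(k, 2));
            join.onDimension(new Dim(k, "v2")); // overwrites v1
            join.onFact(new Fact(k, 3));
            System.out.println((leftOuter ? "left outer: " : "inner: ") + join.results());
        }
    }
}
```

Running main prints `inner: [2:v1, 3:v2]` and then `left outer: [1:null, 2:v1, 3:v2]`. The first fact arrives before any dimension row, so the inner variant drops it, much like the CoProcessFunction join quoted further down the thread; later dimension updates never re-emit earlier results.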
> On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> I've found more examples here:
>>
>> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
>>
>> where a fact table is enriched using several dimension tables, but again
>> the temporal table functions are registered using the Table API, like so:
>>
>> ```java
>>     tEnv.registerFunction(
>>         "dimension_table1",
>>         tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
>> ```
>>
>> It's not exactly the same application, since this example covers a lookup
>> join, but the SQL query also relies on LATERAL TABLE plus temporal
>> table functions:
>>
>> ```sql
>> SELECT
>>     D1.col1 AS A,
>>     D1.col2 AS B
>> FROM
>>     fact_table,
>>     LATERAL TABLE (dimension_table1(f_proctime)) AS D1
>> WHERE
>>     fact_table.dim1 = D1.id
>> ```
>>
>> In particular, this produces a job that is equivalent to the following:
>>
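>> ```java
>>   // Imports needed in the enclosing file:
>>   // import org.apache.flink.api.common.state.ValueState;
>>   // import org.apache.flink.api.common.state.ValueStateDescriptor;
>>   // import org.apache.flink.configuration.Configuration;
>>   // import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
>>   // import org.apache.flink.util.Collector;
>>   //
>>   // Dimension is the application's own dimension type (not shown).
>>   // dimState is keyed state, so the connected streams must be keyed by the
>>   // join key; each key then holds only its latest Dimension.
>>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
>>       extends CoProcessFunction<IN1, Dimension, OUT> {
>>     private static final long serialVersionUID = 1L;
>>
>>     protected transient ValueState<Dimension> dimState;
>>
>>     // Fact side: join with the latest dimension seen so far. Inner-join
>>     // semantics: facts arriving before any dimension for the key are dropped.
>>     @Override
>>     public void processElement1(IN1 value, Context ctx, Collector<OUT> out)
>>         throws Exception {
>>       Dimension dim = dimState.value();
>>       if (dim == null) {
>>         return;
>>       }
>>       out.collect(join(value, dim));
>>     }
>>
>>     abstract OUT join(IN1 value, Dimension dim);
>>
>>     // Dimension side: overwrite the stored version; no history is kept.
>>     @Override
>>     public void processElement2(Dimension value, Context ctx, Collector<OUT> out)
>>         throws Exception {
>>       dimState.update(value);
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>       super.open(parameters);
>>       ValueStateDescriptor<Dimension> dimStateDesc =
>>           new ValueStateDescriptor<>("dimstate", Dimension.class);
>>       this.dimState = getRuntimeContext().getState(dimStateDesc);
>>     }
>>   }
>> ```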
>>
>> I'm basically interested in rewriting these kinds of DIY joins (based on
>> CoProcessFunction or CoFlatMapFunction) from DataStream to pure SQL if
>> possible; otherwise, I would like to know what the limitations are.
>>
>> Regards,
>>
>> Salva
>>
>> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Looking at the docs for older versions of Flink, e.g.,
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
>>>
>>> it seems that it's possible to rewrite this query
>>>
>>> ```
>>> SELECT
>>>   o.amount * r.rate AS amount
>>> FROM
>>>   Orders AS o,
>>>   LATERAL TABLE (Rates(o.rowtime)) AS r
>>> WHERE r.currency = o.currency
>>> ```
>>>
>>> as
>>>
>>> ```
>>> SELECT
>>>   o.amount * r.rate AS amount
>>> FROM Orders AS o,
>>>   RatesHistory AS r
>>> WHERE r.currency = o.currency
>>> AND r.rowtime = (
>>>   SELECT MAX(rowtime)
>>>   FROM RatesHistory AS r2
>>>   WHERE r2.currency = o.currency
>>>   AND r2.rowtime <= o.rowtime);
>>> ```
>>>
>>> This would be a way to accomplish this task in SQL without using a
>>> temporal table function.
>>>
>>> Would this rewrite be equivalent in terms of the final generated job?
>>> Obviously, I much prefer the LATERAL TABLE query, but it requires a
>>> temporal table function, which (apparently) can only be registered using
>>> the Table API.
>>>
>>> Regards,
>>>
>>> Salva
>>>
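The correlated subquery in the rewrite above selects, for each order, the RatesHistory row of the same currency with the greatest rowtime that is not later than the order's rowtime. A minimal, hypothetical Java sketch of that lookup follows (not Flink code; the class and method names are illustrative assumptions). Each currency's versions live in a TreeMap, where floorEntry plays the role of `MAX(rowtime) ... WHERE r2.rowtime <= o.rowtime`. It only models the result semantics; it says nothing about whether Flink's planner turns the subquery into the same job as the LATERAL TABLE version, which is the open question here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the "as of" lookup expressed by the correlated subquery.
public class AsOfRateLookupSketch {

    // currency -> (rowtime -> rate); TreeMap keeps each currency's versions sorted by time
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    public void addRate(String currency, long rowtime, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(rowtime, rate);
    }

    // Returns amount * rate using the latest rate version at or before orderTime,
    // or null if no such version exists (with an inner join, no output row).
    public Double convert(String currency, long orderTime, double amount) {
        TreeMap<Long, Double> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        // floorEntry = greatest key <= orderTime, i.e. MAX(rowtime) WHERE rowtime <= o.rowtime
        Map.Entry<Long, Double> version = versions.floorEntry(orderTime);
        return version == null ? null : amount * version.getValue();
    }

    public static void main(String[] args) {
        AsOfRateLookupSketch rates = new AsOfRateLookupSketch();
        rates.addRate("EUR", 10L, 114.0);
        rates.addRate("EUR", 20L, 116.0);
        System.out.println(rates.convert("EUR", 15L, 2.0)); // uses the version from t=10
        System.out.println(rates.convert("EUR", 20L, 2.0)); // uses the version from t=20
        System.out.println(rates.convert("EUR", 5L, 2.0));  // no version yet
    }
}
```

With these rates, main prints `228.0`, `232.0` and `null`: an order at t=15 sees the t=10 rate, an order at exactly t=20 already sees the t=20 rate (the `<=` in the subquery), and an order at t=5 finds no version.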
>>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>>> That doesn't seem to be the case for processing time, unless I'm mistaken:
>>>>
>>>>
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
>>>>
>>>> This case seems to require a different syntax based on LATERAL TABLE
>>>> and a temporal table function (FOR SYSTEM_TIME AS OF is not supported).
>>>> The docs also suggest that temporal table functions can only be registered
>>>> via the Table API. Am I missing or misunderstanding something?
>>>>
>>>> Salva
>>>>
>>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <martijnvis...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Salva,
>>>>>
>>>>> The examples for temporal table joins can be found at
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
>>>>> Your example is definitely possible using just SQL.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn
>>>>>
>>>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <
>>>>> salcantara...@gmail.com> wrote:
>>>>>
>>>>>> Based on this:
>>>>>>
>>>>>>
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
>>>>>>
>>>>>> It seems that the only way of registering temporal table functions is
>>>>>> via the Table API.
>>>>>>
>>>>>> If that is the case, is there a way to make this example work
>>>>>>
>>>>>> ```
>>>>>> SELECT
>>>>>>   SUM(amount * rate) AS amount
>>>>>> FROM
>>>>>>   orders,
>>>>>>   LATERAL TABLE (rates(order_time))
>>>>>> WHERE
>>>>>>   rates.currency = orders.currency
>>>>>> ```
>>>>>>
>>>>>> without the Table API, using just SQL? E.g., is it possible to deploy
>>>>>> the temporal table function to the cluster (by packaging it in a jar
>>>>>> file) and then run the above query from the Flink SQL CLI?
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> Salva
>>>>>>
>>>>>>