Hi David,

Many thanks for your reply. I understand then that there is no easy way to
do a simple processing-time join (purely based on SQL, without using the
Table API) where you:

- Save the elements seen on the right side in the current state (in general,
this state can be regarded as a materialised view, e.g., it could be a single
value but also a list)
- Perform a join operation for each element arriving on the left side based
on the current state (only consider new events here, and simply discard
elements without producing a join result if there is no match in the current
state)

along the lines of what the TemporalProcessTimeJoinOperator.java link you
provided does, or, if you like, the typical DIY joins that one
normally/easily writes with the DataStream API (see the sketch below).
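
For reference, here is a minimal sketch (untested; class, field, and type
names are made up) of the kind of DataStream-based DIY join I mean, using
the Orders/Rates example from the thread below:

```java
// A minimal sketch (untested; names are made up) of a DIY processing-time join:
// the latest Rate per currency is kept in keyed state, and each Order is joined
// against it on arrival; Orders with no matching Rate yet are simply discarded.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ProcTimeRateJoin extends KeyedCoProcessFunction<String, Order, Rate, Double> {

  private transient ValueState<Rate> rateState;

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    rateState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("rate", Rate.class));
  }

  @Override
  public void processElement1(Order order, Context ctx, Collector<Double> out) throws Exception {
    Rate rate = rateState.value();
    if (rate == null) {
      return; // no match in the current state: discard without producing a join result
    }
    out.collect(order.amount * rate.rate); // join against the current state
  }

  @Override
  public void processElement2(Rate rate, Context ctx, Collector<Double> out) throws Exception {
    rateState.update(rate); // the "materialised view": latest rate per currency
  }
}

// Simple example POJOs (made up for the sketch).
class Order {
  public String currency;
  public double amount;
}

class Rate {
  public String currency;
  public double rate;
}
```

Both inputs would be keyed by currency, something along the lines of
orders.connect(rates).keyBy(o -> o.currency, r -> r.currency).process(new
ProcTimeRateJoin()).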

Regards,

Salva

On 2022/10/06 17:23:30 David Anderson wrote:
> I was wrong about this. The AS OF style processing join has been disabled
> at a higher level,
> in org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin#createJoinOperator
>
> David
>
> On Thu, Oct 6, 2022 at 9:59 AM David Anderson <da...@apache.org> wrote:
>
> > Salva,
> >
> > Have you tried doing an AS OF style processing time temporal join? I know
> > the documentation leads one to believe this isn't supported, but I think it
> > actually works. I'm basing this on this comment [1] in the code for
> > the TemporalProcessTimeJoinOperator:
> >
> > The operator to temporal join a stream on processing time.
> >
> >> For temporal TableFunction join (LATERAL
> >> TemporalTableFunction(o.proctime)) and temporal table join (FOR SYSTEM_TIME
> >> AS OF), they can reuse same processing-time operator implementation, the
> >> differences between them are: (1) The temporal TableFunction join only
> >> supports single column in primary key but temporal table join supports
> >> arbitrary columns in primary key. (2) The temporal TableFunction join only
> >> supports inner join, temporal table join supports both inner join and left
> >> outer join.
> >
> >
> > [1]
> > https://github.com/apache/flink/blob/release-1.15/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperator.java#L38
> >
> > Regards,
> > David
> >
> > On Wed, Oct 5, 2022 at 6:39 AM Salva Alcántara <sa...@gmail.com>
> > wrote:
> >
> >> I've found more examples here:
> >>
> >> https://www.ververica.com/blog/a-journey-to-beating-flinks-sql-performance
> >>
> >> where a fact table is enriched using several dimension tables, but again
> >> the temporal table functions are registered using Table API like so:
> >>
> >> ```java
> >>     tEnv.registerFunction(
> >>         "dimension_table1",
> >>         tEnv.from("dim_table1").createTemporalTableFunction("r_proctime", "id"));
> >> ```
> >>
> >> It's not exactly the same application, since this example covers a lookup
> >> join, but the SQL query is also relying on the LATERAL TABLE + temporal
> >> table functions:
> >>
> >> ```
> >> SELECT
> >>     D1.col1 AS A,
> >>     D1.col2 AS B
> >> FROM
> >>     fact_table,
> >>     LATERAL TABLE (dimension_table1(f_proctime)) AS D1
> >> WHERE
> >>     fact_table.dim1     = D1.id
> >> ```
> >>
> >> In particular, this produces a job which is equivalent to
> >>
> >> ```
> >>   private abstract static class AbstractFactDimTableJoin<IN1, OUT>
> >>       extends CoProcessFunction<IN1, Dimension, OUT> {
> >>     private static final long serialVersionUID = 1L;
> >>
> >>     protected transient ValueState<Dimension> dimState;
> >>
> >>     @Override
> >>     public void processElement1(IN1 value, Context ctx, Collector<OUT>
> >> out) throws Exception {
> >>       Dimension dim = dimState.value();
> >>       if (dim == null) {
> >>         return;
> >>       }
> >>       out.collect(join(value, dim));
> >>     }
> >>
> >>     abstract OUT join(IN1 value, Dimension dim);
> >>
> >>     @Override
> >>     public void processElement2(Dimension value, Context ctx,
> >> Collector<OUT> out) throws Exception {
> >>       dimState.update(value);
> >>     }
> >>
> >>     @Override
> >>     public void open(Configuration parameters) throws Exception {
> >>       super.open(parameters);
> >>       ValueStateDescriptor<Dimension> dimStateDesc =
> >>           new ValueStateDescriptor<>("dimstate", Dimension.class);
> >>       this.dimState = getRuntimeContext().getState(dimStateDesc);
> >>     }
> >>   }
> >> ```
> >>
> >> I'm basically interested in rewriting these types of DIY joins (based on
> >> CoProcessFunction or CoFlatMapFunction) from the DataStream API to pure
> >> SQL if possible; otherwise I would like to know what the limitations are.
> >>
> >> Regards,
> >>
> >> Salva
> >>
> >> On Tue, Oct 4, 2022 at 9:09 PM Salva Alcántara <sa...@gmail.com>
> >> wrote:
> >>
> >>> By looking at the docs for older versions of Flink, e.g.,
> >>>
> >>> https://nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html
> >>>
> >>> it seems that it's possible to rewrite this query
> >>>
> >>> ```
> >>> SELECT
> >>>   o.amount * r.rate AS amount
> >>> FROM
> >>>   Orders AS o,
> >>>   LATERAL TABLE (Rates(o.rowtime)) AS r
> >>> WHERE r.currency = o.currency
> >>> ```
> >>>
> >>> as
> >>>
> >>> ```
> >>> SELECT
> >>>   SUM(o.amount * r.rate) AS amount
> >>> FROM Orders AS o,
> >>>   RatesHistory AS r
> >>> WHERE r.currency = o.currency
> >>> AND r.rowtime = (
> >>>   SELECT MAX(rowtime)
> >>>   FROM RatesHistory AS r2
> >>>   WHERE r2.currency = o.currency
> >>>   AND r2.rowtime <= o.rowtime);
> >>> ```
> >>>
> >>> This would be a way to accomplish this task in SQL without using a
> >>> temporal table function.
> >>>
> >>> Would this rewrite be equivalent in terms of the final generated job?
> >>> Obviously I very much prefer the LATERAL TABLE query but this requires
> >>> using a temporal table function which can only be registered using the
> >>> Table API (apparently).
> >>>
> >>> Regards,
> >>>
> >>> Salva
> >>>
> >>> On Tue, Oct 4, 2022 at 8:39 PM Salva Alcántara <sa...@gmail.com>
> >>> wrote:
> >>>
> >>>> It doesn't seem to be the case with processing time, unless I'm mistaken:
> >>>>
> >>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#processing-time-temporal-join
> >>>>
> >>>> This case seems to require a different syntax based on LATERAL TABLE
> >>>> and a temporal table function (FOR SYSTEM_TIME is not supported). From
> >>>> the docs too, it seems that temporal table functions can only be
> >>>> registered via the Table API. Am I missing/misunderstanding something?
> >>>>
> >>>> Salva
> >>>>
> >>>> On Tue, Oct 4, 2022, 19:26 Martijn Visser <ma...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hi Salva,
> >>>>>
> >>>>> The examples for temporal table joins can be found at
> >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins.
> >>>>> Your example is definitely possible just using SQL.
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Martijn
> >>>>>
> >>>>> On Tue, Oct 4, 2022 at 12:20 PM Salva Alcántara <
> >>>>> salcantara...@gmail.com> wrote:
> >>>>>
> >>>>>> Based on this:
> >>>>>>
> >>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
> >>>>>>
> >>>>>> It seems that the only way of registering temporal table functions is
> >>>>>> via the Table API.
> >>>>>>
> >>>>>> If that is the case, is there a way to make this example work
> >>>>>>
> >>>>>> ```
> >>>>>> SELECT
> >>>>>>   SUM(amount * rate) AS amount
> >>>>>> FROM
> >>>>>>   orders,
> >>>>>>   LATERAL TABLE (rates(order_time))
> >>>>>> WHERE
> >>>>>>   rates.currency = orders.currency
> >>>>>> ```
> >>>>>>
> >>>>>> without the Table API, just using SQL? E.g., is it possible to deploy
> >>>>>> the temporal table function to the cluster (by packaging it in a jar
> >>>>>> file) and then run the above query from the Flink SQL CLI?
> >>>>>>
> >>>>>> Thanks in advance,
> >>>>>>
> >>>>>> Salva
> >>>>>>
> >>>>>>
>
