Hi Mans,
*Before getting to the questions, some background on lookup join and
temporal join may help.*
1. A lookup join is typically used to enrich a table with data that is
queried from an external system. It requires the right table to be backed
by a lookup source connector.
Its syntax is the same as that of the processing time temporal join [1].
2. A temporal join takes an arbitrary table and correlates each of its rows
with the relevant version of the corresponding row in the versioned table
(the right table).
The join can be based on either processing time or event time.

*The difference between a lookup join and a processing time temporal join
is this:*
*A processing time temporal join does not require the right table to be
backed by a lookup source. That means the external system behind the right
table does not need the ability to materialize the table as a dynamic
table; the processing time temporal join operator maintains the most recent
version of the dimension table.*
*A lookup join, however, requires the right table to be backed by a lookup
source. That means the external system behind the right table must itself
be able to maintain the latest snapshot of the dynamic table.*

Now let's go through the questions.
> 1.1. Does this mean that this only works with proc time ?
Yes, a lookup join only works with proc time. A temporal join [1], however,
can use either proc time or event time.
> 1.2. Is there a way to deal with event time for orders and if so how ?
You could use an event time temporal join [1] to handle the event time case.
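
As a rough sketch of what that could look like: the table names
(OrdersWithEventTime, CustomerVersions), the column types, the topics, the
broker address and the 5 second watermark delay below are all assumptions
made for illustration, not part of the original example. The right side
here is a versioned table, i.e. a changelog source with a primary key and
an event time attribute, rather than a lookup source.

```sql
-- Hypothetical append-only orders stream with an event time attribute.
CREATE TEMPORARY TABLE OrdersWithEventTime (
  order_id    STRING,
  total       DECIMAL(10, 2),
  customer_id INT,
  order_time  TIMESTAMP(3),
  -- tolerate 5 seconds of out-of-order data (an arbitrary choice)
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Hypothetical versioned table: primary key plus event time attribute
-- on top of an upsert changelog.
CREATE TEMPORARY TABLE CustomerVersions (
  id          INT,
  name        STRING,
  country     STRING,
  zip         STRING,
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Each order is joined with the customer version valid at o.order_time.
SELECT o.order_id, o.total, c.country, c.zip
FROM OrdersWithEventTime AS o
  JOIN CustomerVersions FOR SYSTEM_TIME AS OF o.order_time AS c
    ON o.customer_id = c.id;
```

Note that the join condition has to include the primary key of the
versioned table (c.id here).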

> 2.1 What is the purpose of using proc_time from the order side ?  Is it to
only limit the last order record for the lookup or is it to restrict the
customer record ?  Does this mean that Flink tracks proc time for the
customer table ?
Proc_time ensures that each row of the Orders table is joined with those
Customers rows that match the join predicate at the point in time when the
Orders row is processed by the lookup join operator.
In a lookup join, the external storage must provide the ability to maintain
the latest snapshot of the dimension table.
In a processing time temporal join, the join operator maintains the most
recent version of the dimension table.
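
For reference, the quoted example never shows how Orders gets its proc_time
attribute. A minimal sketch of such a declaration might look like this; the
column types, the connector and its options are assumptions, only the
column names come from the query in the documentation example:

```sql
-- Hypothetical Orders table; proc_time is a computed column, not stored data.
CREATE TEMPORARY TABLE Orders (
  order_id    STRING,
  total       DECIMAL(10, 2),
  customer_id INT,
  -- processing time attribute, evaluated when the row is processed
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Since proc_time is derived from the wall clock at processing time, nothing
about time has to be stored in or tracked for the Customers table.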

> 2.2 Since the customer table does not have time attribute, how does Flink
keep track of change to customer table to make sure that it joins the order
rows only with customer row matching the order with the appropriate proc
time ?
Same as 2.1.

> 2.3. How does Flink make sure that the join results are not updated if a
customer record is updated ?
In both lookup join and processing time temporal join, updates to the
dimension table do not affect join results that were already emitted,
because such updates do not trigger the join logic.

>  2.4 If we run the same application twice - with customer record changing
in between the two runs, since the orders table has proc time and customer
record does not show any time  attribute, will the results of join differ -
since the customer record has changed during the two runs ?
Yes.

Best,
JING ZHANG

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#event-time-temporal-join

On Wed, Jul 7, 2021 at 5:22 PM, M Singh <mans2si...@yahoo.com> wrote:

> Hi Jing:
>
> Thanks for your explanation and references.
>
> I looked at your reference (
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join)
> and have a few questions regarding the example code:
>
> CREATE TEMPORARY TABLE Customers (
>   id INT,
>   name STRING,
>   country STRING,
>   zip STRING) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
>   'table-name' = 'customers');
>
> -- enrich each order with customer information
> SELECT o.order_id, o.total, c.country, c.zip
> FROM Orders AS o
>   JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>     ON o.customer_id = c.id;
>
> The explanation mentions:
>
> <snip>
> A lookup join is typically used to enrich a table with data that is
> queried from an external system. The join requires one table to have a
> processing time attribute and the other table to be backed by a lookup
> source connector.
> </snip>
>
> Questions:
> 1. Does this mean that this only works with proc time ?
> 2. Is there a way to deal with event time for orders and if so how ?
>
> <snip>
>
> The FOR SYSTEM_TIME AS OF clause with the subsequent processing time
> attribute ensures that each row of the Orders table is joined with those
> Customers rows that match the join predicate at the point in time when the
> Orders row is processed by the join operator. It also prevents that the
> join result is updated when a joined Customer row is updated in the
> future.
>
> </snip>
>
> In the above code fragment - the customer table does not have time
> attribute, only the order table has proc_time attribute.  So:
>
> 1. What is the purpose of using proc_time from the order side ?  Is it to
> only limit the last order record for the lookup or is it to restrict the
> customer record ?  Does this mean that Flink tracks proc time for the
> customer table ?
> 2. Since the customer table does not have a time attribute, how does Flink
> keep track of changes to the customer table to make sure that it joins the
> order rows only with the customer row matching the order with the
> appropriate proc time ?
> 3. How does Flink make sure that the join results are not updated if a
> customer record is updated ?
> 4. If we run the same application twice - with the customer record changing
> in between the two runs, since the orders table has proc time and the
> customer record does not show any time attribute, will the results of the
> join differ - since the customer record has changed during the two runs ?
>
> Thanks again for your explanation and references.
>
>
>
>
> On Tuesday, July 6, 2021, 11:24:32 PM EDT, JING ZHANG <
> beyond1...@gmail.com> wrote:
>
>
> Hi Mans,
> `LookupTableSource` is used to look up rows from an external storage
> system by given keys. It is well suited to key/value stores (e.g. Redis,
> HBase) and to storage systems with a notion of keys (e.g. MySQL, Hive).
> `ScanTableSource` is used to scan all rows from an external storage system.
> Some connectors implement both `LookupTableSource` and `ScanTableSource`
> because they have both abilities (e.g. Hive, HBase, JDBC).
>
> > 1. Are there other examples/documentation on how to create a query that
> uses it vs ScanTableSource ?
> If the SQL uses lookup join [1] syntax, the optimizer expects the table
> source underlying the right side to implement `LookupTableSource`.
> Please note that only the right side is a `LookupTableSource`; the left
> side is still a `ScanTableSource`.
> If the SQL uses non-lookup join syntax (e.g. regular join, interval join,
> temporal join, window join) [1], the optimizer expects the underlying
> table source to implement `ScanTableSource`.
>
> > 2. Are there any best practices for using this interface ?
> What do you expect to find in "a best practice for using this interface"?
> For a Table API/SQL user, there are probably not many difficulties in using
> a `LookupTableSource`, because they never deal with `LookupTableSource`
> explicitly; they only use lookup join [1] syntax.
>
> > 3. How does the planner decide to use LookupTableSource vs
> ScanTableSource ?
> Please see the first response above
>
> > 4. Are there some hints/etc that can be given to the planner to force it
> to use LookupTableSource ?
> No, there is no need to do this. Use lookup join [1] syntax if you need to
> use `LookupTableSource`.
>
> > 5. Is this used only for joins or for regular queries ?
> Please see the first response above
>
> > 6. Can it be used for regular joins ? If so, is there any
> documentation/example ?
> If the underlying table source only implements `ScanTableSource`, the user
> cannot use it in a lookup join. If the underlying table source only
> implements `LookupTableSource`, the user cannot use it with non-lookup
> join syntax (e.g. regular join, interval join, temporal join, window join).
> Attempting either results in an exception in the compile phase.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
>
> Best regards,
> JING ZHANG
>
>
> On Wed, Jul 7, 2021 at 8:23 AM, M Singh <mans2si...@yahoo.com> wrote:
>
> Hey Folks:
>
> I am trying to understand how LookupTableSource works and have a few
> questions:
>
> 1. Are there other examples/documentation on how to create a query that
> uses it vs ScanTableSource ?
> 2. Are there any best practices for using this interface ?
> 3. How does the planner decide to use LookupTableSource vs ScanTableSource
> ?
> 4. Are there some hints/etc that can be given to the planner to force it
> to use LookupTableSource ?
> 5. Is this used only for joins or for regular queries ?
> 6. Can it be used for regular joins ? If so, is there any
> documentation/example ?
>
> Thanks for your help.
>
> Mans
>
>
