Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd
really like to become one. And this FLIP really interested me.
Actually I have worked on a similar feature in my company’s Flink
fork, and we would like to share our thoughts on this and make code
open source.

I think there is a better alternative than introducing an abstract
class for TableFunction (CachingTableFunction). As you know,
TableFunction exists in the flink-table-common module, which provides
only an API for working with tables – it’s very convenient for importing
in connectors. In turn, CachingTableFunction contains logic for
runtime execution,  so this class and everything connected with it
should be located in another module, probably in flink-table-runtime.
But this will require connectors to depend on another module, which
contains a lot of runtime logic, which doesn’t sound good.

I suggest adding a new method ‘getLookupConfig’ to LookupTableSource
or LookupRuntimeProvider to allow connectors to only pass
configurations to the planner, therefore they won’t depend on runtime
realization. Based on these configs planner will construct a lookup
join operator with corresponding runtime logic (ProcessFunctions in
module flink-table-runtime). Architecture looks like in the pinned
image (LookupConfig class there is actually yours CacheConfig).

Classes in flink-table-planner, that will be responsible for this –
CommonPhysicalLookupJoin and his inheritors.
Current classes for lookup join in  flink-table-runtime  -
LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc,
AsyncLookupJoinRunnerWithCalc.

I suggest adding classes LookupJoinCachingRunner,
LookupJoinCachingRunnerWithCalc, etc.

And here comes another more powerful advantage of such a solution. If
we have caching logic on a lower level, we can apply some
optimizations to it. LookupJoinRunnerWithCalc was named like this
because it uses the ‘calc’ function, which actually mostly consists of
filters and projections.

For example, in join table A with lookup table B condition ‘JOIN … ON
A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’  ‘calc’
function will contain filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in cache, size of
cache will be significantly reduced: filters = avoid storing useless
records in cache, projections = reduce records’ size. So the initial
max number of records in cache can be increased by the user.

What do you think about it?


On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> Hi devs,
>
> Yuan and I would like to start a discussion about FLIP-221[1], which 
> introduces an abstraction of lookup table cache and its standard metrics.
>
> Currently each lookup table source should implement their own cache to store 
> lookup results, and there isn’t a standard of metrics for users and 
> developers to tuning their jobs with lookup joins, which is a quite common 
> use case in Flink table / SQL.
>
> Therefore we propose some new APIs including cache, metrics, wrapper classes 
> of TableFunction and new table options. Please take a look at the FLIP page 
> [1] to get more details. Any suggestions and comments would be appreciated!
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>
> Best regards,
>
> Qingsheng
>
>

Reply via email to