Thanks for the response, Arvid! I have few comments on your message.
> but could also live with an easier solution as the first step: I think that these 2 ways are mutually exclusive (originally proposed by Qingsheng and mine), because conceptually they follow the same goal, but implementation details are different. If we will go one way, moving to another way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work on tasks for different parts of the flip (for example, LRU cache unification / introducing proposed set of metrics / further work…). WDYT, Qingsheng? > as the source will only receive the requests after filter Actually if filters are applied to fields of the lookup table, we firstly must do requests, and only after that we can filter responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be much less rows in cache. > @Alexander unfortunately, your architecture is not shared. I don't know the > solution to share images to be honest. Sorry for that, I’m a bit new to such kinds of conversations :) I have no write access to the confluence, so I made a Jira issue, where described the proposed changes in more details - https://issues.apache.org/jira/browse/FLINK-27411. Will happy to get more feedback! Best, Smirnov Alexander пн, 25 апр. 2022 г. в 19:49, Arvid Heise <ar...@apache.org>: > > Hi Qingsheng, > > Thanks for driving this; the inconsistency was not satisfying for me. > > I second Alexander's idea though but could also live with an easier > solution as the first step: Instead of making caching an implementation > detail of TableFunction X, rather devise a caching layer around X. So the > proposal would be a CachingTableFunction that delegates to X in case of > misses and else manages the cache. Lifting it into the operator model as > proposed would be even better but is probably unnecessary in the first step > for a lookup source (as the source will only receive the requests after > filter; applying projection may be more interesting to save memory). > > Another advantage is that all the changes of this FLIP would be limited to > options, no need for new public interfaces. Everything else remains an > implementation of Table runtime. That means we can easily incorporate the > optimization potential that Alexander pointed out later. > > @Alexander unfortunately, your architecture is not shared. I don't know the > solution to share images to be honest. > > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> > wrote: > > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd > > really like to become one. And this FLIP really interested me. > > Actually I have worked on a similar feature in my company’s Flink > > fork, and we would like to share our thoughts on this and make code > > open source. > > > > I think there is a better alternative than introducing an abstract > > class for TableFunction (CachingTableFunction). As you know, > > TableFunction exists in the flink-table-common module, which provides > > only an API for working with tables – it’s very convenient for importing > > in connectors. In turn, CachingTableFunction contains logic for > > runtime execution, so this class and everything connected with it > > should be located in another module, probably in flink-table-runtime. > > But this will require connectors to depend on another module, which > > contains a lot of runtime logic, which doesn’t sound good. > > > > I suggest adding a new method ‘getLookupConfig’ to LookupTableSource > > or LookupRuntimeProvider to allow connectors to only pass > > configurations to the planner, therefore they won’t depend on runtime > > realization. Based on these configs planner will construct a lookup > > join operator with corresponding runtime logic (ProcessFunctions in > > module flink-table-runtime). Architecture looks like in the pinned > > image (LookupConfig class there is actually yours CacheConfig). > > > > Classes in flink-table-planner, that will be responsible for this – > > CommonPhysicalLookupJoin and his inheritors. > > Current classes for lookup join in flink-table-runtime - > > LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc, > > AsyncLookupJoinRunnerWithCalc. > > > > I suggest adding classes LookupJoinCachingRunner, > > LookupJoinCachingRunnerWithCalc, etc. > > > > And here comes another more powerful advantage of such a solution. If > > we have caching logic on a lower level, we can apply some > > optimizations to it. LookupJoinRunnerWithCalc was named like this > > because it uses the ‘calc’ function, which actually mostly consists of > > filters and projections. > > > > For example, in join table A with lookup table B condition ‘JOIN … ON > > A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’ ‘calc’ > > function will contain filters A.age = B.age + 10 and B.salary > 1000. > > > > If we apply this function before storing records in cache, size of > > cache will be significantly reduced: filters = avoid storing useless > > records in cache, projections = reduce records’ size. So the initial > > max number of records in cache can be increased by the user. > > > > What do you think about it? > > > > > > On 2022/04/19 02:47:11 Qingsheng Ren wrote: > > > Hi devs, > > > > > > Yuan and I would like to start a discussion about FLIP-221[1], which > > introduces an abstraction of lookup table cache and its standard metrics. > > > > > > Currently each lookup table source should implement their own cache to > > store lookup results, and there isn’t a standard of metrics for users and > > developers to tuning their jobs with lookup joins, which is a quite common > > use case in Flink table / SQL. > > > > > > Therefore we propose some new APIs including cache, metrics, wrapper > > classes of TableFunction and new table options. Please take a look at the > > FLIP page [1] to get more details. Any suggestions and comments would be > > appreciated! > > > > > > [1] > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric > > > > > > Best regards, > > > > > > Qingsheng > > > > > > > >