It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns should be done for all ScanTableSource implementations (not only for lookup ones).
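For readers less familiar with the interfaces involved: below is a minimal sketch of what per-connector filter pushdown entails, based on Flink's SupportsFilterPushDown ability interface. The class name and the dialect-translation helper are hypothetical, not part of any existing connector.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

/**
 * Hypothetical source illustrating why filter pushdown is per-connector work:
 * each connector must translate Flink expressions into its own dialect and
 * report back which filters it could not consume.
 */
public abstract class FilterPushdownLookupSource
        implements DynamicTableSource, SupportsFilterPushDown {

    private final List<String> pushedPredicates = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            // toDialectPredicate is a hypothetical helper: it succeeds only
            // for expressions the external system can evaluate itself.
            String predicate = toDialectPredicate(filter);
            if (predicate != null) {
                pushedPredicates.add(predicate);
                accepted.add(filter);
            } else {
                // Everything left here stays in the plan and is evaluated
                // by Flink after the scan/lookup returns.
                remaining.add(filter);
            }
        }
        return Result.of(accepted, remaining);
    }

    /** Returns a system-specific predicate string, or null if untranslatable. */
    protected abstract String toDialectPredicate(ResolvedExpression expression);
}
```

Each of jdbc/hive/hbase would need its own translation logic of this shape, which is why it is a substantial, connector-by-connector effort.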
On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:
> Hi everyone,
>
> One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>
> > Hi everyone!
> >
> > Thanks for driving such a valuable improvement!
> >
> > I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.
> >
> > Putting myself in the user's shoes, I can say that:
> > 1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the handiest way to do that is to apply the filter inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
> > 2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
> > 3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by offering more cache strategies and a wider set of configuration options.
> >
> > All these points are much closer to the scheme proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong and all these facilities can easily be implemented in your architecture?
> >
> > Best regards,
> > Roman Boyko
> > e.: ro.v.bo...@gmail.com
> >
> > On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > I don't have much to chip in, but just wanted to express that I really appreciate the in-depth discussion on this topic and I hope that others will join the conversation.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:
> > >
> > > > Hi Qingsheng, Leonard and Jark,
> > > >
> > > > Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
> > > >
> > > > > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"
> > > >
> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not fully preserved with caching, but as you said, users accept that consciously to achieve better performance (no one proposed to enable caching by default, etc.). Or by "users" do you mean other developers of connectors? In that case developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to.
> > > > So what exactly is the difference between implementing caching in the flink-table-runtime module versus flink-table-common from this point of view? How does it affect whether the semantics of "FOR SYSTEM_TIME AS OF proc_time" is broken or not?
> > > >
> > > > > confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously
> > > >
> > > > If we talk about the main difference in semantics between DDL options and config options ("table.exec.xxx"), isn't it about limiting the scope of the options and their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy into configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter for the user (or someone else) where the logic affected by the applied option is located?
> > > > Also I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem here.
> > > >
> > > > > introduce a new interface for this all-caching scenario and the design would become more complex
> > > >
> > > > This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily: we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive - it uses the PartitionReader class, which is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, cache reload time is significantly reduced (as well as the time the input stream is blocked). I know that usually we try to avoid using concurrency in Flink code, but maybe this one can be an exception. BTW I don't say that it's an ideal solution; maybe there are better ones.
> > > >
> > > > > Providing the cache in the framework might introduce compatibility issues
> > > >
> > > > That's possible only if the connector developer doesn't properly refactor his code and uses the new cache options incorrectly (i.e. explicitly provides the same options in 2 different code places). For correct behavior all he needs to do is redirect the existing options to the framework's LookupConfig (+ maybe add an alias for options, if the naming differed), and everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector because of backward compatibility. Also, if a developer wants to use his own cache logic, he can simply refuse to pass some of the configs into the framework, and instead make his own implementation with the already existing configs and metrics (but actually I think that's a rare case).
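To illustrate the option redirection described above, here is a minimal sketch assuming a framework-side LookupConfig class along the lines of FLINK-27411. The class, its fields and the wiring are illustrative, not a released API; only the option keys ('lookup.cache.max-rows', 'lookup.cache.ttl') match options that already exist in the JDBC connector.

```java
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;

/** Illustrative stand-in for the framework-side config from FLINK-27411. */
final class LookupConfig {
    enum CacheStrategy { NONE, LRU, ALL }

    final CacheStrategy strategy;
    final long maxCachedRows;
    final Duration ttl;

    LookupConfig(CacheStrategy strategy, long maxCachedRows, Duration ttl) {
        this.strategy = strategy;
        this.maxCachedRows = maxCachedRows;
        this.ttl = ttl;
    }
}

/**
 * Sketch of the redirection: the connector keeps its long-standing DDL option
 * keys for compatibility, but instead of building its own cache it forwards
 * the values to the framework.
 */
final class ConnectorCacheOptions {

    static final ConfigOption<Long> CACHE_MAX_ROWS =
            ConfigOptions.key("lookup.cache.max-rows")
                    .longType()
                    .defaultValue(10_000L);

    static final ConfigOption<Duration> CACHE_TTL =
            ConfigOptions.key("lookup.cache.ttl")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(10));

    /** Called once when the table source is created from the catalog table. */
    static LookupConfig toLookupConfig(ReadableConfig tableOptions) {
        // The connector no longer builds a cache itself; it only forwards
        // the values it has always exposed in DDL.
        return new LookupConfig(
                LookupConfig.CacheStrategy.LRU,
                tableOptions.get(CACHE_MAX_ROWS),
                tableOptions.get(CACHE_TTL));
    }
}
```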
> > > > > filters and projections should be pushed all the way down to the table function, like what we do in the scan source
> > > >
> > > > It's a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down such complex filters as we have in Flink.
> > > >
> > > > > only applying these optimizations to the cache seems not quite useful
> > > >
> > > > Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is ~uniformly distributed by age of users. If we have the filter 'age > 30', there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which gives a huge performance boost. Moreover, this optimization starts to really shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.
> > > >
> > > > It would be great to hear other voices regarding this topic, because we have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.
> > > >
> > > > Best regards,
> > > > Smirnov Alexander
> > > >
> > > > On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > > >
> > > > > Hi Alexander and Arvid,
> > > > >
> > > > > Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:
> > > > >
> > > > > 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because it couldn't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for the performance. So we prefer not to provide caching on the table runtime level.
> > > > >
> > > > > 2. If we make the cache implementation in the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.
> > > > >
> > > > > 3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community; this is also widely used by our internal connectors).
> > > > > Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we have to introduce a new interface for this all-caching scenario, and the design would become more complex.
> > > > >
> > > > > 4. Providing the cache in the framework might introduce compatibility issues for existing lookup sources: there might exist two caches with totally different strategies if the user incorrectly configures the table (one in the framework and another implemented by the lookup source).
> > > > >
> > > > > As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of into the runner with the cache. The goal of using a cache is to reduce the network I/O and the pressure on the external system, and only applying these optimizations to the cache seems not quite useful.
> > > > >
> > > > > I made some updates to the FLIP[1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache. Also, I made a POC[2] for your reference.
> > > > >
> > > > > Looking forward to your ideas!
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Qingsheng
> > > > >
> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
> > > > > >
> > > > > > Thanks for the response, Arvid!
> > > > > >
> > > > > > I have a few comments on your message.
> > > > > >
> > > > > > > but could also live with an easier solution as the first step:
> > > > > >
> > > > > > I think that these 2 ways are mutually exclusive (the one originally proposed by Qingsheng and mine), because conceptually they follow the same goal, but the implementation details are different. If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work into tasks for the different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?
> > > > > >
> > > > > > > as the source will only receive the requests after filter
> > > > > >
> > > > > > Actually, if filters are applied to fields of the lookup table, we first must make the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.
> > > > > >
> > > > > > > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
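To make the wrapper idea concrete (a cache around the user's lookup function, with filters applied before rows are stored, as discussed above), here is a simplified, purely illustrative sketch. It is not the FLIP's CachingTableFunction API: all names are made up, and a real implementation would operate on Flink's RowData inside the table runtime.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Illustrative LRU caching wrapper around a lookup function: on a miss it
 * delegates to the external lookup, applies the filter, and caches only the
 * surviving rows, so filtered-out rows never occupy cache capacity.
 */
final class CachingLookup<K, V> {

    private final Function<K, List<V>> externalLookup; // e.g. one JDBC query per key
    private final Predicate<V> filter;                 // e.g. "age > 30" on a row
    private final Map<K, List<V>> cache;

    CachingLookup(Function<K, List<V>> externalLookup, Predicate<V> filter, int maxKeys) {
        this.externalLookup = externalLookup;
        this.filter = filter;
        // An access-ordered LinkedHashMap gives a minimal LRU eviction policy.
        this.cache = new LinkedHashMap<K, List<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<V>> eldest) {
                return size() > maxKeys;
            }
        };
    }

    List<V> lookup(K key) {
        // Cache hit: no request to the external system at all.
        return cache.computeIfAbsent(key, k -> {
            List<V> survivors = new ArrayList<>();
            for (V row : externalLookup.apply(k)) {
                if (filter.test(row)) { // filter BEFORE caching, not after
                    survivors.add(row);
                }
            }
            return survivors;
        });
    }
}
```

Applying a projection at the same point would additionally shrink each cached row, which is the 'calc' optimization described later in this thread.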
> > > > > > Sorry for that, I'm a bit new to such kinds of conversations :)
> > > > > > I have no write access to the confluence, so I made a Jira issue, where I described the proposed changes in more detail - https://issues.apache.org/jira/browse/FLINK-27411.
> > > > > >
> > > > > > Will be happy to get more feedback!
> > > > > >
> > > > > > Best,
> > > > > > Smirnov Alexander
> > > > > >
> > > > > > On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi Qingsheng,
> > > > > > >
> > > > > > > Thanks for driving this; the inconsistency was not satisfying for me.
> > > > > > >
> > > > > > > I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after filter; applying the projection may be more interesting to save memory).
> > > > > > >
> > > > > > > Another advantage is that all the changes of this FLIP would be limited to options - no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.
> > > > > > >
> > > > > > > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
> > > > > > >
> > > > > > > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd really like to become one. And this FLIP really interested me. Actually I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.
> > > > > > > >
> > > > > > > > I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables - it's very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But this would require connectors to depend on another module, which contains a lot of runtime logic, and that doesn't sound good.
> > > > > > > >
> > > > > > > > I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to only pass configurations to the planner, so they won't depend on the runtime implementation.
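A rough sketch of what such a 'getLookupConfig' hook could look like. LookupTableSource is Flink's real interface, but this extension of it, and the LookupConfig type sketched earlier in this thread, are purely illustrative of the proposal, not an existing API.

```java
import javax.annotation.Nullable;

import org.apache.flink.table.connector.source.LookupTableSource;

/**
 * Illustrative extension: the connector hands a plain config object to the
 * planner instead of depending on runtime classes. LookupConfig here is the
 * hypothetical class sketched earlier in this thread.
 */
interface ConfigurableLookupTableSource extends LookupTableSource {

    /**
     * Null means "no caching"; otherwise the planner chooses a caching join
     * runner (e.g. a LookupJoinCachingRunner) configured from this object.
     */
    @Nullable
    default LookupConfig getLookupConfig() {
        return null;
    }
}
```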
> > > > > > > > Based on these configs the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).
> > > > > > > >
> > > > > > > > The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its inheritors.
> > > > > > > > The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc.
> > > > > > > >
> > > > > > > > I suggest adding the classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.
> > > > > > > >
> > > > > > > > And here comes another, more powerful advantage of such a solution. If we have the caching logic on a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which actually mostly consists of filters and projections.
> > > > > > > >
> > > > > > > > For example, when joining table A with lookup table B under the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.
> > > > > > > >
> > > > > > > > If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters mean we avoid storing useless records, and projections reduce the records' size. So the initial max number of records in the cache can be increased by the user.
> > > > > > > >
> > > > > > > > What do you think about it?
> > > > > > > >
> > > > > > > >
> > > > > > > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> > > > > > > > > Hi devs,
> > > > > > > > >
> > > > > > > > > Yuan and I would like to start a discussion about FLIP-221[1], which introduces an abstraction for the lookup table cache and its standard metrics.
> > > > > > > > >
> > > > > > > > > Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is quite a common use case in Flink Table / SQL.
> > > > > > > > >
> > > > > > > > > Therefore we propose some new APIs, including a cache, metrics, wrapper classes of TableFunction and new table options. Please take a look at the FLIP page [1] to get more details. Any suggestions and comments would be appreciated!
> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > >
> > > > > > > > > Qingsheng
> > > > >
> > > > > --
> > > > > Best Regards,
> > > > >
> > > > > Qingsheng Ren
> > > > >
> > > > > Real-time Computing Team
> > > > > Alibaba Cloud
> > > > >
> > > > > Email: renqs...@gmail.com


--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com