Hi everyone, I don't have much to chip in, but just wanted to express that I really appreciate the in-depth discussion on this topic and I hope that others will join the conversation.
Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

> Hi Qingsheng, Leonard and Jark,
>
> Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
>
> > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"
>
> I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept that consciously to achieve better performance (no one proposed to enable caching by default, etc.). Or by users do you mean other developers of connectors? In that case developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in the module flink-table-runtime and in flink-table-common from this point of view? How does it affect whether the semantics of "FOR SYSTEM_TIME AS OF proc_time" are broken or not?
>
> > confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious
>
> If we talk about the main semantic difference between DDL options and config options ("table.exec.xxx"), isn't it about limiting the scope of the options and their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy in configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter for the user (or someone else) where the logic affected by the applied option is located? Also I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.
>
> > introduce a new interface for this all-caching scenario and the design would become more complex
>
> This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily - we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive - it uses the class PartitionReader, which is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result the cache reload time is significantly reduced (as well as the time the input stream is blocked). I know that usually we try to avoid concurrency in Flink code, but maybe this one can be an exception. BTW I don't claim that it's an ideal solution; maybe there are better ones.
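A minimal sketch of such an InputFormat-based parallel reload (an illustration of the idea only, not the internal implementation mentioned above; the key-extractor function and the thread cap are assumptions, and only the standard InputFormat lifecycle of configure/createInputSplits/open/nextRecord/close is used):

```java
// Sketch: reload an ALL cache by reading every InputSplit of the connector's
// existing InputFormat concurrently. The key extractor and thread cap are
// assumptions for illustration.
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Supplier;

public final class ParallelCacheReloader {

    private static final int MAX_RELOAD_THREADS = 8; // assumed upper limit

    // One fresh InputFormat instance per thread, since a single instance is
    // not generally safe to share across concurrently processed splits.
    private final Supplier<InputFormat<RowData, InputSplit>> formatFactory;
    private final Function<RowData, RowData> keyExtractor; // hypothetical

    public ParallelCacheReloader(
            Supplier<InputFormat<RowData, InputSplit>> formatFactory,
            Function<RowData, RowData> keyExtractor) {
        this.formatFactory = formatFactory;
        this.keyExtractor = keyExtractor;
    }

    /** Scans all splits concurrently and returns a new cache snapshot. */
    public Map<RowData, List<RowData>> reload() throws Exception {
        InputFormat<RowData, InputSplit> probe = formatFactory.get();
        probe.configure(new Configuration());
        InputSplit[] splits = probe.createInputSplits(MAX_RELOAD_THREADS);

        Map<RowData, List<RowData>> snapshot = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(
                Math.max(1, Math.min(splits.length, MAX_RELOAD_THREADS)));
        try {
            List<Future<Void>> tasks = new ArrayList<>();
            for (InputSplit split : splits) {
                tasks.add(pool.submit(() -> readSplit(split, snapshot)));
            }
            for (Future<Void> task : tasks) {
                task.get(); // propagate failures from any split
            }
        } finally {
            pool.shutdown();
        }
        return snapshot;
    }

    private Void readSplit(InputSplit split, Map<RowData, List<RowData>> snapshot)
            throws Exception {
        InputFormat<RowData, InputSplit> format = formatFactory.get();
        format.configure(new Configuration());
        format.open(split);
        try {
            while (!format.reachedEnd()) {
                // Reuse object omitted for clarity in this sketch.
                RowData row = format.nextRecord(null);
                if (row != null) {
                    // Same key may appear in several splits, hence the
                    // synchronized list.
                    snapshot.computeIfAbsent(
                                    keyExtractor.apply(row),
                                    k -> Collections.synchronizedList(new ArrayList<>()))
                            .add(row);
                }
            }
        } finally {
            format.close();
        }
        return null;
    }
}
```

The new snapshot can then be swapped in atomically once all splits have finished, which is what keeps the time the input stream is blocked short.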
> > Providing the cache in the framework might introduce compatibility issues
>
> It's possible only in cases where the developer of the connector doesn't properly refactor their code and uses the new cache options incorrectly (i.e. explicitly provides the same options in 2 different code places). For correct behavior, all they need to do is redirect the existing options to the framework's LookupConfig (plus maybe add aliases for options, if the naming differed), and everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector, because of backward compatibility. Also, if a developer wants to use their own cache logic, they can simply refuse to pass some of the configs into the framework and instead make their own implementation with the already existing configs and metrics (but actually I think that's a rare case).
>
> > filters and projections should be pushed all the way down to the table function, like what we do in the scan source
>
> That's a great goal. But the truth is that currently the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.
>
> > only applying these optimizations to the cache seems not quite useful
>
> Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by the age of users. If we have the filter 'age > 30', there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which will give a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables without filters and projections can't fit in memory, but with them they can. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.
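For a concrete picture of the example being argued about, a hedged sketch in Java using Flink SQL strings; the connector, connection details and the 'clicks' source table are placeholders, while 'lookup.cache.max-rows' and 'lookup.cache.ttl' are the existing JDBC lookup cache options:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LookupCacheFilterExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Dimension table; connector and connection details are placeholders.
        tEnv.executeSql(
                "CREATE TABLE users ("
                        + "  id BIGINT,"
                        + "  age INT,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:mysql://example:3306/db',"
                        + "  'table-name' = 'users',"
                        + "  'lookup.cache.max-rows' = '10000',"
                        + "  'lookup.cache.ttl' = '10min'"
                        + ")");

        // Assumes a 'clicks' source table with a proc_time attribute is
        // defined elsewhere. The argument above: if 'u.age > 30' were
        // evaluated before rows enter the cache, only about half of the
        // uniformly distributed users would ever be stored, so
        // 'lookup.cache.max-rows' could be nearly doubled.
        tEnv.executeSql(
                        "SELECT c.user_id, u.age "
                                + "FROM clicks AS c "
                                + "JOIN users FOR SYSTEM_TIME AS OF c.proc_time AS u "
                                + "ON c.user_id = u.id "
                                + "WHERE u.age > 30")
                .print();
    }
}
```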
> It would be great to hear other voices regarding this topic! Because we have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.
>
> Best regards,
> Smirnov Alexander
>
> Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com>:
> >
> > Hi Alexander and Arvid,
> >
> > Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:
> >
> > 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because it couldn't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for the performance. So we prefer not to provide caching at the table runtime level.
> >
> > 2. If we put the cache implementation in the framework (whether in a runner or in a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.
> >
> > 3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community, and also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario and the design would become more complex.
> >
> > 4. Providing the cache in the framework might introduce compatibility issues for existing lookup sources: there might exist two caches with totally different strategies if the user incorrectly configures the table (one in the framework and another implemented by the lookup source).
> >
> > As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of into the runner with the cache. The goal of using a cache is to reduce the network I/O and the pressure on the external system, and only applying these optimizations to the cache seems not quite useful.
> >
> > I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and regulate the metrics of the cache. Also, I made a POC [2] for your reference.
> >
> > Looking forward to your ideas!
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> >
> > Best regards,
> >
> > Qingsheng
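A rough sketch of the kind of helper class described in the message above; this is based only on that description, not on the actual FLIP-221 POC, and the abstract method names, key handling and the simple LRU policy here are all assumptions:

```java
// Sketch (NOT the POC API): a caching base class that delegates to the
// connector's lookup on cache misses and serves hits from an LRU map.
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public abstract class CachingTableFunction extends TableFunction<RowData> {

    private final int maxRows; // e.g. from 'lookup.cache.max-rows'
    private transient Map<RowData, Collection<RowData>> cache;

    protected CachingTableFunction(int maxRows) {
        this.maxRows = maxRows;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        // Simple LRU via an access-ordered LinkedHashMap; the real design
        // may differ. Standardized hit/miss metrics would be registered
        // here via context.getMetricGroup().
        cache = new LinkedHashMap<RowData, Collection<RowData>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(
                    Map.Entry<RowData, Collection<RowData>> eldest) {
                return size() > maxRows;
            }
        };
    }

    /**
     * Connector-specific lookup against the external system (assumed name).
     * Should return an empty collection, not null, when nothing matches.
     */
    protected abstract Collection<RowData> lookup(RowData key) throws Exception;

    /** Builds the cache key row from the join keys (assumed helper). */
    protected abstract RowData toKeyRow(Object... keys);

    /** Entry point invoked by the lookup join operator. */
    public void eval(Object... keys) throws Exception {
        RowData key = toKeyRow(keys);
        Collection<RowData> rows = cache.get(key);
        if (rows == null) {
            rows = lookup(key); // cache miss: delegate to the connector
            cache.put(key, rows);
        }
        rows.forEach(this::collect);
    }
}
```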
> > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
> >>
> >> Thanks for the response, Arvid!
> >>
> >> I have a few comments on your message.
> >>
> >> > but could also live with an easier solution as the first step:
> >>
> >> I think that these 2 ways are mutually exclusive (the one originally proposed by Qingsheng and mine), because conceptually they follow the same goal, but the implementation details are different. If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?
> >>
> >> > as the source will only receive the requests after filter
> >>
> >> Actually, if filters are applied to fields of the lookup table, we first must do the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.
> >>
> >> > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
> >>
> >> Sorry for that, I'm a bit new to these kinds of conversations :)
> >> I have no write access to the confluence, so I made a Jira issue, where I described the proposed changes in more detail - https://issues.apache.org/jira/browse/FLINK-27411.
> >>
> >> Will be happy to get more feedback!
> >>
> >> Best,
> >> Smirnov Alexander
> >>
> >> Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org>:
> >> >
> >> > Hi Qingsheng,
> >> >
> >> > Thanks for driving this; the inconsistency was not satisfying for me.
> >> >
> >> > I second Alexander's idea, though I could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).
> >> >
> >> > Another advantage is that all the changes of this FLIP would be limited to options; no need for new public interfaces. Everything else remains an implementation detail of the table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.
> >> >
> >> > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
> >> >
> >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:
> >> >
> >> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd really like to become one. And this FLIP really interested me. Actually I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.
> >> > >
> >> > > I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables – it's very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably in flink-table-runtime. But this would require connectors to depend on another module, which contains a lot of runtime logic, and that doesn't sound good.
> >> > >
> >> > > I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to only pass configurations to the planner, so that they won't depend on the runtime implementation. Based on these configs the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the module flink-table-runtime). The architecture looks like in the pinned image (the LookupConfig class there is actually your CacheConfig).
> >> > >
> >> > > The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its inheritors. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc.
> >> > >
> >> > > I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.
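A sketch of the shape of this alternative, with assumed names and fields (LookupConfig is the class mentioned in the message above; the interface and its method are hypothetical illustrations, not committed API):

```java
// Sketch (hypothetical API): the connector only hands a pure config object
// to the planner, which then picks a caching runner in flink-table-runtime,
// so the connector never depends on the runtime module.
import java.io.Serializable;
import java.time.Duration;

/** Pure configuration object that could live in flink-table-common. */
public class LookupConfig implements Serializable {

    public enum CacheStrategy { NONE, LRU, ALL }

    private final CacheStrategy strategy;
    private final long maxRows; // e.g. from 'lookup.cache.max-rows'
    private final Duration ttl;

    public LookupConfig(CacheStrategy strategy, long maxRows, Duration ttl) {
        this.strategy = strategy;
        this.maxRows = maxRows;
        this.ttl = ttl;
    }

    public CacheStrategy getStrategy() { return strategy; }
    public long getMaxRows() { return maxRows; }
    public Duration getTtl() { return ttl; }
}

/** What a connector's LookupTableSource might additionally expose. */
interface SupportsLookupConfig {
    // The planner reads this and instantiates e.g. LookupJoinCachingRunner
    // (or the WithCalc variant) with the corresponding cache.
    LookupConfig getLookupConfig();
}
```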
LookupJoinRunnerWithCalc was named like this > >> > > because it uses the ‘calc’ function, which actually mostly consists > of > >> > > filters and projections. > >> > > > >> > > For example, in join table A with lookup table B condition ‘JOIN … > ON > >> > > A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’ ‘calc’ > >> > > function will contain filters A.age = B.age + 10 and B.salary > > 1000. > >> > > > >> > > If we apply this function before storing records in cache, size of > >> > > cache will be significantly reduced: filters = avoid storing useless > >> > > records in cache, projections = reduce records’ size. So the initial > >> > > max number of records in cache can be increased by the user. > >> > > > >> > > What do you think about it? > >> > > > >> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote: > >> > > > Hi devs, > >> > > > > >> > > > Yuan and I would like to start a discussion about FLIP-221[1], > which > >> > > introduces an abstraction of lookup table cache and its standard > metrics. > >> > > > > >> > > > Currently each lookup table source should implement their own > cache to > >> > > store lookup results, and there isn’t a standard of metrics for > users and > >> > > developers to tuning their jobs with lookup joins, which is a quite > common > >> > > use case in Flink table / SQL. > >> > > > > >> > > > Therefore we propose some new APIs including cache, metrics, > wrapper > >> > > classes of TableFunction and new table options. Please take a look > at the > >> > > FLIP page [1] to get more details. Any suggestions and comments > would be > >> > > appreciated! > >> > > > > >> > > > [1] > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric > >> > > > > >> > > > Best regards, > >> > > > > >> > > > Qingsheng > >> > > > > >> > > > > >> > > > > > > > > > > -- > > Best Regards, > > > > Qingsheng Ren > > > > Real-time Computing Team > > Alibaba Cloud > > > > Email: renqs...@gmail.com >