Hi Yuan,

Thanks for the review! Basically I agree with Alexander's opinion: we'd like to limit the scope to the lookup scenario, so we didn't extend the cache to a generic one. As for the metrics, I think the existing metric definitions are also applicable to the all-cache case.
Cheers, Qingsheng > On May 15, 2022, at 21:17, zst...@163.com wrote: > > Hi Qingsheng and devs, > > Thanks for your heated discussion and redesign to optmize this feature. I > just have two comments: > 1. How about abtract the LookupCache to a higher level with a common Cache? > It will be convenient for devs to use in other place. > > 2. Does it have any metrics, such as NumCachedRecords for the AllCache? > Best regards, > Yuan > > At 2022-05-13 20:27:44, "Qingsheng Ren" <renqs...@gmail.com> wrote: > >Hi Alexander and devs, > > > >Thank you very much for the in-depth discussion! As Jark mentioned we were > >inspired by Alexander's idea and made a refactor on our design. FLIP-221 > >[1] has been updated to reflect our design now and we are happy to hear > >more suggestions from you! > > > >Compared to the previous design: > >1. The lookup cache serves at table runtime level and is integrated as a > >component of LookupJoinRunner as discussed previously. > >2. Interfaces are renamed and re-designed to reflect the new design. > >3. We separate the all-caching case individually and introduce a new > >RescanRuntimeProvider to reuse the ability of scanning. We are planning to > >support SourceFunction / InputFormat for now considering the complexity of > >FLIP-27 Source API. > >4. A new interface LookupFunction is introduced to make the semantic of > >lookup more straightforward for developers. > > > >For replying to Alexander: > >> However I'm a little confused whether InputFormat is deprecated or not. > >Am I right that it will be so in the future, but currently it's not? > >Yes you are right. InputFormat is not deprecated for now. I think it will > >be deprecated in the future but we don't have a clear plan for that. > > > >Thanks again for the discussion on this FLIP and looking forward to > >cooperating with you after we finalize the design and interfaces! 
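[Editor's illustration] To make point 4 above more concrete, here is a minimal sketch of what a lookup-specific function could look like. The `Row` stand-in, the map-backed helper, and the method names are illustrative assumptions, not the actual FLIP-221 signatures:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class LookupFunctionSketch {
    // Stand-in for Flink's internal RowData type; illustrative only.
    record Row(List<Object> fields) {}

    // Hypothetical shape of a lookup-specific interface: one key row in,
    // the matching rows out, instead of a generic TableFunction#eval.
    interface LookupFunction {
        Collection<Row> lookup(Row keyRow);
    }

    // A toy dimension "table" backed by a map, keyed on the first field.
    static LookupFunction mapBacked(Map<Object, Row> table) {
        return key -> {
            Row hit = table.get(key.fields().get(0));
            return hit == null ? Collections.emptyList() : List.of(hit);
        };
    }

    public static void main(String[] args) {
        LookupFunction dim = mapBacked(Map.of(42, new Row(List.of(42, "alice"))));
        System.out.println(dim.lookup(new Row(List.of(42))).size()); // 1
        System.out.println(dim.lookup(new Row(List.of(7))).size());  // 0
    }
}
```

The point of the dedicated method is that the framework can wrap caching around `lookup` uniformly, which is harder with an overloaded `eval`.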
> > > >[1] > >https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric > > > >Best regards, > > > >Qingsheng > > > > > >On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> > >wrote: > > > >> Hi Jark, Qingsheng and Leonard! > >> > >> Glad to see that we came to a consensus on almost all points! > >> > >> However I'm a little confused whether InputFormat is deprecated or > >> not. Am I right that it will be so in the future, but currently it's > >> not? Actually I also think that for the first version it's OK to use > >> InputFormat in ALL cache realization, because supporting rescan > >> ability seems like a very distant prospect. But for this decision we > >> need a consensus among all discussion participants. > >> > >> In general, I don't have something to argue with your statements. All > >> of them correspond my ideas. Looking ahead, it would be nice to work > >> on this FLIP cooperatively. I've already done a lot of work on lookup > >> join caching with realization very close to the one we are discussing, > >> and want to share the results of this work. Anyway looking forward for > >> the FLIP update! > >> > >> Best regards, > >> Smirnov Alexander > >> > >> чт, 12 мая 2022 г. в 17:38, Jark Wu <imj...@gmail.com>: > >> > > >> > Hi Alex, > >> > > >> > Thanks for summarizing your points. > >> > > >> > In the past week, Qingsheng, Leonard, and I have discussed it several > >> times > >> > and we have totally refactored the design. > >> > I'm glad to say we have reached a consensus on many of your points! > >> > Qingsheng is still working on updating the design docs and maybe can be > >> > available in the next few days. > >> > I will share some conclusions from our discussions: > >> > > >> > 1) we have refactored the design towards to "cache in framework" way. > >> > > >> > 2) a "LookupCache" interface for users to customize and a default > >> > implementation with builder for users to easy-use. 
> >> > This can both make it possible to both have flexibility and conciseness. > >> > > >> > 3) Filter pushdown is important for ALL and LRU lookup cache, esp > >> reducing > >> > IO. > >> > Filter pushdown should be the final state and the unified way to both > >> > support pruning ALL cache and LRU cache, > >> > so I think we should make effort in this direction. If we need to support > >> > filter pushdown for ALL cache anyway, why not use > >> > it for LRU cache as well? Either way, as we decide to implement the cache > >> > in the framework, we have the chance to support > >> > filter on cache anytime. This is an optimization and it doesn't affect > >> the > >> > public API. I think we can create a JIRA issue to > >> > discuss it when the FLIP is accepted. > >> > > >> > 4) The idea to support ALL cache is similar to your proposal. > >> > In the first version, we will only support InputFormat, SourceFunction > >> for > >> > cache all (invoke InputFormat in join operator). > >> > For FLIP-27 source, we need to join a true source operator instead of > >> > calling it embedded in the join operator. > >> > However, this needs another FLIP to support the re-scan ability for > >> FLIP-27 > >> > Source, and this can be a large work. > >> > In order to not block this issue, we can put the effort of FLIP-27 source > >> > integration into future work and integrate > >> > InputFormat&SourceFunction for now. > >> > > >> > I think it's fine to use InputFormat&SourceFunction, as they are not > >> > deprecated, otherwise, we have to introduce another function > >> > similar to them which is meaningless. We need to plan FLIP-27 source > >> > integration ASAP before InputFormat & SourceFunction are deprecated. > >> > > >> > Best, > >> > Jark > >> > > >> > > >> > > >> > > >> > > >> > On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> > >> > wrote: > >> > > >> > > Hi Martijn! > >> > > > >> > > Got it. 
Therefore, the realization with InputFormat is not considered. > >> > > Thanks for clearing that up! > >> > > > >> > > Best regards, > >> > > Smirnov Alexander > >> > > > >> > > чт, 12 мая 2022 г. в 14:23, Martijn Visser <mart...@ververica.com>: > >> > > > > >> > > > Hi, > >> > > > > >> > > > With regards to: > >> > > > > >> > > > > But if there are plans to refactor all connectors to FLIP-27 > >> > > > > >> > > > Yes, FLIP-27 is the target for all connectors. The old interfaces > >> will be > >> > > > deprecated and connectors will either be refactored to use the new > >> ones > >> > > or > >> > > > dropped. > >> > > > > >> > > > The caching should work for connectors that are using FLIP-27 > >> interfaces, > >> > > > we should not introduce new features for old interfaces. > >> > > > > >> > > > Best regards, > >> > > > > >> > > > Martijn > >> > > > > >> > > > On Thu, 12 May 2022 at 06:19, Александр Смирнов < > >> smirale...@gmail.com> > >> > > > wrote: > >> > > > > >> > > > > Hi Jark! > >> > > > > > >> > > > > Sorry for the late response. I would like to make some comments and > >> > > > > clarify my points. > >> > > > > > >> > > > > 1) I agree with your first statement. I think we can achieve both > >> > > > > advantages this way: put the Cache interface in flink-table-common, > >> > > > > but have implementations of it in flink-table-runtime. Therefore > >> if a > >> > > > > connector developer wants to use existing cache strategies and > >> their > >> > > > > implementations, he can just pass lookupConfig to the planner, but > >> if > >> > > > > he wants to have its own cache implementation in his > >> TableFunction, it > >> > > > > will be possible for him to use the existing interface for this > >> > > > > purpose (we can explicitly point this out in the documentation). In > >> > > > > this way all configs and metrics will be unified. WDYT? 
> >> > > > > > >> > > > > > If a filter can prune 90% of data in the cache, we will have 90% > >> of > >> > > > > lookup requests that can never be cached > >> > > > > > >> > > > > 2) Let me clarify the logic filters optimization in case of LRU > >> cache. > >> > > > > It looks like Cache<RowData, Collection<RowData>>. Here we always > >> > > > > store the response of the dimension table in cache, even after > >> > > > > applying calc function. I.e. if there are no rows after applying > >> > > > > filters to the result of the 'eval' method of TableFunction, we > >> store > >> > > > > the empty list by lookup keys. Therefore the cache line will be > >> > > > > filled, but will require much less memory (in bytes). I.e. we don't > >> > > > > completely filter keys, by which result was pruned, but > >> significantly > >> > > > > reduce required memory to store this result. If the user knows > >> about > >> > > > > this behavior, he can increase the 'max-rows' option before the > >> start > >> > > > > of the job. But actually I came up with the idea that we can do > >> this > >> > > > > automatically by using the 'maximumWeight' and 'weigher' methods of > >> > > > > GuavaCache [1]. Weight can be the size of the collection of rows > >> > > > > (value of cache). Therefore cache can automatically fit much more > >> > > > > records than before. > >> > > > > > >> > > > > > Flink SQL has provided a standard way to do filters and projects > >> > > > > pushdown, i.e., SupportsFilterPushDown and > >> SupportsProjectionPushDown. > >> > > > > > Jdbc/hive/HBase haven't implemented the interfaces, don't mean > >> it's > >> > > hard > >> > > > > to implement. > >> > > > > > >> > > > > It's debatable how difficult it will be to implement filter > >> pushdown. > >> > > > > But I think the fact that currently there is no database connector > >> > > > > with filter pushdown at least means that this feature won't be > >> > > > > supported soon in connectors. 
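[Editor's illustration] The weigher idea above can be sketched with plain JDK collections. This is a stand-in for Guava's `maximumWeight`/`weigher` mechanism, not the actual Guava or Flink API: the weight of an entry is the number of rows in its cached collection, so keys whose result was filtered down to an empty list are cached almost for free:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: bound the LRU cache by total cached rows rather than key count.
public class RowWeightedLruCache<K> {
    private final long maxTotalRows;
    private long totalRows = 0;
    // Access-order LinkedHashMap gives LRU iteration order (eldest first).
    private final LinkedHashMap<K, List<String>> cache =
            new LinkedHashMap<>(16, 0.75f, true);

    public RowWeightedLruCache(long maxTotalRows) {
        this.maxTotalRows = maxTotalRows;
    }

    public void put(K key, List<String> rows) {
        List<String> old = cache.put(key, rows);
        totalRows += rows.size() - (old == null ? 0 : old.size());
        // Evict least-recently-used entries until total weight fits.
        Iterator<Map.Entry<K, List<String>>> it = cache.entrySet().iterator();
        while (totalRows > maxTotalRows && it.hasNext()) {
            totalRows -= it.next().getValue().size();
            it.remove();
        }
    }

    public List<String> get(K key) { return cache.get(key); }

    public int size() { return cache.size(); }

    public static void main(String[] args) {
        RowWeightedLruCache<Integer> c = new RowWeightedLruCache<>(3);
        c.put(1, List.of("a", "b"));  // weight 2
        c.put(2, List.of());          // filtered-out key: cached, weight 0
        c.put(3, List.of("c", "d"));  // weight 2 -> total 4 > 3, evicts key 1
        System.out.println(c.get(1)); // null (evicted)
        System.out.println(c.get(2)); // [] (kept, costs no weight)
    }
}
```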
Moreover, if we talk about other > >> > > > > connectors (not in Flink repo), their databases might not support > >> all > >> > > > > Flink filters (or not support filters at all). I think users are > >> > > > > interested in supporting cache filters optimization independently > >> of > >> > > > > supporting other features and solving more complex problems (or > >> > > > > unsolvable at all). > >> > > > > > >> > > > > 3) I agree with your third statement. Actually in our internal > >> version > >> > > > > I also tried to unify the logic of scanning and reloading data from > >> > > > > connectors. But unfortunately, I didn't find a way to unify the > >> logic > >> > > > > of all ScanRuntimeProviders (InputFormat, SourceFunction, > >> Source,...) > >> > > > > and reuse it in reloading ALL cache. As a result I settled on using > >> > > > > InputFormat, because it was used for scanning in all lookup > >> > > > > connectors. (I didn't know that there are plans to deprecate > >> > > > > InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27 > >> source > >> > > > > in ALL caching is not good idea, because this source was designed > >> to > >> > > > > work in distributed environment (SplitEnumerator on JobManager and > >> > > > > SourceReaders on TaskManagers), not in one operator (lookup join > >> > > > > operator in our case). There is even no direct way to pass splits > >> from > >> > > > > SplitEnumerator to SourceReader (this logic works through > >> > > > > SplitEnumeratorContext, which requires > >> > > > > OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage > >> of > >> > > > > InputFormat for ALL cache seems much more clearer and easier. But > >> if > >> > > > > there are plans to refactor all connectors to FLIP-27, I have the > >> > > > > following ideas: maybe we can refuse from lookup join ALL cache in > >> > > > > favor of simple join with multiple scanning of batch source? 
The > >> point > >> > > > > is that the only difference between lookup join ALL cache and > >> simple > >> > > > > join with batch source is that in the first case scanning is > >> performed > >> > > > > multiple times, in between which state (cache) is cleared (correct > >> me > >> > > > > if I'm wrong). So what if we extend the functionality of simple > >> join > >> > > > > to support state reloading + extend the functionality of scanning > >> > > > > batch source multiple times (this one should be easy with new > >> FLIP-27 > >> > > > > source, that unifies streaming/batch reading - we will need to > >> change > >> > > > > only SplitEnumerator, which will pass splits again after some TTL). > >> > > > > WDYT? I must say that this looks like a long-term goal and will > >> make > >> > > > > the scope of this FLIP even larger than you said. Maybe we can > >> limit > >> > > > > ourselves to a simpler solution now (InputFormats). > >> > > > > > >> > > > > So to sum up, my points is like this: > >> > > > > 1) There is a way to make both concise and flexible interfaces for > >> > > > > caching in lookup join. > >> > > > > 2) Cache filters optimization is important both in LRU and ALL > >> caches. > >> > > > > 3) It is unclear when filter pushdown will be supported in Flink > >> > > > > connectors, some of the connectors might not have the opportunity > >> to > >> > > > > support filter pushdown + as I know, currently filter pushdown > >> works > >> > > > > only for scanning (not lookup). So cache filters + projections > >> > > > > optimization should be independent from other features. > >> > > > > 4) ALL cache realization is a complex topic that involves multiple > >> > > > > aspects of how Flink is developing. 
Refusing from InputFormat in > >> favor > >> > > > > of FLIP-27 Source will make ALL cache realization really complex > >> and > >> > > > > not clear, so maybe instead of that we can extend the > >> functionality of > >> > > > > simple join or not refuse from InputFormat in case of lookup join > >> ALL > >> > > > > cache? > >> > > > > > >> > > > > Best regards, > >> > > > > Smirnov Alexander > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > [1] > >> > > > > > >> > > > >> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher) > >> > > > > > >> > > > > чт, 5 мая 2022 г. в 20:34, Jark Wu <imj...@gmail.com>: > >> > > > > > > >> > > > > > It's great to see the active discussion! I want to share my > >> ideas: > >> > > > > > > >> > > > > > 1) implement the cache in framework vs. connectors base > >> > > > > > I don't have a strong opinion on this. Both ways should work > >> (e.g., > >> > > cache > >> > > > > > pruning, compatibility). > >> > > > > > The framework way can provide more concise interfaces. > >> > > > > > The connector base way can define more flexible cache > >> > > > > > strategies/implementations. > >> > > > > > We are still investigating a way to see if we can have both > >> > > advantages. > >> > > > > > We should reach a consensus that the way should be a final state, > >> > > and we > >> > > > > > are on the path to it. > >> > > > > > > >> > > > > > 2) filters and projections pushdown: > >> > > > > > I agree with Alex that the filter pushdown into cache can > >> benefit a > >> > > lot > >> > > > > for > >> > > > > > ALL cache. > >> > > > > > However, this is not true for LRU cache. Connectors use cache to > >> > > reduce > >> > > > > IO > >> > > > > > requests to databases for better throughput. 
> >> > > > > > If a filter can prune 90% of data in the cache, we will have 90% > >> of > >> > > > > lookup > >> > > > > > requests that can never be cached > >> > > > > > and hit directly to the databases. That means the cache is > >> > > meaningless in > >> > > > > > this case. > >> > > > > > > >> > > > > > IMO, Flink SQL has provided a standard way to do filters and > >> projects > >> > > > > > pushdown, i.e., SupportsFilterPushDown and > >> > > SupportsProjectionPushDown. > >> > > > > > Jdbc/hive/HBase haven't implemented the interfaces, don't mean > >> it's > >> > > hard > >> > > > > to > >> > > > > > implement. > >> > > > > > They should implement the pushdown interfaces to reduce IO and > >> the > >> > > cache > >> > > > > > size. > >> > > > > > That should be a final state that the scan source and lookup > >> source > >> > > share > >> > > > > > the exact pushdown implementation. > >> > > > > > I don't see why we need to duplicate the pushdown logic in > >> caches, > >> > > which > >> > > > > > will complex the lookup join design. > >> > > > > > > >> > > > > > 3) ALL cache abstraction > >> > > > > > All cache might be the most challenging part of this FLIP. We > >> have > >> > > never > >> > > > > > provided a reload-lookup public interface. > >> > > > > > Currently, we put the reload logic in the "eval" method of > >> > > TableFunction. > >> > > > > > That's hard for some sources (e.g., Hive). > >> > > > > > Ideally, connector implementation should share the logic of > >> reload > >> > > and > >> > > > > > scan, i.e. ScanTableSource with > >> InputFormat/SourceFunction/FLIP-27 > >> > > > > Source. > >> > > > > > However, InputFormat/SourceFunction are deprecated, and the > >> FLIP-27 > >> > > > > source > >> > > > > > is deeply coupled with SourceOperator. > >> > > > > > If we want to invoke the FLIP-27 source in LookupJoin, this may > >> make > >> > > the > >> > > > > > scope of this FLIP much larger. 
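[Editor's illustration] The pushdown contract referred to above can be sketched as follows. The types are simplified stand-ins modeled loosely on the idea behind `SupportsFilterPushDown`: the source accepts the predicates the external system can evaluate and returns the rest for the runtime to apply:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterPushDownSketch {
    // Simplified predicate representation; not Flink's ResolvedExpression.
    record Filter(String column, String op, int value) {}
    record Result(List<Filter> accepted, List<Filter> remaining) {}

    // Split filters into those the external system supports (pushed into the
    // query, shrinking both IO and cache size) and those kept in the runtime.
    static Result applyFilters(List<Filter> filters, Predicate<Filter> supported) {
        List<Filter> accepted = new ArrayList<>();
        List<Filter> remaining = new ArrayList<>();
        for (Filter f : filters) {
            (supported.test(f) ? accepted : remaining).add(f);
        }
        return new Result(accepted, remaining);
    }

    public static void main(String[] args) {
        // Pretend the external database only supports equality predicates.
        Result r = applyFilters(
                List.of(new Filter("age", ">", 30), new Filter("id", "=", 7)),
                f -> f.op().equals("="));
        System.out.println(r.accepted().size() + " pushed, "
                + r.remaining().size() + " kept"); // 1 pushed, 1 kept
    }
}
```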
> >> > > > > > We are still investigating how to abstract the ALL cache logic > >> and > >> > > reuse > >> > > > > > the existing source interfaces. > >> > > > > > > >> > > > > > > >> > > > > > Best, > >> > > > > > Jark > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> > >> > > wrote: > >> > > > > > > >> > > > > > > It's a much more complicated activity and lies out of the > >> scope of > >> > > this > >> > > > > > > improvement. Because such pushdowns should be done for all > >> > > > > ScanTableSource > >> > > > > > > implementations (not only for Lookup ones). > >> > > > > > > > >> > > > > > > On Thu, 5 May 2022 at 19:02, Martijn Visser < > >> > > martijnvis...@apache.org> > >> > > > > > > wrote: > >> > > > > > > > >> > > > > > >> Hi everyone, > >> > > > > > >> > >> > > > > > >> One question regarding "And Alexander correctly mentioned that > >> > > filter > >> > > > > > >> pushdown still is not implemented for jdbc/hive/hbase." -> > >> Would > >> > > an > >> > > > > > >> alternative solution be to actually implement these filter > >> > > pushdowns? > >> > > > > I > >> > > > > > >> can > >> > > > > > >> imagine that there are many more benefits to doing that, > >> outside > >> > > of > >> > > > > lookup > >> > > > > > >> caching and metrics. > >> > > > > > >> > >> > > > > > >> Best regards, > >> > > > > > >> > >> > > > > > >> Martijn Visser > >> > > > > > >> https://twitter.com/MartijnVisser82 > >> > > > > > >> https://github.com/MartijnVisser > >> > > > > > >> > >> > > > > > >> > >> > > > > > >> On Thu, 5 May 2022 at 13:58, Roman Boyko < > >> ro.v.bo...@gmail.com> > >> > > > > wrote: > >> > > > > > >> > >> > > > > > >> > Hi everyone! > >> > > > > > >> > > >> > > > > > >> > Thanks for driving such a valuable improvement! 
> >> > > > > > >> > > >> > > > > > >> > I do think that single cache implementation would be a nice > >> > > > > opportunity > >> > > > > > >> for > >> > > > > > >> > users. And it will break the "FOR SYSTEM_TIME AS OF > >> proc_time" > >> > > > > semantics > >> > > > > > >> > anyway - doesn't matter how it will be implemented. > >> > > > > > >> > > >> > > > > > >> > Putting myself in the user's shoes, I can say that: > >> > > > > > >> > 1) I would prefer to have the opportunity to cut off the > >> cache > >> > > size > >> > > > > by > >> > > > > > >> > simply filtering unnecessary data. And the most handy way > >> to do > >> > > it > >> > > > > is > >> > > > > > >> apply > >> > > > > > >> > it inside LookupRunners. It would be a bit harder to pass it > >> > > > > through the > >> > > > > > >> > LookupJoin node to TableFunction. And Alexander correctly > >> > > mentioned > >> > > > > that > >> > > > > > >> > filter pushdown still is not implemented for > >> jdbc/hive/hbase. > >> > > > > > >> > 2) The ability to set the different caching parameters for > >> > > different > >> > > > > > >> tables > >> > > > > > >> > is quite important. So I would prefer to set it through DDL > >> > > rather > >> > > > > than > >> > > > > > >> > have similar ttla, strategy and other options for all lookup > >> > > tables. > >> > > > > > >> > 3) Providing the cache into the framework really deprives > >> us of > >> > > > > > >> > extensibility (users won't be able to implement their own > >> > > cache). > >> > > > > But > >> > > > > > >> most > >> > > > > > >> > probably it might be solved by creating more different cache > >> > > > > strategies > >> > > > > > >> and > >> > > > > > >> > a wider set of configurations. > >> > > > > > >> > > >> > > > > > >> > All these points are much closer to the schema proposed by > >> > > > > Alexander. 
> >> > > > > > >> > Qingshen Ren, please correct me if I'm not right and all > >> these > >> > > > > > >> facilities > >> > > > > > >> > might be simply implemented in your architecture? > >> > > > > > >> > > >> > > > > > >> > Best regards, > >> > > > > > >> > Roman Boyko > >> > > > > > >> > e.: ro.v.bo...@gmail.com > >> > > > > > >> > > >> > > > > > >> > On Wed, 4 May 2022 at 21:01, Martijn Visser < > >> > > > > martijnvis...@apache.org> > >> > > > > > >> > wrote: > >> > > > > > >> > > >> > > > > > >> > > Hi everyone, > >> > > > > > >> > > > >> > > > > > >> > > I don't have much to chip in, but just wanted to express > >> that > >> > > I > >> > > > > really > >> > > > > > >> > > appreciate the in-depth discussion on this topic and I > >> hope > >> > > that > >> > > > > > >> others > >> > > > > > >> > > will join the conversation. > >> > > > > > >> > > > >> > > > > > >> > > Best regards, > >> > > > > > >> > > > >> > > > > > >> > > Martijn > >> > > > > > >> > > > >> > > > > > >> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов < > >> > > > > smirale...@gmail.com> > >> > > > > > >> > > wrote: > >> > > > > > >> > > > >> > > > > > >> > > > Hi Qingsheng, Leonard and Jark, > >> > > > > > >> > > > > >> > > > > > >> > > > Thanks for your detailed feedback! However, I have > >> questions > >> > > > > about > >> > > > > > >> > > > some of your statements (maybe I didn't get something?). > >> > > > > > >> > > > > >> > > > > > >> > > > > Caching actually breaks the semantic of "FOR > >> SYSTEM_TIME > >> > > AS OF > >> > > > > > >> > > proc_time” > >> > > > > > >> > > > > >> > > > > > >> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF > >> > > proc_time" > >> > > > > is > >> > > > > > >> not > >> > > > > > >> > > > fully implemented with caching, but as you said, users > >> go > >> > > on it > >> > > > > > >> > > > consciously to achieve better performance (no one > >> proposed > >> > > to > >> > > > > enable > >> > > > > > >> > > > caching by default, etc.). 
Or by users do you mean other > >> > > > > developers > >> > > > > > >> of > >> > > > > > >> > > > connectors? In this case developers explicitly specify > >> > > whether > >> > > > > their > >> > > > > > >> > > > connector supports caching or not (in the list of > >> supported > >> > > > > > >> options), > >> > > > > > >> > > > no one makes them do that if they don't want to. So what > >> > > > > exactly is > >> > > > > > >> > > > the difference between implementing caching in modules > >> > > > > > >> > > > flink-table-runtime and in flink-table-common from the > >> > > > > considered > >> > > > > > >> > > > point of view? How does it affect on > >> breaking/non-breaking > >> > > the > >> > > > > > >> > > > semantics of "FOR SYSTEM_TIME AS OF proc_time"? > >> > > > > > >> > > > > >> > > > > > >> > > > > confront a situation that allows table options in DDL > >> to > >> > > > > control > >> > > > > > >> the > >> > > > > > >> > > > behavior of the framework, which has never happened > >> > > previously > >> > > > > and > >> > > > > > >> > should > >> > > > > > >> > > > be cautious > >> > > > > > >> > > > > >> > > > > > >> > > > If we talk about main differences of semantics of DDL > >> > > options > >> > > > > and > >> > > > > > >> > > > config options("table.exec.xxx"), isn't it about > >> limiting > >> > > the > >> > > > > scope > >> > > > > > >> of > >> > > > > > >> > > > the options + importance for the user business logic > >> rather > >> > > than > >> > > > > > >> > > > specific location of corresponding logic in the > >> framework? 
I > >> > > > > mean > >> > > > > > >> that > >> > > > > > >> > > > in my design, for example, putting an option with lookup > >> > > cache > >> > > > > > >> > > > strategy in configurations would be the wrong decision, > >> > > > > because it > >> > > > > > >> > > > directly affects the user's business logic (not just > >> > > performance > >> > > > > > >> > > > optimization) + touches just several functions of ONE > >> table > >> > > > > (there > >> > > > > > >> can > >> > > > > > >> > > > be multiple tables with different caches). Does it > >> really > >> > > > > matter for > >> > > > > > >> > > > the user (or someone else) where the logic is located, > >> > > which is > >> > > > > > >> > > > affected by the applied option? > >> > > > > > >> > > > Also I can remember DDL option 'sink.parallelism', > >> which in > >> > > > > some way > >> > > > > > >> > > > "controls the behavior of the framework" and I don't > >> see any > >> > > > > problem > >> > > > > > >> > > > here. > >> > > > > > >> > > > > >> > > > > > >> > > > > introduce a new interface for this all-caching > >> scenario > >> > > and > >> > > > > the > >> > > > > > >> > design > >> > > > > > >> > > > would become more complex > >> > > > > > >> > > > > >> > > > > > >> > > > This is a subject for a separate discussion, but > >> actually > >> > > in our > >> > > > > > >> > > > internal version we solved this problem quite easily - > >> we > >> > > reused > >> > > > > > >> > > > InputFormat class (so there is no need for a new API). > >> The > >> > > > > point is > >> > > > > > >> > > > that currently all lookup connectors use InputFormat for > >> > > > > scanning > >> > > > > > >> the > >> > > > > > >> > > > data in batch mode: HBase, JDBC and even Hive - it uses > >> > > class > >> > > > > > >> > > > PartitionReader, that is actually just a wrapper around > >> > > > > InputFormat. 
> >> > > > > > >> > > > The advantage of this solution is the ability to reload > >> > > cache > >> > > > > data > >> > > > > > >> in > >> > > > > > >> > > > parallel (number of threads depends on number of > >> > > InputSplits, > >> > > > > but > >> > > > > > >> has > >> > > > > > >> > > > an upper limit). As a result cache reload time > >> significantly > >> > > > > reduces > >> > > > > > >> > > > (as well as time of input stream blocking). I know that > >> > > usually > >> > > > > we > >> > > > > > >> try > >> > > > > > >> > > > to avoid usage of concurrency in Flink code, but maybe > >> this > >> > > one > >> > > > > can > >> > > > > > >> be > >> > > > > > >> > > > an exception. BTW I don't say that it's an ideal > >> solution, > >> > > maybe > >> > > > > > >> there > >> > > > > > >> > > > are better ones. > >> > > > > > >> > > > > >> > > > > > >> > > > > Providing the cache in the framework might introduce > >> > > > > compatibility > >> > > > > > >> > > issues > >> > > > > > >> > > > > >> > > > > > >> > > > It's possible only in cases when the developer of the > >> > > connector > >> > > > > > >> won't > >> > > > > > >> > > > properly refactor his code and will use new cache > >> options > >> > > > > > >> incorrectly > >> > > > > > >> > > > (i.e. explicitly provide the same options into 2 > >> different > >> > > code > >> > > > > > >> > > > places). For correct behavior all he will need to do is > >> to > >> > > > > redirect > >> > > > > > >> > > > existing options to the framework's LookupConfig (+ > >> maybe > >> > > add an > >> > > > > > >> alias > >> > > > > > >> > > > for options, if there was different naming), everything > >> > > will be > >> > > > > > >> > > > transparent for users. If the developer won't do > >> > > refactoring at > >> > > > > all, > >> > > > > > >> > > > nothing will be changed for the connector because of > >> > > backward > >> > > > > > >> > > > compatibility. 
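[Editor's illustration] The parallel cache reload described above can be sketched like this. The `int[]` ranges are stand-ins for `InputSplit`s, the per-split loader is a stand-in for `InputFormat#open`/`nextRecord`, and the capped-thread-pool merge strategy is an assumption:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelReloadSketch {
    // Stand-in for reading one split of the dimension table.
    static Map<Integer, String> loadSplit(int from, int to) {
        return IntStream.range(from, to).boxed()
                .collect(Collectors.toMap(i -> i, i -> "row-" + i));
    }

    // One task per split, thread count capped; results merged into the cache.
    static Map<Integer, String> reload(List<int[]> splits, int maxThreads)
            throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.min(splits.size(), maxThreads));
        try {
            List<Future<Map<Integer, String>>> futures = splits.stream()
                    .map(s -> pool.submit(() -> loadSplit(s[0], s[1])))
                    .collect(Collectors.toList());
            Map<Integer, String> cache = new ConcurrentHashMap<>();
            for (Future<Map<Integer, String>> f : futures) {
                cache.putAll(f.get());
            }
            return cache;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, String> cache =
                reload(List.of(new int[]{0, 5}, new int[]{5, 10}), 4);
        System.out.println(cache.size()); // 10
    }
}
```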
Also, if a developer wants to use his own cache logic, he can simply refuse to pass some of the configs into the framework and instead make his own implementation with the already existing configs and metrics (but actually I think that's a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That is the great purpose. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by user age. With the filter 'age > 30' there will be half as much data in the cache, which means the user can increase 'lookup.cache.max-rows' by almost 2x and gain a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.

It would be great to hear other voices regarding this topic! We have quite a few controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

On Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because the result can't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.

2. If we put the cache implementation in the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation where table options in DDL control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply such general configs to a specific table.

3. We have use cases where the lookup source loads and periodically refreshes all records in memory to achieve high lookup performance (like the Hive connector in the community; this is also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4. Providing the cache in the framework might introduce compatibility issues for existing lookup sources: if the user configures the table incorrectly, there could be two caches with totally different strategies (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of into the runner with the cache. The goal of using a cache is to reduce network I/O and pressure on the external system, and applying these optimizations only to the cache seems not quite useful.
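To make point 3 a bit more concrete, the all-caching pattern we have in mind — periodically reloading the full table into memory and serving lookups from the snapshot — could be sketched roughly like this (plain Java; the class and method names are illustrative only, not the proposed interfaces):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative all-cache: serves lookups from a full snapshot, swapped atomically on each reload. */
class AllCacheSketch<K, V> {
    private final Supplier<Map<K, List<V>>> scanAll; // full-table scan, e.g. backed by an InputFormat
    private volatile Map<K, List<V>> snapshot = Map.of();

    AllCacheSketch(Supplier<Map<K, List<V>>> scanAll) {
        this.scanAll = scanAll;
    }

    /** Called by a timer at the configured reload interval. */
    public void reload() {
        snapshot = Map.copyOf(scanAll.get()); // atomic swap; readers never see a partial reload
    }

    /** Lookups never touch the external system between reloads. */
    public List<V> lookup(K key) {
        return snapshot.getOrDefault(key, List.of());
    }
}
```

The point of the sketch is that the contract differs from an LRU cache: there are no per-key misses to delegate, only a scan-and-swap cycle, which is why a separate interface seems needed.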
I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of the TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache. Also, I made a POC [2] for your reference.

Looking forward to your ideas!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,
Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step

I think these two ways (the one originally proposed by Qingsheng, and mine) are mutually exclusive, because conceptually they follow the same goal but differ in implementation details. If we go one way, moving to the other in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community on that first and then work on this FLIP together, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?

> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we first must do the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.

Sorry for that, I'm a bit new to these kinds of conversations :) I have no write access to Confluence, so I made a Jira issue where I described the proposed changes in more detail: https://issues.apache.org/jira/browse/FLINK-27411.

Will be happy to get more feedback!

Best,
Smirnov Alexander

On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.

I second Alexander's idea but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache.
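A minimal sketch of such a cache-around-X layer (generic Java with hypothetical names; the real version would wrap a TableFunction and be bounded by 'lookup.cache.max-rows'):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Illustrative caching layer: delegate to the underlying lookup function X only on a miss. */
class CachingLookupSketch<K, V> {
    private final Function<K, List<V>> delegate; // the user-provided lookup, "X"
    private final Map<K, List<V>> cache;

    CachingLookupSketch(Function<K, List<V>> delegate, int maxRows) {
        this.delegate = delegate;
        // LRU eviction in access order, bounded by maxRows (cf. 'lookup.cache.max-rows')
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    public List<V> lookup(K key) {
        // hit -> cached rows, miss -> delegate to X and remember the result
        return cache.computeIfAbsent(key, delegate);
    }
}
```

The attraction is exactly what is described above: X stays untouched, and the caching behavior is driven purely by options.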
Lifting it into the operator model as proposed would be even better, but it is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).

Another advantage is that all the changes of this FLIP would be limited to options, with no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.

@Alexander unfortunately, your architecture is not shared. I don't know the solution to share images, to be honest.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng! My name is Alexander; I'm not a committer yet, but I'd really like to become one. And this FLIP really interested me. Actually, I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.

I think there is a better alternative to introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables – it's very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But that would require connectors to depend on another module that contains a lot of runtime logic, which doesn't sound good.

I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to pass only configurations to the planner, so that they won't depend on the runtime implementation. Based on these configs, the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).

The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its inheritors. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc, and AsyncLookupJoinRunnerWithCalc.

I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution. If we have the caching logic at a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named this way because it uses the 'calc' function, which actually consists mostly of filters and projections.

For example, when joining table A with lookup table B on the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the records' size. So the initial max number of records in the cache can be increased by the user.

What do you think about it?
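To illustrate the idea in plain Java (names are made up for the example; the real runners operate on RowData): the calc function's filter and projection are applied to the lookup responses before the cache insert, so only useful, trimmed-down rows occupy the cache budget.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Illustrative "calc before cache": filter and project lookup responses before storing them. */
class CalcBeforeCacheSketch<K, R, P> {
    private final Function<K, List<R>> lookup;   // raw lookup against table B
    private final Predicate<R> filter;           // e.g. B.salary > 1000
    private final Function<R, P> projection;     // keep only the columns the join needs
    private final Map<K, List<P>> cache = new HashMap<>();

    CalcBeforeCacheSketch(Function<K, List<R>> lookup, Predicate<R> filter, Function<R, P> projection) {
        this.lookup = lookup;
        this.filter = filter;
        this.projection = projection;
    }

    public List<P> eval(K key) {
        return cache.computeIfAbsent(key, k -> lookup.apply(k).stream()
                .filter(filter)       // rows failing the filter never occupy cache space
                .map(projection)      // smaller records -> more rows fit in the same budget
                .collect(Collectors.toList()));
    }
}
```

With salaries {500, 1500, 2000} and the filter salary > 1000, only two projected rows end up in the cache instead of three full ones.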
On 2022/04/19 02:47:11, Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction of the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.

Therefore we propose some new APIs including the cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!
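As a strawman for what "standard metrics" could mean here (the counter names are invented for illustration, not the FLIP's final metric list), even a pair of hit/miss counters already enables hit-rate-based cache tuning:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative cache metrics: the kind of standard counters a lookup cache could expose. */
class CacheMetricsSketch {
    private final AtomicLong hits = new AtomicLong();
    private final AtomicLong misses = new AtomicLong();

    public void recordHit() { hits.incrementAndGet(); }
    public void recordMiss() { misses.incrementAndGet(); }

    /** hitRate = hits / (hits + misses): the first number users reach for when sizing the cache. */
    public double hitRate() {
        long h = hits.get();
        long total = h + misses.get();
        return total == 0 ? 0.0 : (double) h / total;
    }
}
```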
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,
Qingsheng

--
Best Regards,
Qingsheng Ren
Real-time Computing Team
Alibaba Cloud
Email: renqs...@gmail.com