Hi Qingsheng, Alexander,
Thanks for your reply.

> Can you give an example of such upper-level Cache usage? It's not clear to me currently. I think it's unnecessary to have such a high-level abstraction if nowhere in the code we would operate with objects as instances of Cache. But maybe there are other opinions on this.

I haven't found any other usage yet. Maybe it can be used in the DataStream API. If you all think it's unnecessary, we can ignore it.

> I think there won't be many problems with supporting metrics in the ALL cache. Moreover, some of the proposed metrics are most useful especially in the ALL case, for example 'latestLoadTimeGauge' or 'numCachedRecords', so the necessary metrics definitely should be supported in this cache strategy.

Sorry, my mistake. There is no problem with it.

Best regards,
Yuan

At 2022-05-17 17:15:20, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi Yuan,
>
>Thanks for the review! Basically I'm with Alexander's opinion. We'd like to limit the scope to the lookup scenario, so we didn't extend the cache to a generic one. And as for the metrics, I think the existing metric definitions are also applicable to the all-cache case.
>
>Cheers,
>
>Qingsheng
>
>
>> On May 15, 2022, at 21:17, zst...@163.com wrote:
>>
>> Hi Qingsheng and devs,
>>
>> Thanks for your heated discussion and redesign to optimize this feature. I just have two comments:
>> 1. How about abstracting the LookupCache to a higher level with a common Cache? It would be convenient for devs to use in other places.
>>
>> 2. Does it have any metrics, such as NumCachedRecords for the AllCache?
>> Best regards,
>> Yuan
>>
>> At 2022-05-13 20:27:44, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>> >Hi Alexander and devs,
>> >
>> >Thank you very much for the in-depth discussion! As Jark mentioned, we were inspired by Alexander's idea and refactored our design. FLIP-221 [1] has been updated to reflect our design now, and we are happy to hear more suggestions from you!
>> >
>> >Compared to the previous design:
>> >1. The lookup cache sits at the table runtime level and is integrated as a component of LookupJoinRunner, as discussed previously.
>> >2. Interfaces are renamed and re-designed to reflect the new design.
>> >3. We treat the all-caching case separately and introduce a new RescanRuntimeProvider to reuse the scanning ability. We are planning to support SourceFunction / InputFormat for now, considering the complexity of the FLIP-27 Source API.
>> >4. A new interface LookupFunction is introduced to make the semantics of lookup more straightforward for developers.
>> >
>> >Replying to Alexander:
>> >> However I'm a little confused whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not?
>> >Yes, you are right. InputFormat is not deprecated for now. I think it will be deprecated in the future, but we don't have a clear plan for that.
>> >
>> >Thanks again for the discussion on this FLIP, and looking forward to cooperating with you after we finalize the design and interfaces!
>> >
>> >[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >
>> >Best regards,
>> >
>> >Qingsheng
>> >
>> >
>> >On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
>> >
>> >> Hi Jark, Qingsheng and Leonard!
>> >>
>> >> Glad to see that we came to a consensus on almost all points!
>> >>
>> >> However, I'm a little confused whether InputFormat is deprecated or not.
>> >> Am I right that it will be so in the future, but currently it's not? Actually, I also think that for the first version it's OK to use InputFormat in the ALL cache implementation, because supporting the rescan ability seems like a very distant prospect. But for this decision we need a consensus among all discussion participants.
>> >>
>> >> In general, I don't have anything to argue with in your statements. All of them correspond to my ideas. Looking ahead, it would be nice to work on this FLIP cooperatively. I've already done a lot of work on lookup join caching with an implementation very close to the one we are discussing, and I want to share the results of this work. Anyway, looking forward to the FLIP update!
>> >>
>> >> Best regards,
>> >> Smirnov Alexander
>> >>
>> >> Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com>:
>> >> >
>> >> > Hi Alex,
>> >> >
>> >> > Thanks for summarizing your points.
>> >> >
>> >> > In the past week, Qingsheng, Leonard, and I have discussed it several times, and we have totally refactored the design. I'm glad to say we have reached a consensus on many of your points! Qingsheng is still working on updating the design docs, and they may be available in the next few days. I will share some conclusions from our discussions:
>> >> >
>> >> > 1) We have refactored the design towards the "cache in framework" way.
>> >> >
>> >> > 2) A "LookupCache" interface for users to customize, and a default implementation with a builder for ease of use. This makes it possible to have both flexibility and conciseness (see the sketch at the end of this mail).
>> >> >
>> >> > 3) Filter pushdown is important for both the ALL and LRU lookup caches, esp. for reducing IO. Filter pushdown should be the final state and the unified way to support pruning both the ALL cache and the LRU cache, so I think we should make an effort in this direction. If we need to support filter pushdown for the ALL cache anyway, why not use it for the LRU cache as well? Either way, as we decided to implement the cache in the framework, we have the chance to support filtering on the cache anytime. This is an optimization and it doesn't affect the public API. I think we can create a JIRA issue to discuss it when the FLIP is accepted.
>> >> >
>> >> > 4) The idea to support the ALL cache is similar to your proposal. In the first version, we will only support InputFormat and SourceFunction for cache-all (invoking the InputFormat in the join operator). For the FLIP-27 source, we need to join a true source operator instead of calling it embedded in the join operator. However, this needs another FLIP to support the re-scan ability for the FLIP-27 Source, and this can be a large piece of work. In order not to block this issue, we can put the effort of FLIP-27 source integration into future work and integrate InputFormat & SourceFunction for now.
>> >> >
>> >> > I think it's fine to use InputFormat & SourceFunction, as they are not deprecated; otherwise we would have to introduce another function similar to them, which would be meaningless. We need to plan the FLIP-27 source integration ASAP, before InputFormat & SourceFunction are deprecated.
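>> >> > To make 2) concrete, the interface and the builder could look roughly like this (just a sketch - the FLIP page will have the authoritative version, and names may still change):
>> >> >
>> >> > public interface LookupCache extends AutoCloseable {
>> >> >     Collection<RowData> getIfPresent(RowData key);
>> >> >     Collection<RowData> put(RowData key, Collection<RowData> rows);
>> >> >     void invalidate(RowData key);
>> >> >     long size();
>> >> > }
>> >> >
>> >> > // Default implementation, configured via a builder:
>> >> > LookupCache cache = DefaultLookupCache.newBuilder()
>> >> >         .maximumSize(10_000)
>> >> >         .expireAfterWrite(Duration.ofMinutes(10))
>> >> >         .build();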
>> >> >
>> >> > Best,
>> >> > Jark
>> >> >
>> >> > On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:
>> >> >
>> >> > > Hi Martijn!
>> >> > >
>> >> > > Got it. Therefore, the InputFormat-based implementation is not considered. Thanks for clearing that up!
>> >> > >
>> >> > > Best regards,
>> >> > > Smirnov Alexander
>> >> > >
>> >> > > Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com>:
>> >> > > >
>> >> > > > Hi,
>> >> > > >
>> >> > > > With regards to:
>> >> > > >
>> >> > > > > But if there are plans to refactor all connectors to FLIP-27
>> >> > > >
>> >> > > > Yes, FLIP-27 is the target for all connectors. The old interfaces will be deprecated and connectors will either be refactored to use the new ones or dropped.
>> >> > > >
>> >> > > > The caching should work for connectors that are using FLIP-27 interfaces; we should not introduce new features for old interfaces.
>> >> > > >
>> >> > > > Best regards,
>> >> > > >
>> >> > > > Martijn
>> >> > > >
>> >> > > > On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
>> >> > > >
>> >> > > > > Hi Jark!
>> >> > > > >
>> >> > > > > Sorry for the late response. I would like to make some comments and clarify my points.
>> >> > > > >
>> >> > > > > 1) I agree with your first statement. I think we can achieve both advantages this way: put the Cache interface in flink-table-common, but have implementations of it in flink-table-runtime. Then if a connector developer wants to use the existing cache strategies and their implementations, he can just pass a lookupConfig to the planner, but if he wants to have his own cache implementation in his TableFunction, he can use the existing interface for this purpose (we can explicitly point this out in the documentation). In this way all configs and metrics will be unified. WDYT?
>> >> > > > >
>> >> > > > > > If a filter can prune 90% of data in the cache, we will have 90% of lookup requests that can never be cached
>> >> > > > >
>> >> > > > > 2) Let me clarify the logic of the filters optimization in the case of the LRU cache. It looks like Cache<RowData, Collection<RowData>>. Here we always store the response of the dimension table in the cache, even after applying the calc function. I.e. if no rows are left after applying the filters to the result of the 'eval' method of the TableFunction, we store an empty list under the lookup keys. The cache entry will therefore still be created, but it will require much less memory (in bytes). I.e. we don't completely filter out keys whose result was pruned, but we significantly reduce the memory required to store the result. If the user knows about this behavior, he can increase the 'max-rows' option before the start of the job. But actually I came up with the idea that we can do this automatically by using the 'maximumWeight' and 'weigher' methods of the Guava cache [1]. The weight can be the size of the collection of rows (the cache value).
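>> >> > > > > For example (a sketch with Guava; 'maxRows' stands for the value of the 'lookup.cache.max-rows' option, and empty results get weight 1 so they stay cheap but still expire):
>> >> > > > >
>> >> > > > > import com.google.common.cache.Cache;
>> >> > > > > import com.google.common.cache.CacheBuilder;
>> >> > > > > import com.google.common.cache.Weigher;
>> >> > > > > import org.apache.flink.table.data.RowData;
>> >> > > > >
>> >> > > > > Weigher<RowData, Collection<RowData>> byRowCount =
>> >> > > > >         (key, rows) -> Math.max(1, rows.size());
>> >> > > > >
>> >> > > > > Cache<RowData, Collection<RowData>> cache = CacheBuilder.newBuilder()
>> >> > > > >         .maximumWeight(maxRows)   // interpret 'max-rows' as total weight
>> >> > > > >         .weigher(byRowCount)
>> >> > > > >         .build();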
>> >> > > > > Therefore the cache can automatically fit many more records than before.
>> >> > > > >
>> >> > > > > > Flink SQL has provided a standard way to do filters and projects pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
>> >> > > > > > Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard to implement.
>> >> > > > >
>> >> > > > > It's debatable how difficult it will be to implement filter pushdown. But I think the fact that currently there is no database connector with filter pushdown at least means that this feature won't be supported in connectors soon. Moreover, if we talk about other connectors (not in the Flink repo), their databases might not support all Flink filters (or might not support filters at all). I think users are interested in the cache filters optimization independently of other features and of solving more complex (or unsolvable) problems.
>> >> > > > >
>> >> > > > > 3) I agree with your third statement. Actually, in our internal version I also tried to unify the logic of scanning and reloading data from connectors. But unfortunately, I didn't find a way to unify the logic of all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...) and reuse it for reloading the ALL cache. As a result I settled on using InputFormat, because it is used for scanning in all lookup connectors. (I didn't know that there are plans to deprecate InputFormat in favor of the FLIP-27 Source.) IMO the usage of the FLIP-27 source for ALL caching is not a good idea, because this source was designed to work in a distributed environment (the SplitEnumerator on the JobManager and the SourceReaders on the TaskManagers), not in one operator (the lookup join operator in our case). There is not even a direct way to pass splits from the SplitEnumerator to a SourceReader (this logic works through the SplitEnumeratorContext, which requires an OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage of InputFormat for the ALL cache seems much clearer and easier. But if there are plans to refactor all connectors to FLIP-27, I have the following idea: maybe we can abandon the lookup join ALL cache in favor of a simple join with multiple scans of the batch source? The point is that the only difference between a lookup join with an ALL cache and a simple join with a batch source is that in the first case scanning is performed multiple times, in between which the state (cache) is cleared (correct me if I'm wrong). So what if we extend the functionality of the simple join to support state reloading + extend the functionality of scanning the batch source multiple times (this one should be easy with the new FLIP-27 source, which unifies streaming/batch reading - we would only need to change the SplitEnumerator so that it passes the splits again after some TTL)? WDYT?
>> >> > > > > I must say that this looks like a long-term goal and would make the scope of this FLIP even larger than you said. Maybe we can limit ourselves to a simpler solution now (InputFormats).
>> >> > > > >
>> >> > > > > So to sum up, my points are as follows:
>> >> > > > > 1) There is a way to make both concise and flexible interfaces for caching in lookup join.
>> >> > > > > 2) The cache filters optimization is important for both the LRU and ALL caches.
>> >> > > > > 3) It is unclear when filter pushdown will be supported in Flink connectors; some connectors might not be able to support filter pushdown at all, and as far as I know, filter pushdown currently works only for scanning (not lookup). So the cache filters + projections optimization should be independent of other features.
>> >> > > > > 4) The ALL cache implementation is a complex topic that touches multiple aspects of how Flink is evolving. Dropping InputFormat in favor of the FLIP-27 Source would make the ALL cache implementation really complex and unclear, so maybe instead we can extend the functionality of the simple join, or keep InputFormat for the lookup join ALL cache?
>> >> > > > >
>> >> > > > > Best regards,
>> >> > > > > Smirnov Alexander
>> >> > > > >
>> >> > > > > [1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>> >> > > > >
>> >> > > > > Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com>:
>> >> > > > > >
>> >> > > > > > It's great to see the active discussion! I want to share my ideas:
>> >> > > > > >
>> >> > > > > > 1) implement the cache in the framework vs. in a connector base
>> >> > > > > > I don't have a strong opinion on this. Both ways should work (e.g., cache pruning, compatibility). The framework way can provide more concise interfaces. The connector-base way can define more flexible cache strategies/implementations. We are still investigating whether we can have both advantages. We should reach a consensus that the chosen way should be a final state, and that we are on the path to it.
>> >> > > > > >
>> >> > > > > > 2) filters and projections pushdown:
>> >> > > > > > I agree with Alex that the filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use the cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and that hit the databases directly. That means the cache is meaningless in this case.
>> >> > > > > >
>> >> > > > > > IMO, Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
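>> >> > > > > > For reference, accepting filters in a source is mostly bookkeeping; a sketch (the class name is made up, which filters get accepted is connector-specific, and the remaining source methods are omitted):
>> >> > > > > >
>> >> > > > > > public class MyJdbcTableSource implements ScanTableSource, SupportsFilterPushDown {
>> >> > > > > >
>> >> > > > > >     private List<ResolvedExpression> pushedFilters = new ArrayList<>();
>> >> > > > > >
>> >> > > > > >     @Override
>> >> > > > > >     public Result applyFilters(List<ResolvedExpression> filters) {
>> >> > > > > >         // Keep what the external system can evaluate and hand the rest
>> >> > > > > >         // back to the planner; here we naively accept everything.
>> >> > > > > >         this.pushedFilters = filters;
>> >> > > > > >         return Result.of(filters, Collections.emptyList());
>> >> > > > > >     }
>> >> > > > > >
>> >> > > > > >     // ... remaining ScanTableSource methods omitted ...
>> >> > > > > > }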
>> >> > > > > > That Jdbc/Hive/HBase haven't implemented the interfaces doesn't mean they are hard to implement. They should implement the pushdown interfaces to reduce IO and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation. I don't see why we would need to duplicate the pushdown logic in the caches, which would complicate the lookup join design.
>> >> > > > > >
>> >> > > > > > 3) ALL cache abstraction
>> >> > > > > > The ALL cache might be the most challenging part of this FLIP. We have never provided a reload-lookup public interface. Currently, we put the reload logic in the "eval" method of TableFunction. That's hard for some sources (e.g., Hive). Ideally, a connector implementation should share the logic of reload and scan, i.e. ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with SourceOperator. If we want to invoke the FLIP-27 source in LookupJoin, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.
>> >> > > > > >
>> >> > > > > > Best,
>> >> > > > > > Jark
>> >> > > > > >
>> >> > > > > > On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> >> > > > > >
>> >> > > > > > > It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns should be done for all ScanTableSource implementations (not only the Lookup ones).
>> >> > > > > > >
>> >> > > > > > > On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:
>> >> > > > > > >
>> >> > > > > > >> Hi everyone,
>> >> > > > > > >>
>> >> > > > > > >> One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.
>> >> > > > > > >>
>> >> > > > > > >> Best regards,
>> >> > > > > > >>
>> >> > > > > > >> Martijn Visser
>> >> > > > > > >> https://twitter.com/MartijnVisser82
>> >> > > > > > >> https://github.com/MartijnVisser
>> >> > > > > > >>
>> >> > > > > > >>
>> >> > > > > > >> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> >> > > > > > >>
>> >> > > > >> > Hi everyone!
>> >> > > > >> >
>> >> > > > >> > Thanks for driving such a valuable improvement!
>> >> > > > >> >
>> >> > > > >> > I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.
>> >> > > > >> >
>> >> > > > >> > Putting myself in the user's shoes, I can say that:
>> >> > > > >> > 1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the handiest way to do it is to apply it inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
>> >> > > > >> > 2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
>> >> > > > >> > 3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by creating more cache strategies and a wider set of configurations.
>> >> > > > >> >
>> >> > > > >> > All these points are much closer to the schema proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong and all these facilities can be easily implemented in your architecture?
>> >> > > > >> >
>> >> > > > >> > Best regards,
>> >> > > > >> > Roman Boyko
>> >> > > > >> > e.: ro.v.bo...@gmail.com
>> >> > > > >> >
>> >> > > > >> > On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:
>> >> > > > >> >
>> >> > > > >> > > Hi everyone,
>> >> > > > >> > >
>> >> > > > >> > > I don't have much to chip in, but just wanted to express that I really appreciate the in-depth discussion on this topic and I hope that others will join the conversation.
>> >> > > > >> > >
>> >> > > > >> > > Best regards,
>> >> > > > >> > >
>> >> > > > >> > > Martijn
>> >> > > > >> > >
>> >> > > > >> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:
>> >> > > > >> > >
>> >> > > > >> > > > Hi Qingsheng, Leonard and Jark,
>> >> > > > >> > > >
>> >> > > > >> > > > Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I misunderstood something?).
>> >> > > > > > >> > > > >> >> > > > > > >> > > > > Caching actually breaks the semantic of "FOR >> >> SYSTEM_TIME >> >> > > AS OF >> >> > > > > > >> > > proc_time” >> >> > > > > > >> > > > >> >> > > > > > >> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF >> >> > > proc_time" >> >> > > > > is >> >> > > > > > >> not >> >> > > > > > >> > > > fully implemented with caching, but as you said, users >> >> go >> >> > > on it >> >> > > > > > >> > > > consciously to achieve better performance (no one >> >> proposed >> >> > > to >> >> > > > > enable >> >> > > > > > >> > > > caching by default, etc.). Or by users do you mean >> >> > > > > > >> > > > other >> >> > > > > developers >> >> > > > > > >> of >> >> > > > > > >> > > > connectors? In this case developers explicitly specify >> >> > > whether >> >> > > > > their >> >> > > > > > >> > > > connector supports caching or not (in the list of >> >> supported >> >> > > > > > >> options), >> >> > > > > > >> > > > no one makes them do that if they don't want to. So >> >> > > > > > >> > > > what >> >> > > > > exactly is >> >> > > > > > >> > > > the difference between implementing caching in modules >> >> > > > > > >> > > > flink-table-runtime and in flink-table-common from the >> >> > > > > considered >> >> > > > > > >> > > > point of view? How does it affect on >> >> breaking/non-breaking >> >> > > the >> >> > > > > > >> > > > semantics of "FOR SYSTEM_TIME AS OF proc_time"? >> >> > > > > > >> > > > >> >> > > > > > >> > > > > confront a situation that allows table options in DDL >> >> to >> >> > > > > control >> >> > > > > > >> the >> >> > > > > > >> > > > behavior of the framework, which has never happened >> >> > > previously >> >> > > > > and >> >> > > > > > >> > should >> >> > > > > > >> > > > be cautious >> >> > > > > > >> > > > >> >> > > > > > >> > > > If we talk about main differences of semantics of DDL >> >> > > options >> >> > > > > and >> >> > > > > > >> > > > config options("table.exec.xxx"), isn't it about >> >> limiting >> >> > > the >> >> > > > > scope >> >> > > > > > >> of >> >> > > > > > >> > > > the options + importance for the user business logic >> >> rather >> >> > > than >> >> > > > > > >> > > > specific location of corresponding logic in the >> >> framework? I >> >> > > > > mean >> >> > > > > > >> that >> >> > > > > > >> > > > in my design, for example, putting an option with >> >> > > > > > >> > > > lookup >> >> > > cache >> >> > > > > > >> > > > strategy in configurations would be the wrong >> >> > > > > > >> > > > decision, >> >> > > > > because it >> >> > > > > > >> > > > directly affects the user's business logic (not just >> >> > > performance >> >> > > > > > >> > > > optimization) + touches just several functions of ONE >> >> table >> >> > > > > (there >> >> > > > > > >> can >> >> > > > > > >> > > > be multiple tables with different caches). Does it >> >> really >> >> > > > > matter for >> >> > > > > > >> > > > the user (or someone else) where the logic is located, >> >> > > which is >> >> > > > > > >> > > > affected by the applied option? >> >> > > > > > >> > > > Also I can remember DDL option 'sink.parallelism', >> >> which in >> >> > > > > some way >> >> > > > > > >> > > > "controls the behavior of the framework" and I don't >> >> see any >> >> > > > > problem >> >> > > > > > >> > > > here. 
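>> >> > > > >> > > > For example, per-table cache options in DDL would look like the following (the two cache options below are the ones the JDBC connector already exposes today):
>> >> > > > >> > > >
>> >> > > > >> > > > CREATE TABLE users (
>> >> > > > >> > > >   id BIGINT,
>> >> > > > >> > > >   age INT,
>> >> > > > >> > > >   salary DECIMAL(10, 2)
>> >> > > > >> > > > ) WITH (
>> >> > > > >> > > >   'connector' = 'jdbc',
>> >> > > > >> > > >   'url' = 'jdbc:mysql://localhost:3306/mydb',
>> >> > > > >> > > >   'table-name' = 'users',
>> >> > > > >> > > >   'lookup.cache.max-rows' = '10000',
>> >> > > > >> > > >   'lookup.cache.ttl' = '10min'
>> >> > > > >> > > > );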
>> >> > > > >> > > >
>> >> > > > >> > > > > introduce a new interface for this all-caching scenario and the design would become more complex
>> >> > > > >> > > >
>> >> > > > >> > > > This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily - we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive - it uses the class PartitionReader, which is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload the cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit); see the sketch below. As a result the cache reload time is significantly reduced (as well as the time the input stream is blocked). I know that we usually try to avoid concurrency in Flink code, but maybe this one can be an exception. BTW I'm not saying that it's an ideal solution; maybe there are better ones.
>> >> > > > >> > > >
>> >> > > > >> > > > > Providing the cache in the framework might introduce compatibility issues
>> >> > > > >> > > >
>> >> > > > >> > > > That's possible only if the developer of the connector doesn't properly refactor his code and uses the new cache options incorrectly (i.e. explicitly provides the same options in 2 different code places). For correct behavior, all he needs to do is redirect the existing options to the framework's LookupConfig (+ maybe add aliases for the options if the naming differed); everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector, because of backward compatibility. Also, if a developer wants to use his own cache logic, he can simply not pass some of the configs to the framework and instead make his own implementation with the already existing configs and metrics (but actually I think that's a rare case).
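>> >> > > > >> > > > The reload mentioned above, in its simplest single-threaded form, could look like this (illustrative only; the parallel variant hands the splits to a small thread pool, and extractKey stands for the join-key extractor):
>> >> > > > >> > > >
>> >> > > > >> > > > <S extends InputSplit> Map<RowData, List<RowData>> reloadAll(
>> >> > > > >> > > >         InputFormat<RowData, S> format) throws IOException {
>> >> > > > >> > > >     format.configure(new Configuration());
>> >> > > > >> > > >     Map<RowData, List<RowData>> cache = new HashMap<>();
>> >> > > > >> > > >     for (S split : format.createInputSplits(1)) {
>> >> > > > >> > > >         format.open(split);
>> >> > > > >> > > >         while (!format.reachedEnd()) {
>> >> > > > >> > > >             RowData row = format.nextRecord(null); // no object reuse here
>> >> > > > >> > > >             cache.computeIfAbsent(extractKey(row), k -> new ArrayList<>()).add(row);
>> >> > > > >> > > >         }
>> >> > > > >> > > >         format.close();
>> >> > > > >> > > >     }
>> >> > > > >> > > >     return cache;
>> >> > > > >> > > > }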
>> >> > > > >> > > >
>> >> > > > >> > > > > filters and projections should be pushed all the way down to the table function, like what we do in the scan source
>> >> > > > >> > > >
>> >> > > > >> > > > That's a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector currently supports it). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.
>> >> > > > >> > > >
>> >> > > > >> > > > > only applying these optimizations to the cache seems not quite useful
>> >> > > > >> > > >
>> >> > > > >> > > > Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose that in the dimension table 'users' we have a column 'age' with values from 20 to 40, and an input stream 'clicks' that is ~uniformly distributed over the age of users. With the filter 'age > 30' there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which yields a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.
>> >> > > > >> > > >
>> >> > > > >> > > > It would be great to hear other voices regarding this topic! We have quite a lot of controversial points, and I think that with the help of others it will be easier for us to come to a consensus.
>> >> > > > >> > > >
>> >> > > > >> > > > Best regards,
>> >> > > > >> > > > Smirnov Alexander
>> >> > > > >> > > >
>> >> > > > >> > > > Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com>:
>> >> > > > >> > > > >
>> >> > > > >> > > > > Hi Alexander and Arvid,
>> >> > > > >> > > > >
>> >> > > > >> > > > > Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard and I'd like to summarize our ideas.
Instead of implementing the cache >> >> > > logic in >> >> > > > > the >> >> > > > > > >> > table >> >> > > > > > >> > > > runtime layer or wrapping around the user-provided >> >> > > > > > >> > > > table >> >> > > > > function, >> >> > > > > > >> we >> >> > > > > > >> > > > prefer to introduce some new APIs extending >> >> TableFunction >> >> > > with >> >> > > > > these >> >> > > > > > >> > > > concerns: >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > 1. Caching actually breaks the semantic of "FOR >> >> > > SYSTEM_TIME >> >> > > > > AS OF >> >> > > > > > >> > > > proc_time”, because it couldn’t truly reflect the >> >> content >> >> > > of the >> >> > > > > > >> lookup >> >> > > > > > >> > > > table at the moment of querying. If users choose to >> >> enable >> >> > > > > caching >> >> > > > > > >> on >> >> > > > > > >> > the >> >> > > > > > >> > > > lookup table, they implicitly indicate that this >> >> breakage is >> >> > > > > > >> acceptable >> >> > > > > > >> > > in >> >> > > > > > >> > > > exchange for the performance. So we prefer not to >> >> provide >> >> > > > > caching on >> >> > > > > > >> > the >> >> > > > > > >> > > > table runtime level. >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > 2. If we make the cache implementation in the >> >> framework >> >> > > > > (whether >> >> > > > > > >> in a >> >> > > > > > >> > > > runner or a wrapper around TableFunction), we have to >> >> > > confront a >> >> > > > > > >> > > situation >> >> > > > > > >> > > > that allows table options in DDL to control the >> >> behavior of >> >> > > the >> >> > > > > > >> > > framework, >> >> > > > > > >> > > > which has never happened previously and should be >> >> cautious. >> >> > > > > Under >> >> > > > > > >> the >> >> > > > > > >> > > > current design the behavior of the framework should >> >> only be >> >> > > > > > >> specified >> >> > > > > > >> > by >> >> > > > > > >> > > > configurations (“table.exec.xxx”), and it’s hard to >> >> apply >> >> > > these >> >> > > > > > >> general >> >> > > > > > >> > > > configs to a specific table. >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > 3. We have use cases that lookup source loads and >> >> refresh >> >> > > all >> >> > > > > > >> records >> >> > > > > > >> > > > periodically into the memory to achieve high lookup >> >> > > performance >> >> > > > > > >> (like >> >> > > > > > >> > > Hive >> >> > > > > > >> > > > connector in the community, and also widely used by our >> >> > > internal >> >> > > > > > >> > > > connectors). Wrapping the cache around the user’s >> >> > > TableFunction >> >> > > > > > >> works >> >> > > > > > >> > > fine >> >> > > > > > >> > > > for LRU caches, but I think we have to introduce a new >> >> > > > > interface for >> >> > > > > > >> > this >> >> > > > > > >> > > > all-caching scenario and the design would become more >> >> > > complex. >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > 4. Providing the cache in the framework might >> >> introduce >> >> > > > > > >> compatibility >> >> > > > > > >> > > > issues to existing lookup sources like there might >> >> exist two >> >> > > > > caches >> >> > > > > > >> > with >> >> > > > > > >> > > > totally different strategies if the user incorrectly >> >> > > configures >> >> > > > > the >> >> > > > > > >> > table >> >> > > > > > >> > > > (one in the framework and another implemented by the >> >> lookup >> >> > > > > source). 
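>> >> > > > >> > > > > To illustrate 3, such a new interface could look roughly like this (a much simplified sketch, not the final API - the POC linked below has the real one):
>> >> > > > >> > > > >
>> >> > > > >> > > > > public abstract class AllCachingTableFunction extends TableFunction<RowData> {
>> >> > > > >> > > > >
>> >> > > > >> > > > >     private transient Map<RowData, List<RowData>> snapshot;
>> >> > > > >> > > > >     private transient long nextReloadTimeMs;
>> >> > > > >> > > > >
>> >> > > > >> > > > >     /** Implemented by the connector: scan the whole table. */
>> >> > > > >> > > > >     protected abstract Map<RowData, List<RowData>> reloadAll() throws Exception;
>> >> > > > >> > > > >
>> >> > > > >> > > > >     protected abstract long reloadIntervalMs();
>> >> > > > >> > > > >
>> >> > > > >> > > > >     protected abstract RowData keyRowOf(Object... lookupKeys);
>> >> > > > >> > > > >
>> >> > > > >> > > > >     public void eval(Object... lookupKeys) throws Exception {
>> >> > > > >> > > > >         long now = System.currentTimeMillis();
>> >> > > > >> > > > >         if (snapshot == null || now >= nextReloadTimeMs) {
>> >> > > > >> > > > >             snapshot = reloadAll(); // blocks the stream while reloading
>> >> > > > >> > > > >             nextReloadTimeMs = now + reloadIntervalMs();
>> >> > > > >> > > > >         }
>> >> > > > >> > > > >         List<RowData> rows = snapshot.get(keyRowOf(lookupKeys));
>> >> > > > >> > > > >         if (rows != null) {
>> >> > > > >> > > > >             rows.forEach(this::collect);
>> >> > > > >> > > > >         }
>> >> > > > >> > > > >     }
>> >> > > > >> > > > > }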
>> >> > > > > > >> > > > > >> >> > > > > > >> > > > > As for the optimization mentioned by Alexander, I >> >> think >> >> > > > > filters >> >> > > > > > >> and >> >> > > > > > >> > > > projections should be pushed all the way down to the >> >> table >> >> > > > > function, >> >> > > > > > >> > like >> >> > > > > > >> > > > what we do in the scan source, instead of the runner >> >> with >> >> > > the >> >> > > > > cache. >> >> > > > > > >> > The >> >> > > > > > >> > > > goal of using cache is to reduce the network I/O and >> >> > > pressure >> >> > > > > on the >> >> > > > > > >> > > > external system, and only applying these optimizations >> >> to >> >> > > the >> >> > > > > cache >> >> > > > > > >> > seems >> >> > > > > > >> > > > not quite useful. >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > I made some updates to the FLIP[1] to reflect our >> >> ideas. >> >> > > We >> >> > > > > > >> prefer to >> >> > > > > > >> > > > keep the cache implementation as a part of >> >> TableFunction, >> >> > > and we >> >> > > > > > >> could >> >> > > > > > >> > > > provide some helper classes (CachingTableFunction, >> >> > > > > > >> > > AllCachingTableFunction, >> >> > > > > > >> > > > CachingAsyncTableFunction) to developers and regulate >> >> > > metrics >> >> > > > > of the >> >> > > > > > >> > > cache. >> >> > > > > > >> > > > Also, I made a POC[2] for your reference. >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > Looking forward to your ideas! >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > [1] >> >> > > > > > >> > > > >> >> > > > > > >> > > >> >> > > > > > >> > >> >> > > > > > >> >> >> > > > > >> >> > > >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >> >> > > > > > >> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221 >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > Best regards, >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > Qingsheng >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов < >> >> > > > > > >> > > smirale...@gmail.com> >> >> > > > > > >> > > > wrote: >> >> > > > > > >> > > > >> >> >> > > > > > >> > > > >> Thanks for the response, Arvid! >> >> > > > > > >> > > > >> >> >> > > > > > >> > > > >> I have few comments on your message. >> >> > > > > > >> > > > >> >> >> > > > > > >> > > > >> > but could also live with an easier solution as the >> >> > > first >> >> > > > > step: >> >> > > > > > >> > > > >> >> >> > > > > > >> > > > >> I think that these 2 ways are mutually exclusive >> >> > > (originally >> >> > > > > > >> > proposed >> >> > > > > > >> > > > >> by Qingsheng and mine), because conceptually they >> >> follow >> >> > > the >> >> > > > > same >> >> > > > > > >> > > > >> goal, but implementation details are different. If >> >> > > > > > >> > > > >> we >> >> > > will >> >> > > > > go one >> >> > > > > > >> > way, >> >> > > > > > >> > > > >> moving to another way in the future will mean >> >> deleting >> >> > > > > existing >> >> > > > > > >> code >> >> > > > > > >> > > > >> and once again changing the API for connectors. So I >> >> > > think we >> >> > > > > > >> should >> >> > > > > > >> > > > >> reach a consensus with the community about that and >> >> then >> >> > > work >> >> > > > > > >> > together >> >> > > > > > >> > > > >> on this FLIP, i.e. 
divide the work into tasks for the different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> > as the source will only receive the requests after filter
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> Actually, if filters are applied to fields of the lookup table, we must first perform the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> Sorry for that, I'm a bit new to these kinds of conversations :) I have no write access to the confluence, so I made a Jira issue where I described the proposed changes in more detail - https://issues.apache.org/jira/browse/FLINK-27411.
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> Will be happy to get more feedback!
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> Best,
>> >> > > > >> > > > >> Smirnov Alexander
>> >> > > > >> > > > >>
>> >> > > > >> > > > >> Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org>:
>> >> > > > >> > > > >> >
>> >> > > > >> > > > >> > Hi Qingsheng,
>> >> > > > >> > > > >> >
>> >> > > > >> > > > >> > Thanks for driving this; the inconsistency was not satisfying for me.
>> >> > > > >> > > > >> >
>> >> > > > >> > > > >> > I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as
Lifting it into >> >> the >> >> > > > > operator >> >> > > > > > >> > > model >> >> > > > > > >> > > > as >> >> > > > > > >> > > > >> > proposed would be even better but is probably >> >> > > unnecessary >> >> > > > > in >> >> > > > > > >> the >> >> > > > > > >> > > > first step >> >> > > > > > >> > > > >> > for a lookup source (as the source will only >> >> receive >> >> > > the >> >> > > > > > >> requests >> >> > > > > > >> > > > after >> >> > > > > > >> > > > >> > filter; applying projection may be more >> >> interesting to >> >> > > save >> >> > > > > > >> > memory). >> >> > > > > > >> > > > >> > >> >> > > > > > >> > > > >> > Another advantage is that all the changes of this >> >> FLIP >> >> > > > > would be >> >> > > > > > >> > > > limited to >> >> > > > > > >> > > > >> > options, no need for new public interfaces. >> >> Everything >> >> > > else >> >> > > > > > >> > remains >> >> > > > > > >> > > an >> >> > > > > > >> > > > >> > implementation of Table runtime. That means we can >> >> > > easily >> >> > > > > > >> > > incorporate >> >> > > > > > >> > > > the >> >> > > > > > >> > > > >> > optimization potential that Alexander pointed out >> >> > > later. >> >> > > > > > >> > > > >> > >> >> > > > > > >> > > > >> > @Alexander unfortunately, your architecture is not >> >> > > shared. >> >> > > > > I >> >> > > > > > >> don't >> >> > > > > > >> > > > know the >> >> > > > > > >> > > > >> > solution to share images to be honest. >> >> > > > > > >> > > > >> > >> >> > > > > > >> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов >> >> > > > > > >> > > > >> > < >> >> > > > > > >> > > > smirale...@gmail.com> >> >> > > > > > >> > > > >> > wrote: >> >> > > > > > >> > > > >> > >> >> > > > > > >> > > > >> > > Hi Qingsheng! My name is Alexander, I'm not a >> >> > > committer >> >> > > > > yet, >> >> > > > > > >> but >> >> > > > > > >> > > I'd >> >> > > > > > >> > > > >> > > really like to become one. And this FLIP really >> >> > > > > interested >> >> > > > > > >> me. >> >> > > > > > >> > > > >> > > Actually I have worked on a similar feature in >> >> > > > > > >> > > > >> > > my >> >> > > > > company’s >> >> > > > > > >> > Flink >> >> > > > > > >> > > > >> > > fork, and we would like to share our thoughts on >> >> > > this and >> >> > > > > > >> make >> >> > > > > > >> > > code >> >> > > > > > >> > > > >> > > open source. >> >> > > > > > >> > > > >> > > >> >> > > > > > >> > > > >> > > I think there is a better alternative than >> >> > > introducing an >> >> > > > > > >> > abstract >> >> > > > > > >> > > > >> > > class for TableFunction (CachingTableFunction). >> >> As >> >> > > you >> >> > > > > know, >> >> > > > > > >> > > > >> > > TableFunction exists in the flink-table-common >> >> > > module, >> >> > > > > which >> >> > > > > > >> > > > provides >> >> > > > > > >> > > > >> > > only an API for working with tables – it’s very >> >> > > > > convenient >> >> > > > > > >> for >> >> > > > > > >> > > > importing >> >> > > > > > >> > > > >> > > in connectors. In turn, CachingTableFunction >> >> contains >> >> > > > > logic >> >> > > > > > >> for >> >> > > > > > >> > > > >> > > runtime execution, so this class and everything >> >> > > > > connected >> >> > > > > > >> with >> >> > > > > > >> > it >> >> > > > > > >> > > > >> > > should be located in another module, probably in >> >> > > > > > >> > > > flink-table-runtime. 
>> >> > > > >> > > > >> > > But this would require connectors to depend on another module that contains a lot of runtime logic, which doesn't sound good.
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to only pass configurations to the planner, so that they don't depend on the runtime implementation. Based on these configs the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the module flink-table-runtime). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its subclasses. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc.
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > And here comes another, more powerful advantage of such a solution. If we have the caching logic at a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc is named like this because it uses the 'calc' function, which actually mostly consists of filters and projections.
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > For example, when joining table A with lookup table B with the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.
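>> >> > > > >> > > > >> > > The core of such a LookupJoinCachingRunnerWithCalc would roughly be (a sketch; 'fetcher' stands for the underlying TableFunction call, and 'calc' returns null for filtered-out rows):
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > public Collection<RowData> lookupWithCache(RowData keyRow) {
>> >> > > > >> > > > >> > >     Collection<RowData> hit = cache.getIfPresent(keyRow);
>> >> > > > >> > > > >> > >     if (hit != null) {
>> >> > > > >> > > > >> > >         return hit;
>> >> > > > >> > > > >> > >     }
>> >> > > > >> > > > >> > >     List<RowData> survivors = new ArrayList<>();
>> >> > > > >> > > > >> > >     for (RowData row : fetcher.lookup(keyRow)) { // the actual request
>> >> > > > >> > > > >> > >         RowData reduced = calc.apply(row);       // filters + projections
>> >> > > > >> > > > >> > >         if (reduced != null) {
>> >> > > > >> > > > >> > >             survivors.add(reduced);
>> >> > > > >> > > > >> > >         }
>> >> > > > >> > > > >> > >     }
>> >> > > > >> > > > >> > >     cache.put(keyRow, survivors); // an empty list is cached too
>> >> > > > >> > > > >> > >     return survivors;
>> >> > > > >> > > > >> > > }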
>> >> > > > >> > > > >> > > If we apply this function before storing records in the cache, as sketched above, the size of the cache will be significantly reduced: the filters let us avoid storing useless records, and the projections reduce the records' size. So the initial max number of records in the cache can be increased by the user.
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > What do you think about it?
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > >
>> >> > > > >> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>> >> > > > >> > > > >> > > > Hi devs,
>> >> > > > >> > > > >> > > >
>> >> > > > >> > > > >> > > > Yuan and I would like to start a discussion about FLIP-221[1], which introduces an abstraction of the lookup table cache and its standard metrics.
>> >> > > > >> > > > >> > > >
>> >> > > > >> > > > >> > > > Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is quite a common use case in Flink Table / SQL.
>> >> > > > >> > > > >> > > >
>> >> > > > >> > > > >> > > > Therefore we propose some new APIs including cache, metrics, wrapper classes of TableFunction and new table options. Please take a look at the FLIP page [1] to get more details. Any suggestions and comments would be appreciated!
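>> >> > > > >> > > > >> > > > As a quick illustration of the wrapper idea, a connector-side function could look roughly like this (an illustrative sketch only - the method name here is made up and the FLIP page has the actual proposal; queryExternalSystem stands for the connector's real lookup):
>> >> > > > >> > > > >> > > >
>> >> > > > >> > > > >> > > > public class MyLookupFunction extends CachingTableFunction {
>> >> > > > >> > > > >> > > >     @Override
>> >> > > > >> > > > >> > > >     public void evalUncached(Object... lookupKeys) {
>> >> > > > >> > > > >> > > >         // Called only on a cache miss; the wrapper caches
>> >> > > > >> > > > >> > > >         // everything collected here under the lookup keys.
>> >> > > > >> > > > >> > > >         for (RowData row : queryExternalSystem(lookupKeys)) {
>> >> > > > >> > > > >> > > >             collect(row);
>> >> > > > >> > > > >> > > >         }
>> >> > > > >> > > > >> > > >     }
>> >> > > > >> > > > >> > > > }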
>> >> > > > > > >> > > > >> > > > >> >> > > > > > >> > > > >> > > > [1] >> >> > > > > > >> > > > >> > > >> >> > > > > > >> > > > >> >> > > > > > >> > > >> >> > > > > > >> > >> >> > > > > > >> >> >> > > > > >> >> > > >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric >> >> > > > > > >> > > > >> > > > >> >> > > > > > >> > > > >> > > > Best regards, >> >> > > > > > >> > > > >> > > > >> >> > > > > > >> > > > >> > > > Qingsheng >> >> > > > > > >> > > > >> > > > >> >> > > > > > >> > > > >> > > > >> >> > > > > > >> > > > >> > > >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > -- >> >> > > > > > >> > > > > Best Regards, >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > Qingsheng Ren >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > Real-time Computing Team >> >> > > > > > >> > > > > Alibaba Cloud >> >> > > > > > >> > > > > >> >> > > > > > >> > > > > Email: renqs...@gmail.com >> >> > > > > > >> > > > >> >> > > > > > >> > > >> >> > > > > > >> > >> >> > > > > > >> >> >> > > > > > > >> >> > > > > > > >> >> > > > > > > -- >> >> > > > > > > Best regards, >> >> > > > > > > Roman Boyko >> >> > > > > > > e.: ro.v.bo...@gmail.com >> >> > > > > > > >> >> > > > > >> >> > > >> >> > > >> >> >> >> >> > >> >-- >> >Best Regards, >> > >> >*Qingsheng Ren* >> > >> >Real-time Computing Team >> >Alibaba Cloud >> > >> >Email: renqs...@gmail.com >>