Hi Qingsheng and devs,
Thanks for the lively discussion and the redesign to optimize this feature. I just have two comments:

1. How about abstracting the LookupCache into a higher-level common Cache? That would make it convenient for devs to reuse in other places.
2. Does the design expose any metrics, such as NumCachedRecords, for the AllCache?

Best regards,
Yuan

At 2022-05-13 20:27:44, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi Alexander and devs,
>
>Thank you very much for the in-depth discussion! As Jark mentioned, we were inspired by Alexander's idea and have refactored our design. FLIP-221 [1] has been updated to reflect the new design, and we are happy to hear more suggestions from you!
>
>Compared to the previous design:
>1. The lookup cache serves at the table runtime level and is integrated as a component of LookupJoinRunner, as discussed previously.
>2. Interfaces are renamed and re-designed to reflect the new design.
>3. We separate out the all-caching case and introduce a new RescanRuntimeProvider to reuse the scanning ability. We are planning to support SourceFunction / InputFormat for now, considering the complexity of the FLIP-27 Source API.
>4. A new interface LookupFunction is introduced to make the semantics of lookup more straightforward for developers.
>
>In reply to Alexander:
>> However I'm a little confused whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not?
>Yes, you are right. InputFormat is not deprecated for now. I think it will be deprecated in the future, but we don't have a clear plan for that.
>
>Thanks again for the discussion on this FLIP, and looking forward to cooperating with you after we finalize the design and interfaces!
>
>[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>
>Best regards,
>
>Qingsheng
>
>
>On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
>
>> Hi Jark, Qingsheng and Leonard!
>>
>> Glad to see that we came to a consensus on almost all points!
>>
>> However, I'm a little confused about whether InputFormat is deprecated or not. Am I right that it will be in the future, but currently it's not? Actually, I also think that for the first version it's OK to use InputFormat in the ALL cache implementation, because supporting the rescan ability seems like a very distant prospect. But for this decision we need a consensus among all discussion participants.
>>
>> In general, I don't have anything to argue with in your statements. All of them correspond to my ideas. Looking ahead, it would be nice to work on this FLIP cooperatively. I've already done a lot of work on lookup join caching, with an implementation very close to the one we are discussing, and I want to share the results of that work. Anyway, looking forward to the FLIP update!
>>
>> Best regards,
>> Smirnov Alexander
>>
>> Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com>:
>> >
>> > Hi Alex,
>> >
>> > Thanks for summarizing your points.
>> >
>> > In the past week, Qingsheng, Leonard, and I have discussed this several times, and we have completely refactored the design. I'm glad to say we have reached a consensus on many of your points! Qingsheng is still working on updating the design doc, which should be available in the next few days. I will share some conclusions from our discussions:
>> >
>> > 1) We have refactored the design towards the "cache in framework" way.
>> >
>> > 2) There is a "LookupCache" interface for users to customize, plus a default implementation with a builder for ease of use. This makes it possible to have both flexibility and conciseness.
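>> >
>> > To give a feel for the shape, a rough sketch in Java (not the final interface; names and signatures may still change while the FLIP is being updated):
>> >
>> >     import java.util.Collection;
>> >     import javax.annotation.Nullable;
>> >     import org.apache.flink.table.data.RowData;
>> >
>> >     // A pluggable cache keyed by the lookup key row.
>> >     public interface LookupCache {
>> >         @Nullable
>> >         Collection<RowData> getIfPresent(RowData key);
>> >
>> >         void put(RowData key, Collection<RowData> rows);
>> >
>> >         void invalidateAll();
>> >     }
>> >
>> >     // The default implementation would be created via a builder, e.g.:
>> >     LookupCache cache = DefaultLookupCache.newBuilder()
>> >         .expireAfterWrite(Duration.ofMinutes(10))
>> >         .maximumSize(10_000L)
>> >         .build();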
>> >
>> > 3) Filter pushdown is important for both the ALL and the LRU lookup cache, especially for reducing IO. Filter pushdown should be the final state and the unified way to support pruning both the ALL cache and the LRU cache, so I think we should make an effort in this direction. If we need to support filter pushdown for the ALL cache anyway, why not use it for the LRU cache as well? Either way, as we have decided to implement the cache in the framework, we have the chance to support filtering on the cache at any time. This is an optimization and it doesn't affect the public API. I think we can create a JIRA issue to discuss it once the FLIP is accepted.
>> >
>> > 4) The idea for supporting the ALL cache is similar to your proposal. In the first version, we will only support InputFormat and SourceFunction for cache-all (invoking the InputFormat in the join operator). For a FLIP-27 source, we would need to join a true source operator instead of calling the source embedded in the join operator. However, that needs another FLIP to support the re-scan ability for the FLIP-27 Source, which can be a large piece of work. In order not to block this issue, we can put the FLIP-27 source integration into future work and integrate InputFormat & SourceFunction for now.
>> >
>> > I think it's fine to use InputFormat & SourceFunction, as they are not deprecated; otherwise, we would have to introduce another function similar to them, which would be meaningless. We need to plan the FLIP-27 source integration ASAP, before InputFormat & SourceFunction are deprecated.
>> >
>> > Best,
>> > Jark
>> >
>> > On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:
>> >
>> > > Hi Martijn!
>> > >
>> > > Got it. Therefore, the implementation with InputFormat is not considered. Thanks for clearing that up!
>> > >
>> > > Best regards,
>> > > Smirnov Alexander
>> > >
>> > > Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com>:
>> > > >
>> > > > Hi,
>> > > >
>> > > > With regards to:
>> > > >
>> > > > > But if there are plans to refactor all connectors to FLIP-27
>> > > >
>> > > > Yes, FLIP-27 is the target for all connectors. The old interfaces will be deprecated, and connectors will either be refactored to use the new ones or dropped.
>> > > >
>> > > > The caching should work for connectors that are using the FLIP-27 interfaces; we should not introduce new features for the old interfaces.
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Martijn
>> > > >
>> > > > On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
>> > > >
>> > > > > Hi Jark!
>> > > > >
>> > > > > Sorry for the late response. I would like to make some comments and clarify my points.
>> > > > >
>> > > > > 1) I agree with your first statement. I think we can achieve both advantages this way: put the Cache interface in flink-table-common, but have its implementations in flink-table-runtime. Then, if a connector developer wants to use the existing cache strategies and their implementations, he can just pass a lookupConfig to the planner; but if he wants his own cache implementation in his TableFunction, he can use the existing interface for that purpose (we can explicitly point this out in the documentation). In this way all configs and metrics will be unified. WDYT?
>> > > > >
>> > > > > > If a filter can prune 90% of data in the cache, we will have 90% of lookup requests that can never be cached
>> > > > >
>> > > > > 2) Let me clarify the logic of the filter optimization in the case of the LRU cache. The cache looks like Cache<RowData, Collection<RowData>>. We always store the response of the dimension table in the cache, even after applying the calc function. I.e., if no rows remain after applying the filters to the result of the TableFunction's 'eval' method, we store an empty list for those lookup keys. The cache entry is therefore still created, but it requires much less memory (in bytes). In other words, we don't completely filter out keys whose result was pruned, but we significantly reduce the memory required to store the result. If the user knows about this behavior, he can increase the 'max-rows' option before the job starts. But actually I came up with the idea that we can do this automatically by using the 'maximumWeight' and 'weigher' methods of the Guava cache [1]. The weight can be the size of the collection of rows (the cache value). The cache can then automatically fit many more records than before.
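>> > > > >
>> > > > > For illustration, roughly like this with Guava (just a sketch; 'maxRows' stands for the configured 'max-rows' budget and is not a real option name):
>> > > > >
>> > > > >     import com.google.common.cache.Cache;
>> > > > >     import com.google.common.cache.CacheBuilder;
>> > > > >     import java.util.List;
>> > > > >     import org.apache.flink.table.data.RowData;
>> > > > >
>> > > > >     // The budget is counted in rows instead of cache entries.
>> > > > >     Cache<RowData, List<RowData>> cache = CacheBuilder.newBuilder()
>> > > > >         .maximumWeight(maxRows)
>> > > > >         // An empty (fully filtered) result still weighs 1, so pruned keys remain evictable.
>> > > > >         .weigher((RowData key, List<RowData> rows) -> Math.max(1, rows.size()))
>> > > > >         .build();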
>> > > > >
>> > > > > > Flink SQL has provided a standard way to do filters and projects pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
>> > > > > > Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard to implement.
>> > > > >
>> > > > > It's debatable how difficult it will be to implement filter pushdown. But I think the fact that currently no database connector supports filter pushdown at least means that this feature won't be supported in connectors soon. Moreover, if we talk about other connectors (not in the Flink repo), their databases might not support all Flink filters (or might not support filters at all). I think users are interested in the cache filter optimization independently of support for other features and of solving more complex (or even unsolvable) problems.
>> > > > >
>> > > > > 3) I agree with your third statement. Actually, in our internal version I also tried to unify the logic of scanning and reloading data from connectors. But unfortunately, I didn't find a way to unify the logic of all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...) and reuse it for reloading the ALL cache. As a result I settled on using InputFormat, because it is used for scanning in all lookup connectors. (I didn't know that there are plans to deprecate InputFormat in favor of the FLIP-27 Source.) IMO, using the FLIP-27 source for ALL caching is not a good idea, because this source was designed to work in a distributed environment (SplitEnumerator on the JobManager and SourceReaders on the TaskManagers), not in one operator (the lookup join operator in our case). There is not even a direct way to pass splits from the SplitEnumerator to a SourceReader (this logic works through the SplitEnumeratorContext, which requires an OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using InputFormat for the ALL cache seems much clearer and easier. But if there are plans to refactor all connectors to FLIP-27, I have the following idea: maybe we can drop the lookup join ALL cache in favor of a simple join with multiple scans of a batch source? The point is that the only difference between a lookup join with an ALL cache and a simple join with a batch source is that in the first case scanning is performed multiple times, in between which the state (cache) is cleared (correct me if I'm wrong). So what if we extend the simple join to support state reloading, and extend the batch source to scan multiple times? (The latter should be easy with the new FLIP-27 source, which unifies streaming/batch reading; we would only need to change the SplitEnumerator to pass the splits again after some TTL.) WDYT? I must say this looks like a long-term goal and would make the scope of this FLIP even larger than you said. Maybe we can limit ourselves to a simpler solution for now (InputFormats).
>> > > > >
>> > > > > So to sum up, my points are these:
>> > > > > 1) There is a way to make the caching interfaces for lookup join both concise and flexible.
>> > > > > 2) The cache filter optimization is important for both the LRU and the ALL cache.
>> > > > > 3) It is unclear when filter pushdown will be supported in Flink connectors; some connectors might not have the opportunity to support filter pushdown at all, and as far as I know, filter pushdown currently works only for scanning (not lookup). So the cache filter + projection optimization should be independent of other features.
>> > > > > 4) The ALL cache implementation is a complex topic that touches multiple aspects of how Flink is developing. Dropping InputFormat in favor of the FLIP-27 Source would make the ALL cache implementation really complex and unclear, so maybe instead we can extend the functionality of the simple join, or keep InputFormat for the lookup join ALL cache?
>> > > > >
>> > > > > Best regards,
>> > > > > Smirnov Alexander
>> > > > >
>> > > > > [1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>> > > > >
>> > > > > Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com>:
>> > > > > >
>> > > > > > It's great to see the active discussion! I want to share my ideas:
>> > > > > >
>> > > > > > 1) Implement the cache in the framework vs. in the connector base:
>> > > > > > I don't have a strong opinion on this. Both ways should work (e.g., for cache pruning and compatibility). The framework way can provide more concise interfaces. The connector-base way can define more flexible cache strategies/implementations. We are still investigating whether we can have both advantages. We should reach a consensus that the chosen way is a final state, and that we are on the path to it.
>> > > > > >
>> > > > > > 2) Filter and projection pushdown:
>> > > > > > I agree with Alex that filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use the cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and hit the databases directly. That means the cache is meaningless in this case.
>> > > > > >
>> > > > > > IMO, Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. That Jdbc/Hive/HBase haven't implemented these interfaces doesn't mean they are hard to implement. They should implement the pushdown interfaces to reduce IO and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation. I don't see why we need to duplicate the pushdown logic in the caches, which would complicate the lookup join design.
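>> > > > > >
>> > > > > > For reference, the existing contract looks roughly like this on the source side ('MyLookupSource' is just a placeholder, and the snippet is trimmed to the pushdown part):
>> > > > > >
>> > > > > >     import java.util.ArrayList;
>> > > > > >     import java.util.Collections;
>> > > > > >     import java.util.List;
>> > > > > >     import org.apache.flink.table.connector.source.LookupTableSource;
>> > > > > >     import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
>> > > > > >     import org.apache.flink.table.expressions.ResolvedExpression;
>> > > > > >
>> > > > > >     public class MyLookupSource implements LookupTableSource, SupportsFilterPushDown {
>> > > > > >
>> > > > > >         private List<ResolvedExpression> pushedFilters = new ArrayList<>();
>> > > > > >
>> > > > > >         @Override
>> > > > > >         public Result applyFilters(List<ResolvedExpression> filters) {
>> > > > > >             // Accept the predicates the external system can evaluate and hand
>> > > > > >             // the rest back to the planner as "remaining" filters.
>> > > > > >             this.pushedFilters = new ArrayList<>(filters);
>> > > > > >             return Result.of(/* acceptedFilters */ filters, /* remainingFilters */ Collections.emptyList());
>> > > > > >         }
>> > > > > >
>> > > > > >         // getLookupRuntimeProvider(...), copy() and asSummaryString() omitted for brevity.
>> > > > > >     }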
>> > > > > >
>> > > > > > 3) The ALL cache abstraction:
>> > > > > > The ALL cache might be the most challenging part of this FLIP. We have never provided a public reload-lookup interface. Currently, we put the reload logic in the "eval" method of the TableFunction. That's hard for some sources (e.g., Hive). Ideally, connector implementations should share the logic of reload and scan, i.e., a ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with the SourceOperator. If we want to invoke the FLIP-27 source in the lookup join, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.
>> > > > > >
>> > > > > > Best,
>> > > > > > Jark
>> > > > > >
>> > > > > > On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> > > > > >
>> > > > > > > It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns would have to be done for all ScanTableSource implementations (not only the lookup ones).
>> > > > > > >
>> > > > > > > On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:
>> > > > > > >
>> > > > > > >> Hi everyone,
>> > > > > > >>
>> > > > > > >> One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.
>> > > > > > >>
>> > > > > > >> Best regards,
>> > > > > > >>
>> > > > > > >> Martijn Visser
>> > > > > > >> https://twitter.com/MartijnVisser82
>> > > > > > >> https://github.com/MartijnVisser
>> > > > > > >>
>> > > > > > >> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>> > > > > > >>
>> > > > > > >> > Hi everyone!
>> > > > > > >> >
>> > > > > > >> > Thanks for driving such a valuable improvement!
>> > > > > > >> >
>> > > > > > >> > I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.
>> > > > > > >> >
>> > > > > > >> > Putting myself in the user's shoes, I can say that:
>> > > > > > >> > 1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the handiest way to do that is to apply the filtering inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
>> > > > > > >> > 2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
>> > > > > > >> > 3) Providing the cache inside the framework does deprive us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by creating more cache strategies and a wider set of configurations.
>> > > > > > >> >
>> > > > > > >> > All these points are much closer to the schema proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong: can all these facilities simply be implemented in your architecture?
>> > > > > > >> >
>> > > > > > >> > Best regards,
>> > > > > > >> > Roman Boyko
>> > > > > > >> > e.: ro.v.bo...@gmail.com
>> > > > > > >> >
>> > > > > > >> > On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:
>> > > > > > >> >
>> > > > > > >> > > Hi everyone,
>> > > > > > >> > >
>> > > > > > >> > > I don't have much to chip in, but I just wanted to express that I really appreciate the in-depth discussion on this topic, and I hope that others will join the conversation.
>> > > > > > >> > >
>> > > > > > >> > > Best regards,
>> > > > > > >> > >
>> > > > > > >> > > Martijn
>> > > > > > >> > >
>> > > > > > >> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:
>> > > > > > >> > >
>> > > > > > >> > > > Hi Qingsheng, Leonard and Jark,
>> > > > > > >> > > >
>> > > > > > >> > > > Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
>> > > > > > >> > > >
>> > > > > > >> > > > > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"
>> > > > > > >> > > >
>> > > > > > >> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not fully implemented with caching, but as you said, users accept that consciously to achieve better performance (no one has proposed enabling caching by default, etc.). Or by users do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in flink-table-runtime and in flink-table-common from this point of view? How does it affect breaking (or not breaking) the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>> > > > > > >> > > >
>> > > > > > >> > > > > confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious
>> > > > > > >> > > >
>> > > > > > >> > > > If we talk about the main semantic difference between DDL options and config options ("table.exec.xxx"), isn't it about limiting the scope of the options, and about their importance to the user's business logic, rather than about the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy into configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter to the user (or anyone else) where the logic affected by the option is located? Also, I can point to the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.
>> > > > > > >> > > >
>> > > > > > >> > > > > introduce a new interface for this all-caching scenario and the design would become more complex
>> > > > > > >> > > >
>> > > > > > >> > > > This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily: we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive, which uses the class PartitionReader, actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload the cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is significantly reduced (as is the time the input stream is blocked). I know that we usually try to avoid concurrency in Flink code, but maybe this one can be an exception. BTW, I don't say it's an ideal solution; maybe there are better ones.
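>> > > > > > >> > > >
>> > > > > > >> > > > Roughly like this, as a sketch (helpers such as 'createFormatCopy', 'addToCache' and the limits are made up for the example):
>> > > > > > >> > > >
>> > > > > > >> > > >     import java.util.concurrent.Callable;
>> > > > > > >> > > >     import java.util.concurrent.ExecutorService;
>> > > > > > >> > > >     import java.util.concurrent.Executors;
>> > > > > > >> > > >     import java.util.concurrent.TimeUnit;
>> > > > > > >> > > >     import org.apache.flink.api.common.io.InputFormat;
>> > > > > > >> > > >     import org.apache.flink.core.io.InputSplit;
>> > > > > > >> > > >     import org.apache.flink.table.data.RowData;
>> > > > > > >> > > >
>> > > > > > >> > > >     InputSplit[] splits = format.createInputSplits(maxThreads);
>> > > > > > >> > > >     ExecutorService pool = Executors.newFixedThreadPool(Math.min(maxThreads, splits.length));
>> > > > > > >> > > >     for (InputSplit split : splits) {
>> > > > > > >> > > >         // InputFormat instances are not thread-safe: one fresh copy per split.
>> > > > > > >> > > >         InputFormat<RowData, InputSplit> perThreadFormat = createFormatCopy();
>> > > > > > >> > > >         pool.submit((Callable<Void>) () -> {
>> > > > > > >> > > >             perThreadFormat.open(split);
>> > > > > > >> > > >             while (!perThreadFormat.reachedEnd()) {
>> > > > > > >> > > >                 addToCache(perThreadFormat.nextRecord(null));
>> > > > > > >> > > >             }
>> > > > > > >> > > >             perThreadFormat.close();
>> > > > > > >> > > >             return null;
>> > > > > > >> > > >         });
>> > > > > > >> > > >     }
>> > > > > > >> > > >     // Block the lookup stream only until the reload finishes.
>> > > > > > >> > > >     pool.shutdown();
>> > > > > > >> > > >     pool.awaitTermination(reloadTimeoutMinutes, TimeUnit.MINUTES);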
>> > > > > > >> > > >
>> > > > > > >> > > > > Providing the cache in the framework might introduce compatibility issues
>> > > > > > >> > > >
>> > > > > > >> > > > That is possible only if the developer of the connector doesn't properly refactor his code and uses the new cache options incorrectly (i.e., explicitly provides the same options in two different code places). For correct behavior, all he needs to do is redirect the existing options to the framework's LookupConfig (+ maybe add aliases for options if the naming differed), and everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector, because of backward compatibility. Also, if a developer wants to use his own cache logic, he can simply refuse to pass some of the configs to the framework and instead make his own implementation with the already existing configs and metrics (but actually I think that's a rare case).
>> > > > > > >> > > >
>> > > > > > >> > > > > filters and projections should be pushed all the way down to the table function, like what we do in the scan source
>> > > > > > >> > > >
>> > > > > > >> > > > That is a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down such complex filters as we have in Flink.
>> > > > > > >> > > >
>> > > > > > >> > > > > only applying these optimizations to the cache seems not quite useful
>> > > > > > >> > > >
>> > > > > > >> > > > Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by the age of the users. If we have the filter 'age > 30', there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2x, which gives a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.
>> > > > > > >> > > >
>> > > > > > >> > > > It would be great to hear other voices on this topic! We have quite a lot of controversial points, and I think that with the help of others it will be easier for us to come to a consensus.
>> > > > > > >> > > >
>> > > > > > >> > > > Best regards,
>> > > > > > >> > > > Smirnov Alexander
>> > > > > > >> > > >
>> > > > > > >> > > > Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com>:
>> > > > > > >> > > > >
>> > > > > > >> > > > > Hi Alexander and Arvid,
>> > > > > > >> > > > >
>> > > > > > >> > > > > Thanks for the discussion, and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:
>> > > > > > >> > > > >
>> > > > > > >> > > > > 1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF proc_time", because it can't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.
>> > > > > > >> > > > >
>> > > > > > >> > > > > 2. If we put the cache implementation in the framework (whether in a runner or in a wrapper around the TableFunction), we have to confront a situation in which table options in DDL control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design, the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.
>> > > > > > >> > > > >
>> > > > > > >> > > > > 3. We have use cases in which the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community; this is also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.
>> > > > > > >> > > > >
>> > > > > > >> > > > > 4. Providing the cache in the framework might introduce compatibility issues to existing lookup sources: there might be two caches with totally different strategies if the user configures the table incorrectly (one in the framework and another implemented by the lookup source).
>> > > > > > >> > > > >
>> > > > > > >> > > > > As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of into the runner with the cache. The goal of using the cache is to reduce network I/O and the pressure on the external system, and only applying these optimizations to the cache seems not quite useful.
>> > > > > > >> > > > >
>> > > > > > >> > > > > I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of the TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache. Also, I made a POC [2] for your reference.
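>> > > > > > >> > > > >
>> > > > > > >> > > > > To sketch the idea (signatures here are illustrative only; please refer to the FLIP and the POC for the actual ones):
>> > > > > > >> > > > >
>> > > > > > >> > > > >     import java.util.Collection;
>> > > > > > >> > > > >     import com.google.common.cache.Cache;
>> > > > > > >> > > > >     import org.apache.flink.table.data.GenericRowData;
>> > > > > > >> > > > >     import org.apache.flink.table.data.RowData;
>> > > > > > >> > > > >     import org.apache.flink.table.functions.TableFunction;
>> > > > > > >> > > > >
>> > > > > > >> > > > >     // A caching base class: connectors implement the actual I/O in
>> > > > > > >> > > > >     // lookup(), and the cache is consulted first on every eval() call.
>> > > > > > >> > > > >     public abstract class CachingTableFunction extends TableFunction<RowData> {
>> > > > > > >> > > > >         private Cache<RowData, Collection<RowData>> cache;  // configured in open()
>> > > > > > >> > > > >
>> > > > > > >> > > > >         public void eval(Object... keys) throws Exception {
>> > > > > > >> > > > >             RowData key = GenericRowData.of(keys);
>> > > > > > >> > > > >             Collection<RowData> rows = cache.getIfPresent(key);
>> > > > > > >> > > > >             if (rows == null) {
>> > > > > > >> > > > >                 rows = lookup(key);  // the actual request to the external system
>> > > > > > >> > > > >                 cache.put(key, rows);
>> > > > > > >> > > > >             }
>> > > > > > >> > > > >             rows.forEach(this::collect);
>> > > > > > >> > > > >         }
>> > > > > > >> > > > >
>> > > > > > >> > > > >         protected abstract Collection<RowData> lookup(RowData key) throws Exception;
>> > > > > > >> > > > >     }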
>> > > > > > >> > > > >
>> > > > > > >> > > > > Looking forward to your ideas!
>> > > > > > >> > > > >
>> > > > > > >> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> > > > > > >> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>> > > > > > >> > > > >
>> > > > > > >> > > > > Best regards,
>> > > > > > >> > > > >
>> > > > > > >> > > > > Qingsheng
>> > > > > > >> > > > >
>> > > > > > >> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> Thanks for the response, Arvid!
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> I have a few comments on your message.
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> > but could also live with an easier solution as the first step:
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> I think that these 2 ways are mutually exclusive (the one originally proposed by Qingsheng and mine), because conceptually they pursue the same goal, but the implementation details are different. If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that, and then work together on this FLIP, i.e., divide the work into tasks for the different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work...). WDYT, Qingsheng?
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> > as the source will only receive the requests after filter
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> Actually, if filters are applied to fields of the lookup table, we first must do the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> Sorry for that, I'm a bit new to such kinds of conversations :) I have no write access to the Confluence, so I made a Jira issue where I described the proposed changes in more detail: https://issues.apache.org/jira/browse/FLINK-27411.
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> I'll be happy to get more feedback!
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> Best,
>> > > > > > >> > > > >> Smirnov Alexander
>> > > > > > >> > > > >>
>> > > > > > >> > > > >> Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org>:
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > Hi Qingsheng,
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > Thanks for driving this; the inconsistency was not satisfying for me.
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > I second Alexander's idea, though I could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but it is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting, to save memory).
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > Another advantage is that all the changes of this FLIP would be limited to options; there would be no need for new public interfaces. Everything else remains an implementation detail of the table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:
>> > > > > > >> > > > >> >
>> > > > > > >> > > > >> > > Hi Qingsheng! My name is Alexander; I'm not a committer yet, but I'd really like to become one, and this FLIP really interested me. I have actually worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables; that makes it very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But that would require connectors to depend on another module that contains a lot of runtime logic, which doesn't sound good.
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider, to allow connectors to pass only configurations to the planner, so that they won't depend on the runtime implementation. Based on these configs, the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > The classes in flink-table-planner that would be responsible for this are CommonPhysicalLookupJoin and its inheritors. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc, and AsyncLookupJoinRunnerWithCalc. I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.
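>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > As a rough draft of the proposal (all names here are tentative):
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > >     import org.apache.flink.table.connector.source.DynamicTableSource;
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > >     // flink-table-common: the connector only declares its caching wishes...
>> > > > > > >> > > > >> > >     public interface LookupTableSource extends DynamicTableSource {
>> > > > > > >> > > > >> > >         LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > >         default LookupConfig getLookupConfig() {
>> > > > > > >> > > > >> > >             return LookupConfig.none();  // no caching by default
>> > > > > > >> > > > >> > >         }
>> > > > > > >> > > > >> > >     }
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > >     // ...and the planner picks the runner accordingly, roughly:
>> > > > > > >> > > > >> > >     // LookupConfig config = source.getLookupConfig();
>> > > > > > >> > > > >> > >     // runner = config.isCachingEnabled()
>> > > > > > >> > > > >> > >     //     ? new LookupJoinCachingRunnerWithCalc(fetcher, calc, config)
>> > > > > > >> > > > >> > >     //     : new LookupJoinRunnerWithCalc(fetcher, calc);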
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > And here comes another, more powerful advantage of such a solution. If we have the caching logic at a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which actually consists mostly of filters and projections.
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > For example, when joining table A with lookup table B using the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters = avoid storing useless records in the cache, projections = reduce the records' size. So the initial max number of records in the cache can be increased by the user.
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > What do you think about it?
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>> > > > > > >> > > > >> > > > Hi devs,
>> > > > > > >> > > > >> > > >
>> > > > > > >> > > > >> > > > Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction of the lookup table cache and its standard metrics.
>> > > > > > >> > > > >> > > >
>> > > > > > >> > > > >> > > > Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.
>> > > > > > >> > > > >> > > >
>> > > > > > >> > > > >> > > > Therefore we propose some new APIs, including a cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!
>> > > > > > >> > > > >> > > >
>> > > > > > >> > > > >> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> > > > > > >> > > > >> > > >
>> > > > > > >> > > > >> > > > Best regards,
>> > > > > > >> > > > >> > > >
>> > > > > > >> > > > >> > > > Qingsheng
>> > > > > > >> > > > >> > >
>> > > > > > >> > > > >
>> > > > > > >> > > > > --
>> > > > > > >> > > > > Best Regards,
>> > > > > > >> > > > >
>> > > > > > >> > > > > Qingsheng Ren
>> > > > > > >> > > > >
>> > > > > > >> > > > > Real-time Computing Team
>> > > > > > >> > > > > Alibaba Cloud
>> > > > > > >> > > > >
>> > > > > > >> > > > > Email: renqs...@gmail.com
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >>
>> > > > > > >
>> > > > > > > --
>> > > > > > > Best regards,
>> > > > > > > Roman Boyko
>> > > > > > > e.: ro.v.bo...@gmail.com
>> > > > > > >
>> > > > >
>> >
>>
>
>--
>Best Regards,
>
>*Qingsheng Ren*
>
>Real-time Computing Team
>Alibaba Cloud
>
>Email: renqs...@gmail.com