Thanks Qingsheng and Alexander for the update. The current API and options design of this FLIP looks good from my side. If there are no more concerns on this thread, I think we can start a VOTE thread later.
Best,
Leonard

On May 18, 2022, at 5:04 PM, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Jark and Alexander,

Thanks for your comments! I'm also OK with introducing common table options. I prefer to introduce a new DefaultLookupCacheOptions class for holding these option definitions, because putting all options into FactoryUtil would make it a bit "crowded" and not well categorized.

The FLIP has been updated according to the suggestions above:
1. Use a static "of" method for constructing RescanRuntimeProvider, since both arguments are required.
2. Introduce new table options matching DefaultLookupCacheFactory.

Best,
Qingsheng

On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:

Hi Alex,

1) Retry logic
I think we can extract some common retry logic into utilities, e.g. RetryUtils#tryTimes(times, call). This seems independent of this FLIP and could be reused by DataStream users. Maybe we can open an issue to discuss this and where to put it.

2) Cache ConfigOptions
I'm fine with defining the cache config options in the framework. A candidate place to put them is FactoryUtil, which also includes the "sink.parallelism" and "format" options.

Best,
Jark

On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng,

Thank you for considering my comments.

> there might be custom logic before retrying, such as re-establishing the connection

Yes, I understand that. I meant that such logic can be placed in a separate function that can be implemented by connectors. Just moving the retry logic would make connectors' LookupFunctions more concise and avoid duplicate code. However, it's a minor change. The decision is up to you.

> We decided not to provide common DDL options and to let developers define their own options per connector, as we do now.

What is the reason for that? One of the main goals of this FLIP was to unify the configs, wasn't it? I understand that the current cache design doesn't depend on ConfigOptions the way it did before. But we can still put these options into the framework, so connectors can reuse them, avoid code duplication and, more significantly, avoid diverging option names. This can be pointed out in the documentation for connector developers.

Best regards,
Alexander

On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander,

Thanks for the review and glad to see we are on the same page! I think you forgot to cc the dev mailing list, so I'm also quoting your reply under this email.

> We can add a 'maxRetryTimes' option into this class

In my opinion the retry logic should be implemented in lookup() instead of in LookupFunction#eval(). Retrying is only meaningful for some specific retriable failures, and there might be custom logic before retrying, such as re-establishing the connection (JdbcRowDataLookupFunction is an example), so it's more handy to leave it to the connector.

> I don't see the DDL options that were in the previous version of the FLIP. Do you have any special plans for them?

We decided not to provide common DDL options and to let developers define their own options per connector, as we do now.

The rest of the comments sound great and I'll update the FLIP. Hope we can finalize our proposal soon!

Best,
Qingsheng
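For illustration, a minimal sketch of the retry-in-lookup pattern described above, assuming the FLIP's LookupFunction exposes a lookup(RowData) entry point; the JDBC helpers and the retry count are made up for the example:

    import java.io.IOException;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Collection;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.functions.LookupFunction;

    public class RetryingJdbcLookupFunction extends LookupFunction {
        private static final int MAX_RETRY_TIMES = 3; // illustrative value
        private transient Connection connection;      // hypothetical JDBC handle

        @Override
        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            SQLException lastFailure = null;
            for (int attempt = 0; attempt <= MAX_RETRY_TIMES; attempt++) {
                try {
                    return queryByKey(keyRow); // hypothetical query helper
                } catch (SQLException e) {
                    lastFailure = e;
                    // Connector-specific recovery before the next attempt, e.g.
                    // re-establishing the connection as JdbcRowDataLookupFunction does.
                    reconnect();
                }
            }
            throw new IOException("Lookup failed after " + MAX_RETRY_TIMES + " retries", lastFailure);
        }

        private Collection<RowData> queryByKey(RowData keyRow) throws SQLException {
            throw new UnsupportedOperationException("connector-specific");
        }

        private void reconnect() {
            // re-open 'connection'; connector-specific
        }
    }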
On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng and devs!

I like the overall design of the updated FLIP, however I have several suggestions and questions.

1) Introducing LookupFunction as a subclass of TableFunction is a good idea. We can add a 'maxRetryTimes' option into this class. The 'eval' method of the new LookupFunction is a good place for this. The same goes for the 'async' case.

2) There might be other configs in the future, such as 'cacheMissingKey' in LookupFunctionProvider or 'rescanInterval' in ScanRuntimeProvider. Maybe use the Builder pattern in LookupFunctionProvider and RescanRuntimeProvider for more flexibility (one 'build' method instead of many 'of' methods in the future; see the sketch further below)?

3) What are the plans for the existing TableFunctionProvider and AsyncTableFunctionProvider? I think they should be deprecated.

4) Am I right that the current design does not assume usage of a user-provided LookupCache in re-scanning? In that case, it is not very clear why we need methods such as 'invalidate' or 'putAll' in LookupCache.

5) I don't see the DDL options that were in the previous version of the FLIP. Do you have any special plans for them?

If you don't mind, I would be glad to make small adjustments to the FLIP document too. I think it's worth spelling out exactly which optimizations are planned for the future.

Best regards,
Smirnov Alexander

On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and devs,

Thank you very much for the in-depth discussion! As Jark mentioned, we were inspired by Alexander's idea and refactored our design. FLIP-221 [1] has been updated to reflect the new design, and we are happy to hear more suggestions from you!

Compared to the previous design:
1. The lookup cache serves at the table runtime level and is integrated as a component of LookupJoinRunner, as discussed previously.
2. Interfaces are renamed and re-designed to reflect the new design.
3. We separated out the all-caching case and introduced a new RescanRuntimeProvider to reuse the scanning ability. We are planning to support SourceFunction / InputFormat for now, considering the complexity of the FLIP-27 Source API.
4. A new interface LookupFunction is introduced to make the semantics of lookup more straightforward for developers.

Replying to Alexander:

> However I'm a little confused whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not?

Yes, you are right. InputFormat is not deprecated for now. I think it will be deprecated in the future, but we don't have a clear plan for that.

Thanks again for the discussion on this FLIP, and looking forward to cooperating with you after we finalize the design and interfaces!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,
Qingsheng
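A sketch of the builder style suggested in point 2 of Alexander's message above. All names here are hypothetical, for illustration only, not the FLIP's actual API:

    import java.time.Duration;

    // Hypothetical fluent construction instead of a growing family of "of" methods.
    public class BuilderStyleExample {
        public static LookupFunctionProvider buildProvider(LookupFunction function) {
            return LookupFunctionProvider.newBuilder()
                    .setLookupFunction(function)                // required argument
                    .setCacheMissingKey(true)                   // future optional config
                    .build();
        }

        public static RescanRuntimeProvider buildRescanProvider(InputFormat<?, ?> format) {
            return RescanRuntimeProvider.newBuilder()
                    .setInputFormat(format)                     // required argument
                    .setRescanInterval(Duration.ofMinutes(10))  // future optional config
                    .build();
        }
    }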
On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Jark, Qingsheng and Leonard!

Glad to see that we came to a consensus on almost all points!

However, I'm a little confused whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not? Actually I also think that for the first version it's OK to use InputFormat in the ALL cache implementation, because supporting the rescan ability seems like a very distant prospect. But for this decision we need a consensus among all discussion participants.

In general, I don't have anything to argue with in your statements. All of them correspond to my ideas. Looking ahead, it would be nice to work on this FLIP cooperatively. I've already done a lot of work on lookup join caching, with an implementation very close to the one we are discussing, and I want to share the results of this work. Anyway, looking forward to the FLIP update!

Best regards,
Smirnov Alexander

On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:

Hi Alex,

Thanks for summarizing your points.

In the past week, Qingsheng, Leonard, and I have discussed it several times and we have totally refactored the design. I'm glad to say we have reached a consensus on many of your points! Qingsheng is still working on updating the design docs, which may be available in the next few days. I will share some conclusions from our discussions:

1) We have refactored the design towards the "cache in framework" way.

2) There is a "LookupCache" interface for users to customize, and a default implementation with a builder for easy use. This makes it possible to have both flexibility and conciseness.

3) Filter pushdown is important for both the ALL and LRU lookup caches, especially for reducing IO. Filter pushdown should be the final state and the unified way to support pruning both the ALL cache and the LRU cache, so I think we should make an effort in this direction. If we need to support filter pushdown for the ALL cache anyway, why not use it for the LRU cache as well? Either way, as we have decided to implement the cache in the framework, we have the chance to support filtering on the cache at any time. This is an optimization and it doesn't affect the public API. I think we can create a JIRA issue to discuss it once the FLIP is accepted.

4) The idea for supporting the ALL cache is similar to your proposal. In the first version, we will only support InputFormat and SourceFunction for cache-all (invoking the InputFormat in the join operator). For a FLIP-27 source, we would need to join a true source operator instead of calling it embedded in the join operator. However, this needs another FLIP to support the re-scan ability for the FLIP-27 Source, and that can be a large piece of work. In order not to block this issue, we can put the effort of FLIP-27 source integration into future work and integrate InputFormat & SourceFunction for now.

I think it's fine to use InputFormat & SourceFunction, as they are not deprecated; otherwise, we would have to introduce another function similar to them, which is meaningless.
We need to plan FLIP-27 source integration ASAP, before InputFormat & SourceFunction are deprecated.

Best,
Jark

On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Martijn!

Got it. So the InputFormat-based implementation is not an option. Thanks for clearing that up!

Best regards,
Smirnov Alexander

On Thu, May 12, 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:

Hi,

With regards to:

> But if there are plans to refactor all connectors to FLIP-27

Yes, FLIP-27 is the target for all connectors. The old interfaces will be deprecated, and connectors will either be refactored to use the new ones or dropped. The caching should work for connectors that are using FLIP-27 interfaces; we should not introduce new features for old interfaces.

Best regards,

Martijn

On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Jark!

Sorry for the late response. I would like to make some comments and clarify my points.

1) I agree with your first statement. I think we can achieve both advantages this way: put the Cache interface in flink-table-common, but have its implementations in flink-table-runtime. Then, if a connector developer wants to use the existing cache strategies and their implementations, he can just pass a lookup config to the planner; but if he wants his own cache implementation in his TableFunction, he can use the existing interface for this purpose (we can explicitly point this out in the documentation). This way all configs and metrics will be unified. WDYT?

> If a filter can prune 90% of data in the cache, we will have 90% of lookup requests that can never be cached

2) Let me clarify the logic of the filters optimization in the case of an LRU cache. The cache looks like Cache<RowData, Collection<RowData>>. We always store the response of the dimension table in the cache, even after applying the calc function. That is, if no rows are left after applying the filters to the result of the TableFunction's 'eval' method, we store an empty list under those lookup keys. The cache entry is still filled, but it requires much less memory (in bytes). So we don't completely drop keys whose result was pruned, but we significantly reduce the memory required to store the result. If the user knows about this behavior, he can increase the 'max-rows' option before the start of the job. But actually I came up with the idea that we can do this automatically by using the 'maximumWeight' and 'weigher' methods of the Guava cache [1]. The weight can be the size of the collection of rows (the cache entry's value). The cache can then automatically fit many more records than before.
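For illustration, a minimal sketch of that weigher idea with Guava's CacheBuilder, assuming entries are kept as Cache<RowData, Collection<RowData>>; the row budget is illustrative:

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.Collection;
    import org.apache.flink.table.data.RowData;

    public class WeighedCacheFactory {
        // Budget the cache in rows rather than entries: an entry whose rows were
        // pruned down to an empty list weighs almost nothing, so far more keys
        // fit within the same budget.
        public static Cache<RowData, Collection<RowData>> create(long maxRows) {
            return CacheBuilder.newBuilder()
                    .maximumWeight(maxRows)
                    .weigher(
                            (RowData key, Collection<RowData> rows) ->
                                    Math.max(1, rows.size()))
                    .build();
        }
    }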
> Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> That Jdbc/Hive/HBase haven't implemented the interfaces doesn't mean they are hard to implement.

It's debatable how difficult it will be to implement filter pushdown. But I think the fact that there is currently no database connector with filter pushdown at least means that this feature won't be supported in connectors soon. Moreover, if we talk about other connectors (outside the Flink repo), their databases might not support all of Flink's filters (or might not support filters at all). I think users are interested in the cache filters optimization independently of support for other features and of solving more complex (or even unsolvable) problems.

3) I agree with your third statement. Actually, in our internal version I also tried to unify the logic of scanning and of reloading data from connectors. But unfortunately, I didn't find a way to unify the logic of all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...) and reuse it for reloading the ALL cache. As a result I settled on using InputFormat, because it is used for scanning in all lookup connectors. (I didn't know that there are plans to deprecate InputFormat in favor of the FLIP-27 Source.) IMO, using the FLIP-27 source for ALL caching is not a good idea, because this source was designed to work in a distributed environment (SplitEnumerator on the JobManager and SourceReaders on the TaskManagers), not inside one operator (the lookup join operator in our case). There is not even a direct way to pass splits from the SplitEnumerator to a SourceReader (this logic works through the SplitEnumeratorContext, which requires an OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using InputFormat for the ALL cache seems much clearer and easier (see the sketch of such a reload loop below). But if there are plans to refactor all connectors to FLIP-27, I have the following idea: maybe we can drop the lookup join ALL cache in favor of a simple join with multiple scans of the batch source? The point is that the only difference between a lookup join ALL cache and a simple join with a batch source is that in the first case the scan is performed multiple times, with the state (cache) cleared in between (correct me if I'm wrong). So what if we extend the simple join to support state reloading, and extend batch sources to support scanning multiple times (the latter should be easy with the new FLIP-27 source, which unifies streaming/batch reading - we would only need to change the SplitEnumerator so that it passes the splits again after some TTL)? WDYT? I must say that this looks like a long-term goal and would make the scope of this FLIP even larger than you said. Maybe we can limit ourselves to a simpler solution now (InputFormats).
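A minimal sketch of reloading an ALL cache by re-reading an InputFormat, roughly the approach described above; single-threaded for brevity, and the key extractor stands in for connector-specific logic:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.io.InputSplit;
    import org.apache.flink.table.data.RowData;

    public class AllCacheReloader {
        // Re-read every split of the format and rebuild the key -> rows map.
        public static Map<RowData, List<RowData>> reload(
                InputFormat<RowData, InputSplit> format,
                Function<RowData, RowData> keyExtractor) throws IOException {
            Map<RowData, List<RowData>> cache = new HashMap<>();
            format.configure(new Configuration());
            for (InputSplit split : format.createInputSplits(1)) {
                format.open(split);
                while (!format.reachedEnd()) {
                    RowData row = format.nextRecord(null);
                    if (row != null) {
                        cache.computeIfAbsent(keyExtractor.apply(row), k -> new ArrayList<>())
                                .add(row);
                    }
                }
                format.close();
            }
            return cache;
        }
    }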
So to sum up, my points are these:
1) There is a way to make the interfaces for caching in lookup join both concise and flexible.
2) The cache filters optimization is important for both the LRU and ALL caches.
3) It is unclear when filter pushdown will be supported in Flink connectors; some connectors might not have the opportunity to support filter pushdown at all, and as far as I know, filter pushdown currently works only for scanning (not lookup). So the cache filters + projections optimization should be independent of other features.
4) The ALL cache implementation is a complex topic that touches multiple aspects of how Flink is developing. Dropping InputFormat in favor of the FLIP-27 Source would make the ALL cache implementation really complex and unclear, so maybe instead we can extend the functionality of the simple join, or keep InputFormat for the lookup join ALL cache?

Best regards,
Smirnov Alexander

[1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, May 5, 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:

It's great to see the active discussion! I want to share my ideas:

1) Implement the cache in the framework vs. in the connector base
I don't have a strong opinion on this. Both ways should work (e.g., for cache pruning and compatibility). The framework way can provide more concise interfaces. The connector-base way can define more flexible cache strategies/implementations. We are still investigating whether we can have both advantages. We should reach a consensus that the chosen way is a final state, and that we are on the path to it.

2) Filters and projections pushdown
I agree with Alex that filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use a cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and that hit the databases directly. That means the cache is meaningless in this case.

IMO, Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. That Jdbc/Hive/HBase haven't implemented the interfaces doesn't mean they are hard to implement. They should implement the pushdown interfaces to reduce IO and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation. I don't see why we would need to duplicate the pushdown logic in the caches, which would complicate the lookup join design.
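For reference, a sketch of the standard pushdown hook mentioned above; the predicate-translation helper is hypothetical and the other source methods are stubbed:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.connector.source.LookupTableSource;
    import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
    import org.apache.flink.table.expressions.ResolvedExpression;

    public class MyLookupSource implements LookupTableSource, SupportsFilterPushDown {
        private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

        @Override
        public Result applyFilters(List<ResolvedExpression> filters) {
            List<ResolvedExpression> accepted = new ArrayList<>();
            List<ResolvedExpression> remaining = new ArrayList<>();
            for (ResolvedExpression filter : filters) {
                // Hypothetical helper: can this predicate be expressed in the
                // external system's query language?
                if (canTranslate(filter)) {
                    accepted.add(filter);
                } else {
                    remaining.add(filter);
                }
            }
            pushedFilters.addAll(accepted);
            // Accepted filters are applied by the source; remaining ones stay in
            // the plan and are applied by Flink after the lookup.
            return Result.of(accepted, remaining);
        }

        private boolean canTranslate(ResolvedExpression filter) {
            return false; // connector-specific
        }

        @Override
        public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
            throw new UnsupportedOperationException("out of scope for this sketch");
        }

        @Override
        public DynamicTableSource copy() {
            return new MyLookupSource();
        }

        @Override
        public String asSummaryString() {
            return "MyLookupSource";
        }
    }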
3) ALL cache abstraction
The ALL cache might be the most challenging part of this FLIP. We have never provided a public reload-lookup interface. Currently, we put the reload logic in the "eval" method of the TableFunction. That's hard for some sources (e.g., Hive). Ideally, connector implementations should share the logic of reload and scan, i.e. a ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are headed for deprecation, and the FLIP-27 source is deeply coupled with the SourceOperator. If we want to invoke the FLIP-27 source in LookupJoin, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.

Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns would have to be done for all ScanTableSource implementations (not only the lookup ones).

Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!

I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway - no matter how it is implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the handiest way to do that is to apply the filtering inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction.
And Alexander correctly mentioned that filter pushdown is still not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
3) Providing the cache in the framework does deprive us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by creating more cache strategies and a wider set of configurations.

All these points are much closer to the scheme proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong and all these facilities can be simply implemented in your architecture?

Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but I just wanted to express that I really appreciate the in-depth discussion on this topic, and I hope that others will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).

> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept this consciously to achieve better performance (no one has proposed enabling caching by default, etc.). Or by users do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in the flink-table-runtime module versus flink-table-common from this point of view? How does it affect whether the semantics of "FOR SYSTEM_TIME AS OF proc_time" are broken or not?
> confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously

If we talk about the main difference between the semantics of DDL options and config options ("table.exec.xxx"), isn't it about the scope of the options and their importance to the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting the lookup cache strategy option into configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter to the user (or anyone else) where the logic affected by the applied option is located? Also, I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.

> introduce a new interface for this all-caching scenario and the design would become more complex

This is a subject for a separate discussion, but actually in our internal version we solved this problem quite easily - we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive - the latter uses the class PartitionReader, which is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload the cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is significantly reduced (as is the time the input stream is blocked). I know that we usually try to avoid using concurrency in Flink code, but maybe this one can be an exception. BTW, I don't say that it's an ideal solution; maybe there are better ones.
> Providing the cache in the framework might introduce compatibility issues

That's possible only in cases where the developer of the connector doesn't properly refactor his code and uses the new cache options incorrectly (i.e. explicitly feeds the same options into 2 different places in the code). For correct behavior, all he needs to do is redirect the existing options to the framework's LookupConfig (+ maybe add an alias for options where the naming differed), and everything will be transparent for users. If the developer doesn't do the refactoring at all, nothing changes for the connector, because of backward compatibility. Also, if a developer wants to use his own cache logic, he can simply decline to pass some of the configs to the framework and instead make his own implementation with the already existing configs and metrics (but I actually think that's a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That's a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose in the dimension table 'users' we have a column 'age' with values from 20 to 40, and an input stream 'clicks' that is ~uniformly distributed over the users' ages. With the filter 'age > 30' there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which gives a huge performance boost. Moreover, this optimization starts to really shine with the 'ALL' cache, where tables without filters and projections can't fit in memory, but with them - can. This opens up additional possibilities for users. And that doesn't sound like 'not quite useful'.

It would be great to hear other voices regarding this topic! Because we have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander
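For the option-redirect point above, a sketch of keeping a connector's old option key as a deprecated alias; the ConfigOptions builder is Flink's real API, while the option names here are illustrative:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    public class MyConnectorOptions {
        // Common cache option reused by the connector; the connector's previous
        // key is kept as a deprecated alias so existing DDL keeps working.
        public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
                ConfigOptions.key("lookup.cache.max-rows")
                        .longType()
                        .defaultValue(-1L)
                        .withDeprecatedKeys("lookup.cache.maximum-size") // illustrative old key
                        .withDescription("Maximum number of rows kept in the lookup cache.");
    }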
On Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion, and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF proc_time", because the result can no longer truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for the performance. So we prefer not to provide caching at the table runtime level.

2. If we put the cache implementation in the framework (whether in a runner or in a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design, the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.

3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community; this is also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.
4. Providing the cache in the framework might introduce compatibility issues to existing lookup sources: there might exist two caches with totally different strategies if the user configures the table incorrectly (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of to the runner with the cache. The goal of using a cache is to reduce the network I/O and the pressure on the external system, and applying these optimizations only to the cache seems not quite useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache. Also, I made a POC [2] for your reference.

Looking forward to your ideas!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step

I think that these 2 ways (the one originally proposed by Qingsheng, and mine) are mutually exclusive, because conceptually they follow the same goal, but the implementation details are different. If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work on this FLIP together, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?
> as the source will only receive the requests after the filter

Actually, if filters are applied to fields of the lookup table, we first must make the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.

Sorry for that, I'm a bit new to such kinds of conversations :) I have no write access to the confluence, so I made a Jira issue where I described the proposed changes in more detail - https://issues.apache.org/jira/browse/FLINK-27411.

Will be happy to get more feedback!

Best,
Smirnov Alexander

On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.

I second Alexander's idea, though, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).

Another advantage is that all the changes of this FLIP would be limited to options; there is no need for new public interfaces. Everything else remains an implementation detail of the table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.
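A sketch of that delegation idea. For brevity it uses a lookup(key) -> rows signature (close to what the FLIP later introduced as LookupFunction) rather than TableFunction's reflective eval, which makes the real wrapper more involved; a plain map stands in for a real bounded cache:

    import java.io.IOException;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.functions.LookupFunction;

    public class CachingLookupFunction extends LookupFunction {
        private final LookupFunction delegate;
        // Stand-in for a real bounded cache (e.g. Guava with maximumWeight).
        private final Map<RowData, Collection<RowData>> cache = new HashMap<>();

        public CachingLookupFunction(LookupFunction delegate) {
            this.delegate = delegate;
        }

        @Override
        public Collection<RowData> lookup(RowData keyRow) throws IOException {
            Collection<RowData> cached = cache.get(keyRow);
            if (cached != null) {
                return cached; // hit: no request to the external system
            }
            Collection<RowData> rows = delegate.lookup(keyRow); // miss: delegate
            cache.put(keyRow, rows);
            return rows;
        }
    }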
That means we can >>>>>>>>> easily >>>>>>>>>>>>>>>> incorporate >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> optimization potential that Alexander pointed out >>>>>>>>> later. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is not >>>>>>>>> shared. >>>>>>>>>>> I >>>>>>>>>>>>>> don't >>>>>>>>>>>>>>>>> know the >>>>>>>>>>>>>>>>>>>> solution to share images to be honest. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов < >>>>>>>>>>>>>>>>> smirale...@gmail.com> >>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a >>>>>>>>> committer >>>>>>>>>>> yet, >>>>>>>>>>>>>> but >>>>>>>>>>>>>>>> I'd >>>>>>>>>>>>>>>>>>>>> really like to become one. And this FLIP really >>>>>>>>>>> interested >>>>>>>>>>>>>> me. >>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in my >>>>>>>>>>> company’s >>>>>>>>>>>>>>> Flink >>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts on >>>>>>>>> this and >>>>>>>>>>>>>> make >>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>>>>> open source. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I think there is a better alternative than >>>>>>>>> introducing an >>>>>>>>>>>>>>> abstract >>>>>>>>>>>>>>>>>>>>> class for TableFunction (CachingTableFunction). As >>>>>>>>> you >>>>>>>>>>> know, >>>>>>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common >>>>>>>>> module, >>>>>>>>>>> which >>>>>>>>>>>>>>>>> provides >>>>>>>>>>>>>>>>>>>>> only an API for working with tables – it’s very >>>>>>>>>>> convenient >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>> importing >>>>>>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction contains >>>>>>>>>>> logic >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>> runtime execution, so this class and everything >>>>>>>>>>> connected >>>>>>>>>>>>>> with >>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>>> should be located in another module, probably in >>>>>>>>>>>>>>>>> flink-table-runtime. >>>>>>>>>>>>>>>>>>>>> But this will require connectors to depend on another >>>>>>>>>>> module, >>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic, which doesn’t sound >>>>>>>>>>> good. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’ to >>>>>>>>>>>>>>>> LookupTableSource >>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors to only >>>>>>>>> pass >>>>>>>>>>>>>>>>>>>>> configurations to the planner, therefore they won’t >>>>>>>>>>> depend on >>>>>>>>>>>>>>>>> runtime >>>>>>>>>>>>>>>>>>>>> realization. Based on these configs planner will >>>>>>>>>>> construct a >>>>>>>>>>>>>>>> lookup >>>>>>>>>>>>>>>>>>>>> join operator with corresponding runtime logic >>>>>>>>>>>>>> (ProcessFunctions >>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture looks like >>>>>>>>> in >>>>>>>>>>> the >>>>>>>>>>>>>>> pinned >>>>>>>>>>>>>>>>>>>>> image (LookupConfig class there is actually yours >>>>>>>>>>>>>> CacheConfig). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will be >>>>>>>>> responsible >>>>>>>>>>> for >>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>> – >>>>>>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and his inheritors. >>>>>>>>>>>>>>>>>>>>> Current classes for lookup join in >>>>>>>>> flink-table-runtime >>>>>>>>>>> - >>>>>>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner, >>>>>>>>>>>>>>> LookupJoinRunnerWithCalc, >>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc. 
And here comes another, more powerful advantage of such a solution. If we have the caching logic at a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which actually mostly consists of filters and projections.

For example, when joining table A with lookup table B under the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: the filters avoid storing useless records in the cache, and the projections reduce the records' size. So the initial maximum number of records in the cache can be increased by the user.

What do you think about it?

On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction for the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.

Therefore we propose some new APIs, including the cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng