Thanks Qingsheng and all for your discussion. Very sorry to jump in so late.
Maybe I missed something? My first impression when I saw the cache interface was: why don't we provide an interface similar to Guava cache [1]? On top of Guava cache, Caffeine also adds extensions for asynchronous calls [2], and Caffeine supports bulk loading as well. I am also confused about why we go first from LookupCacheFactory.Builder and then to the Factory to create the Cache.

[1] https://github.com/google/guava
[2] https://github.com/ben-manes/caffeine/wiki/Population

Best,
Jingsong

On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote:
> After looking at the newly introduced ReloadTime and Becket's comment,
> I agree with Becket that we should have a pluggable reloading strategy.
> We can provide some common implementations, e.g., periodic reloading and
> daily reloading.
> But there will definitely be some connector- or business-specific reloading
> strategies, e.g. notifying via a ZooKeeper watcher, or reloading once a new
> Hive partition is complete.
>
> Best,
> Jark
>
> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Qingsheng,
> >
> > Thanks for updating the FLIP. A few comments / questions below:
> >
> > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"?
> > What is the difference between them? If they are the same, can we just
> > use XXXFactory everywhere?
> >
> > 2. Regarding the FullCachingLookupProvider, should the reloading policy
> > also be pluggable? Periodic reloading can sometimes be tricky in
> > practice. For example, if a user uses 24 hours as the cache refresh
> > interval and some nightly batch job is delayed, the cache update may
> > still see the stale data.
> >
> > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should
> > be removed.
> >
> > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a
> > little confusing to me.
> > If Optional<LookupCacheFactory> getCacheFactory() returns a non-empty
> > factory, doesn't that already indicate that the framework should cache
> > the missing keys? Also, why is this method returning an Optional<Boolean>
> > instead of boolean?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> wrote:
> >
> >> Hi Lincoln and Jark,
> >>
> >> Thanks for the comments! If the community reaches a consensus that we use
> >> a SQL hint instead of table options to decide whether to use sync or
> >> async mode, it's indeed not necessary to introduce the "lookup.async"
> >> option.
> >>
> >> I think it's a good idea to let the decision on async mode be made at the
> >> query level, which could enable better optimization with more information
> >> gathered by the planner. Is there any FLIP describing the issue in
> >> FLINK-27625? I thought FLIP-234 was only proposing a SQL hint for retry
> >> on missing, rather than having the entire async mode controlled by a
> >> hint.
> >>
> >> Best regards,
> >>
> >> Qingsheng
> >>
> >> > On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> >
> >> > Hi Jark,
> >> >
> >> > Thanks for your reply!
> >> >
> >> > Currently 'lookup.async' only exists in the HBase connector, and I have
> >> > no idea whether or when to remove it (we can discuss that in another
> >> > issue for the HBase connector after FLINK-27625 is done), so let's just
> >> > not add it as a common option now.
> >> >
> >> > Best,
> >> > Lincoln Lee
> >> >
> >> > On Tue, May 24, 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:
> >> >
> >> >> Hi Lincoln,
> >> >>
> >> >> I have taken a look at FLIP-234, and I agree with you that connectors
> >> >> can provide both async and sync runtime providers simultaneously
> >> >> instead of one of them.
> >> >> At that point, "lookup.async" looks redundant.
> >> >> If this option is planned to be removed in the long term, I think it
> >> >> makes sense not to introduce it in this FLIP.
> >> >>
> >> >> Best,
> >> >> Jark
> >> >>
> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> >>
> >> >>> Hi Qingsheng,
> >> >>>
> >> >>> Sorry for jumping into the discussion so late. It's a good idea that
> >> >>> we can have a common table option. I have a minor comment on
> >> >>> 'lookup.async': I suggest not making it a common option.
> >> >>>
> >> >>> The table layer abstracts both sync and async lookup capabilities,
> >> >>> and connector implementers can choose one or both. In the case of
> >> >>> implementing only one capability (the status of most existing
> >> >>> built-in connectors), 'lookup.async' will not be used. And when a
> >> >>> connector has both capabilities, I think this choice is more suitable
> >> >>> for making decisions at the query level; for example, the table
> >> >>> planner can choose the physical implementation of async or sync
> >> >>> lookup based on its cost model, or users can give a query hint based
> >> >>> on their own better understanding. If there is another common table
> >> >>> option 'lookup.async', it may confuse users in the long run.
> >> >>>
> >> >>> So, I prefer to leave the 'lookup.async' option in a private place
> >> >>> (for the current HBase connector) and not turn it into a common
> >> >>> option.
> >> >>>
> >> >>> WDYT?
> >> >>>
> >> >>> Best,
> >> >>> Lincoln Lee
> >> >>>
> >> >>> On Mon, May 23, 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>
> >> >>>> Hi Alexander,
> >> >>>>
> >> >>>> Thanks for the review! We recently updated the FLIP and you can find
> >> >>>> those changes in my latest email. Since some terminology has
> >> >>>> changed, I'll use the new concepts when replying to your comments.
> >> >>>>
> >> >>>> 1.
> >> >>>> Builder vs 'of'
> >> >>>> I'm OK with using the builder pattern if we have additional optional
> >> >>>> parameters for full caching mode ("rescan" previously). The
> >> >>>> schedule-with-delay idea looks reasonable to me, but I think we need
> >> >>>> to redesign the builder API of full caching to make it more
> >> >>>> descriptive for developers. Would you mind sharing your ideas about
> >> >>>> the API? For accessing the FLIP workspace you can just provide your
> >> >>>> account ID and ping any PMC member including Jark.
> >> >>>>
> >> >>>> 2. Common table options
> >> >>>> We have had some discussions these days and propose to introduce 8
> >> >>>> common table options for caching. This has been updated in the FLIP.
> >> >>>>
> >> >>>> 3. Retries
> >> >>>> I think we are on the same page :-)
> >> >>>>
> >> >>>> For your additional concerns:
> >> >>>> 1) The table option has been updated.
> >> >>>> 2) We got "lookup.cache" back for configuring whether to use partial
> >> >>>> or full caching mode.
> >> >>>>
> >> >>>> Best regards,
> >> >>>>
> >> >>>> Qingsheng
> >> >>>>
> >> >>>>> On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>
> >> >>>>> Also I have a few additions:
> >> >>>>> 1) Maybe rename 'lookup.cache.maximum-size' to
> >> >>>>> 'lookup.cache.max-rows'? I think it will be clearer that we are
> >> >>>>> talking not about bytes, but about the number of rows. Plus it fits
> >> >>>>> better, considering my optimization with filters.
> >> >>>>> 2) How will users enable rescanning? Are we going to separate
> >> >>>>> caching and rescanning from the options point of view? Like
> >> >>>>> initially we had one option 'lookup.cache' with values LRU / ALL. I
> >> >>>>> think now we can make a boolean option 'lookup.rescan'.
> >> >>>>> RescanInterval can be 'lookup.rescan.interval', etc.
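For readers following the 'max-rows' naming point: a cache bounded by the number of cached entries (each entry holding the rows for one lookup key) rather than by bytes can be sketched in plain Java with LinkedHashMap's LRU mode. This is a minimal illustration of the semantics under discussion, not the FLIP's proposed implementation; all names here are made up.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Minimal LRU cache bounded by entry count, mimicking a 'max-rows'-style limit. */
public class MaxRowsLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public MaxRowsLruCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true -> iteration/eviction in LRU order
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least-recently-used entry beyond the bound
    }

    public static void main(String[] args) {
        MaxRowsLruCache<String, List<String>> cache = new MaxRowsLruCache<>(2);
        cache.put("k1", List.of("row1"));
        cache.put("k2", List.of("row2"));
        cache.get("k1");                  // touch k1, so k2 becomes the eldest entry
        cache.put("k3", List.of("row3")); // exceeds the bound and evicts k2
        System.out.println(cache.keySet()); // [k1, k3]
    }
}
```

Bounding by keyed entries is the simplest reading; a row-count bound would weigh each entry by its collection size instead.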
> >> >>>>>
> >> >>>>> Best regards,
> >> >>>>> Alexander
> >> >>>>>
> >> >>>>> On Thu, May 19, 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Hi Qingsheng and Jark,
> >> >>>>>>
> >> >>>>>> 1. Builders vs 'of'
> >> >>>>>> I understand that builders are used when we have multiple
> >> >>>>>> parameters. I suggested them because we could add parameters
> >> >>>>>> later. To prevent the Builder for ScanRuntimeProvider from looking
> >> >>>>>> redundant, I can suggest one more config now: "rescanStartTime".
> >> >>>>>> It's a time in UTC (LocalTime class) at which the first reload of
> >> >>>>>> the cache starts. This parameter can be thought of as the
> >> >>>>>> 'initialDelay' (the diff between the current time and
> >> >>>>>> rescanStartTime) in the method
> >> >>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be
> >> >>>>>> very useful when the dimension table is updated by some other
> >> >>>>>> scheduled job at a certain time, or when the user simply wants the
> >> >>>>>> second scan (the first cache reload) to be delayed. This option
> >> >>>>>> can be used even without 'rescanInterval' - in this case
> >> >>>>>> 'rescanInterval' will be one day.
> >> >>>>>> If you are fine with this option, I would be very glad if you
> >> >>>>>> would give me access to edit the FLIP page, so I could add it
> >> >>>>>> myself.
> >> >>>>>>
> >> >>>>>> 2. Common table options
> >> >>>>>> I also think that FactoryUtil would be overloaded by all the cache
> >> >>>>>> options. But maybe unify all the suggested options, not only those
> >> >>>>>> for the default cache? I.e. a class 'LookupOptions' that unifies
> >> >>>>>> the default cache options, rescan options, 'async', and
> >> >>>>>> 'maxRetries'. WDYT?
> >> >>>>>>
> >> >>>>>> 3.
> >> >>>>>> Retries
> >> >>>>>> I'm fine with a suggestion close to RetryUtils#tryTimes(times, call).
> >> >>>>>>
> >> >>>>>> [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
> >> >>>>>>
> >> >>>>>> Best regards,
> >> >>>>>> Alexander
> >> >>>>>>
> >> >>>>>> On Wed, May 18, 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>
> >> >>>>>>> Hi Jark and Alexander,
> >> >>>>>>>
> >> >>>>>>> Thanks for your comments! I'm also OK with introducing common
> >> >>>>>>> table options. I prefer to introduce a new
> >> >>>>>>> DefaultLookupCacheOptions class for holding these option
> >> >>>>>>> definitions, because putting all options into FactoryUtil would
> >> >>>>>>> make it a bit "crowded" and not well categorized.
> >> >>>>>>>
> >> >>>>>>> The FLIP has been updated according to the suggestions above:
> >> >>>>>>> 1. Use a static "of" method for constructing
> >> >>>>>>> RescanRuntimeProvider, considering both arguments are required.
> >> >>>>>>> 2. Introduce new table options matching DefaultLookupCacheFactory.
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Qingsheng
> >> >>>>>>>
> >> >>>>>>> On Wed, 18 May 2022 at 2:57 PM, Jark Wu <imj...@gmail.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>> Hi Alex,
> >> >>>>>>>>
> >> >>>>>>>> 1) retry logic
> >> >>>>>>>> I think we can extract some common retry logic into utilities,
> >> >>>>>>>> e.g. RetryUtils#tryTimes(times, call).
> >> >>>>>>>> This seems independent of this FLIP and can be reused by
> >> >>>>>>>> DataStream users.
> >> >>>>>>>> Maybe we can open an issue to discuss this and where to put it.
> >> >>>>>>>>
> >> >>>>>>>> 2) cache ConfigOptions
> >> >>>>>>>> I'm fine with defining the cache config options in the
> >> >>>>>>>> framework. A candidate place to put them is FactoryUtil, which
> >> >>>>>>>> also includes the "sink.parallelism" and "format" options.
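The RetryUtils#tryTimes(times, call) helper mentioned in this exchange does not exist yet; a minimal sketch of what it could look like (the class name, signature, and behavior here are assumptions drawn from the discussion, not an actual Flink API):

```java
import java.util.concurrent.Callable;

/** Hypothetical common retry helper, as floated in the thread. */
public final class RetryUtils {
    private RetryUtils() {}

    /**
     * Invokes {@code call}, making up to {@code times} attempts in total.
     * If every attempt fails, the last exception is rethrown.
     */
    public static <T> T tryTimes(int times, Callable<T> call) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // a connector could re-establish its connection here
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] failuresLeft = {2}; // fail twice, then succeed
        String result = tryTimes(3, () -> {
            if (failuresLeft[0]-- > 0) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        });
        System.out.println(result); // ok
    }
}
```

Whether connector-specific recovery (reconnects, backoff) belongs inside the Callable or in an extra hook is exactly the design question left open above.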
> >> >>>>>>>>
> >> >>>>>>>> Best,
> >> >>>>>>>> Jark
> >> >>>>>>>>
> >> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>> Hi Qingsheng,
> >> >>>>>>>>>
> >> >>>>>>>>> Thank you for considering my comments.
> >> >>>>>>>>>
> >> >>>>>>>>>> there might be custom logic before making a retry, such as
> >> >>>>>>>>>> re-establishing the connection
> >> >>>>>>>>>
> >> >>>>>>>>> Yes, I understand that. I meant that such logic can be placed
> >> >>>>>>>>> in a separate function that can be implemented by connectors.
> >> >>>>>>>>> Just moving the retry logic would make the connector's
> >> >>>>>>>>> LookupFunction more concise and avoid duplicate code. However,
> >> >>>>>>>>> it's a minor change. The decision is up to you.
> >> >>>>>>>>>
> >> >>>>>>>>>> We decided not to provide common DDL options and to let
> >> >>>>>>>>>> developers define their own options per connector, as we do
> >> >>>>>>>>>> now.
> >> >>>>>>>>>
> >> >>>>>>>>> What is the reason for that? One of the main goals of this FLIP
> >> >>>>>>>>> was to unify the configs, wasn't it? I understand that the
> >> >>>>>>>>> current cache design doesn't depend on ConfigOptions, like
> >> >>>>>>>>> before. But we can still put these options into the framework,
> >> >>>>>>>>> so connectors can reuse them and avoid code duplication, and,
> >> >>>>>>>>> more significantly, avoid possibly different option naming.
> >> >>>>>>>>> This can be pointed out in the documentation for connector
> >> >>>>>>>>> developers.
> >> >>>>>>>>>
> >> >>>>>>>>> Best regards,
> >> >>>>>>>>> Alexander
> >> >>>>>>>>>
> >> >>>>>>>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi Alexander,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks for the review and glad to see we are on the same page!
> >> >>>>>>>>>> I think you forgot to cc the dev mailing list, so I'm also
> >> >>>>>>>>>> quoting your reply under this email.
> >> >>>>>>>>>>
> >> >>>>>>>>>>> We can add a 'maxRetryTimes' option into this class
> >> >>>>>>>>>>
> >> >>>>>>>>>> In my opinion the retry logic should be implemented in
> >> >>>>>>>>>> lookup() instead of in LookupFunction#eval(). Retrying is only
> >> >>>>>>>>>> meaningful under some specific retriable failures, and there
> >> >>>>>>>>>> might be custom logic before making a retry, such as
> >> >>>>>>>>>> re-establishing the connection (JdbcRowDataLookupFunction is
> >> >>>>>>>>>> an example), so it's handier to leave it to the connector.
> >> >>>>>>>>>>
> >> >>>>>>>>>>> I don't see the DDL options that were in the previous version
> >> >>>>>>>>>>> of the FLIP. Do you have any special plans for them?
> >> >>>>>>>>>>
> >> >>>>>>>>>> We decided not to provide common DDL options and to let
> >> >>>>>>>>>> developers define their own options per connector, as we do
> >> >>>>>>>>>> now.
> >> >>>>>>>>>>
> >> >>>>>>>>>> The rest of the comments sound great and I'll update the FLIP.
> >> >>>>>>>>>> Hope we can finalize our proposal soon!
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Qingsheng
> >> >>>>>>>>>>
> >> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Hi Qingsheng and devs!
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I like the overall design of the updated FLIP; however, I
> >> >>>>>>>>>>> have several suggestions and questions.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction
> >> >>>>>>>>>>> is a good idea. We can add a 'maxRetryTimes' option into this
> >> >>>>>>>>>>> class. The 'eval' method of the new LookupFunction is great
> >> >>>>>>>>>>> for this purpose. The same goes for the 'async' case.
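The retry-placement question debated here (whether retries live in eval() or in the connector's lookup()) can be pictured with simplified stand-in types. These are not Flink's actual interfaces; the shape below follows the suggestion that eval() delegates to lookup(), while retries and connection recovery stay in the connector, which alone knows which failures are retriable.

```java
import java.util.Collection;
import java.util.List;

// Simplified stand-ins for the interfaces under discussion, not Flink's API.
public class LookupRetryDemo {

    abstract static class LookupFunction {
        // eval(...) is what the join operator would call; it just delegates.
        public Collection<String> eval(String key) throws Exception {
            return lookup(key);
        }

        // Connectors implement lookup(); retry logic lives here, because only
        // the connector knows how to recover (e.g. re-establishing a
        // connection, as JdbcRowDataLookupFunction does).
        public abstract Collection<String> lookup(String key) throws Exception;
    }

    static class FlakyJdbcLookup extends LookupFunction {
        private final int maxRetryTimes;
        private int failuresLeft;

        FlakyJdbcLookup(int maxRetryTimes, int failures) {
            this.maxRetryTimes = maxRetryTimes;
            this.failuresLeft = failures;
        }

        @Override
        public Collection<String> lookup(String key) throws Exception {
            Exception last = null;
            for (int attempt = 0; attempt <= maxRetryTimes; attempt++) {
                try {
                    return queryDatabase(key);
                } catch (Exception e) {
                    last = e;
                    reconnect(); // connector-specific recovery before retrying
                }
            }
            throw last;
        }

        private Collection<String> queryDatabase(String key) {
            if (failuresLeft-- > 0) {
                throw new IllegalStateException("connection lost");
            }
            return List.of(key + "-row");
        }

        private void reconnect() { /* re-open the connection here */ }
    }

    public static void main(String[] args) throws Exception {
        LookupFunction fn = new FlakyJdbcLookup(3, 2);
        System.out.println(fn.eval("user42")); // [user42-row]
    }
}
```

A framework-level 'maxRetryTimes' option would instead wrap the loop around the lookup() call inside eval(); the trade-off is exactly the flexibility of connector-specific recovery hooks.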
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 2) There might be other configs in the future, such as
> >> >>>>>>>>>>> 'cacheMissingKey' in LookupFunctionProvider or
> >> >>>>>>>>>>> 'rescanInterval' in ScanRuntimeProvider. Maybe use the
> >> >>>>>>>>>>> Builder pattern in LookupFunctionProvider and
> >> >>>>>>>>>>> RescanRuntimeProvider for more flexibility (one 'build'
> >> >>>>>>>>>>> method instead of many 'of' methods in the future)?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 3) What are the plans for the existing TableFunctionProvider
> >> >>>>>>>>>>> and AsyncTableFunctionProvider? I think they should be
> >> >>>>>>>>>>> deprecated.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 4) Am I right that the current design does not assume usage
> >> >>>>>>>>>>> of a user-provided LookupCache in re-scanning? In that case,
> >> >>>>>>>>>>> it is not very clear why we need methods such as 'invalidate'
> >> >>>>>>>>>>> or 'putAll' in LookupCache.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 5) I don't see the DDL options that were in the previous
> >> >>>>>>>>>>> version of the FLIP. Do you have any special plans for them?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> If you don't mind, I would be glad to be able to make small
> >> >>>>>>>>>>> adjustments to the FLIP document too. I think it's worth
> >> >>>>>>>>>>> mentioning what optimizations exactly are planned for the
> >> >>>>>>>>>>> future.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best regards,
> >> >>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Hi Alexander and devs,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Thank you very much for the in-depth discussion! As Jark
> >> >>>>>>>>>>>> mentioned, we were inspired by Alexander's idea and
> >> >>>>>>>>>>>> refactored our design. FLIP-221 [1] has been updated to
> >> >>>>>>>>>>>> reflect our design now, and we are happy to hear more
> >> >>>>>>>>>>>> suggestions from you!
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Compared to the previous design:
> >> >>>>>>>>>>>> 1. The lookup cache serves at the table runtime level and is
> >> >>>>>>>>>>>> integrated as a component of LookupJoinRunner, as discussed
> >> >>>>>>>>>>>> previously.
> >> >>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new
> >> >>>>>>>>>>>> design.
> >> >>>>>>>>>>>> 3. We handle the all-caching case separately and introduce a
> >> >>>>>>>>>>>> new RescanRuntimeProvider to reuse the ability of scanning.
> >> >>>>>>>>>>>> We are planning to support SourceFunction / InputFormat for
> >> >>>>>>>>>>>> now, considering the complexity of the FLIP-27 Source API.
> >> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to make the
> >> >>>>>>>>>>>> semantics of lookup more straightforward for developers.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> For replying to Alexander:
> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is
> >> >>>>>>>>>>>>> deprecated or not. Am I right that it will be so in the
> >> >>>>>>>>>>>>> future, but currently it's not?
> >> >>>>>>>>>>>> Yes, you are right. InputFormat is not deprecated for now. I
> >> >>>>>>>>>>>> think it will be deprecated in the future, but we don't have
> >> >>>>>>>>>>>> a clear plan for that.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Thanks again for the discussion on this FLIP, and looking
> >> >>>>>>>>>>>> forward to cooperating with you after we finalize the design
> >> >>>>>>>>>>>> and interfaces!
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Qingsheng
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> Glad to see that we came to a consensus on almost all
> >> >>>>>>>>>>>>> points!
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is
> >> >>>>>>>>>>>>> deprecated or not. Am I right that it will be so in the
> >> >>>>>>>>>>>>> future, but currently it's not? Actually I also think that
> >> >>>>>>>>>>>>> for the first version it's OK to use InputFormat in the ALL
> >> >>>>>>>>>>>>> cache implementation, because supporting the rescan ability
> >> >>>>>>>>>>>>> seems like a very distant prospect. But for this decision
> >> >>>>>>>>>>>>> we need a consensus among all discussion participants.
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> In general, I don't have anything to argue with in your
> >> >>>>>>>>>>>>> statements. All of them correspond to my ideas. Looking
> >> >>>>>>>>>>>>> ahead, it would be nice to work on this FLIP cooperatively.
> >> >>>>>>>>>>>>> I've already done a lot of work on lookup join caching with
> >> >>>>>>>>>>>>> an implementation very close to the one we are discussing,
> >> >>>>>>>>>>>>> and I want to share the results of this work. Anyway,
> >> >>>>>>>>>>>>> looking forward to the FLIP update!
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> Hi Alex,
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> Thanks for summarizing your points.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed
> >> >>>>>>>>>>>>>> it several times and we have totally refactored the
> >> >>>>>>>>>>>>>> design.
> >> >>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many of
> >> >>>>>>>>>>>>>> your points!
> >> >>>>>>>>>>>>>> Qingsheng is still working on updating the design docs,
> >> >>>>>>>>>>>>>> and it may be available in the next few days.
> >> >>>>>>>>>>>>>> I will share some conclusions from our discussions:
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 1) We have refactored the design towards the "cache in
> >> >>>>>>>>>>>>>> framework" way.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 2) A "LookupCache" interface for users to customize, and a
> >> >>>>>>>>>>>>>> default implementation with a builder for ease of use.
> >> >>>>>>>>>>>>>> This makes it possible to have both flexibility and
> >> >>>>>>>>>>>>>> conciseness.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 3) Filter pushdown is important for the ALL and LRU lookup
> >> >>>>>>>>>>>>>> caches, especially for reducing IO.
> >> >>>>>>>>>>>>>> Filter pushdown should be the final state and the unified
> >> >>>>>>>>>>>>>> way to support pruning both the ALL cache and the LRU
> >> >>>>>>>>>>>>>> cache, so I think we should make an effort in this
> >> >>>>>>>>>>>>>> direction. If we need to support filter pushdown for the
> >> >>>>>>>>>>>>>> ALL cache anyway, why not use it for the LRU cache as
> >> >>>>>>>>>>>>>> well? Either way, as we decided to implement the cache in
> >> >>>>>>>>>>>>>> the framework, we have the chance to support filtering on
> >> >>>>>>>>>>>>>> the cache anytime. This is an optimization and it doesn't
> >> >>>>>>>>>>>>>> affect the public API. I think we can create a JIRA issue
> >> >>>>>>>>>>>>>> to discuss it when the FLIP is accepted.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 4) The idea to support the ALL cache is similar to your
> >> >>>>>>>>>>>>>> proposal.
> >> >>>>>>>>>>>>>> In the first version, we will only support InputFormat and
> >> >>>>>>>>>>>>>> SourceFunction for cache-all (invoking the InputFormat in
> >> >>>>>>>>>>>>>> the join operator).
> >> >>>>>>>>>>>>>> For FLIP-27 source, we need to join a true source > operator > >> >>>> instead of > >> >>>>>>>>>>>>>> calling it embedded in the join operator. > >> >>>>>>>>>>>>>> However, this needs another FLIP to support the re-scan > >> >>> ability > >> >>>> for FLIP-27 > >> >>>>>>>>>>>>>> Source, and this can be a large work. > >> >>>>>>>>>>>>>> In order to not block this issue, we can put the effort > of > >> >>>> FLIP-27 source > >> >>>>>>>>>>>>>> integration into future work and integrate > >> >>>>>>>>>>>>>> InputFormat&SourceFunction for now. > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> I think it's fine to use InputFormat&SourceFunction, as > >> they > >> >>>> are not > >> >>>>>>>>>>>>>> deprecated, otherwise, we have to introduce another > >> function > >> >>>>>>>>>>>>>> similar to them which is meaningless. We need to plan > >> >> FLIP-27 > >> >>>> source > >> >>>>>>>>>>>>>> integration ASAP before InputFormat & SourceFunction are > >> >>>> deprecated. > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> Best, > >> >>>>>>>>>>>>>> Jark > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов < > >> >>>> smirale...@gmail.com> > >> >>>>>>>>>>>>>> wrote: > >> >>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>> Hi Martijn! > >> >>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>> Got it. Therefore, the realization with InputFormat is > not > >> >>>> considered. > >> >>>>>>>>>>>>>>> Thanks for clearing that up! > >> >>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>> Best regards, > >> >>>>>>>>>>>>>>> Smirnov Alexander > >> >>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>> чт, 12 мая 2022 г. 
в 14:23, Martijn Visser < > >> >>>> mart...@ververica.com>: > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> Hi, > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> With regards to: > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> But if there are plans to refactor all connectors to > >> >>> FLIP-27 > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old > >> >>>> interfaces will be > >> >>>>>>>>>>>>>>>> deprecated and connectors will either be refactored to > >> use > >> >>>> the new ones > >> >>>>>>>>>>>>>>> or > >> >>>>>>>>>>>>>>>> dropped. > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> The caching should work for connectors that are using > >> >>> FLIP-27 > >> >>>> interfaces, > >> >>>>>>>>>>>>>>>> we should not introduce new features for old > interfaces. > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> Best regards, > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> Martijn > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов < > >> >>>> smirale...@gmail.com> > >> >>>>>>>>>>>>>>>> wrote: > >> >>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> Hi Jark! > >> >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> Sorry for the late response. I would like to make some > >> >>>> comments and > >> >>>>>>>>>>>>>>>>> clarify my points. > >> >>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think we can > >> >>> achieve > >> >>>> both > >> >>>>>>>>>>>>>>>>> advantages this way: put the Cache interface in > >> >>>> flink-table-common, > >> >>>>>>>>>>>>>>>>> but have implementations of it in flink-table-runtime. 
> >> >>>>>>>>>>>>>>>>> Therefore, if a connector developer wants to use the
> >> >>>>>>>>>>>>>>>>> existing cache strategies and their implementations, he
> >> >>>>>>>>>>>>>>>>> can just pass the lookupConfig to the planner, but if
> >> >>>>>>>>>>>>>>>>> he wants to have his own cache implementation in his
> >> >>>>>>>>>>>>>>>>> TableFunction, it will be possible for him to use the
> >> >>>>>>>>>>>>>>>>> existing interface for this purpose (we can explicitly
> >> >>>>>>>>>>>>>>>>> point this out in the documentation). In this way all
> >> >>>>>>>>>>>>>>>>> configs and metrics will be unified. WDYT?
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we
> >> >>>>>>>>>>>>>>>>>> will have 90% of lookup requests that can never be
> >> >>>>>>>>>>>>>>>>>> cached
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> 2) Let me clarify the logic of the filter optimization
> >> >>>>>>>>>>>>>>>>> in the case of the LRU cache. It looks like
> >> >>>>>>>>>>>>>>>>> Cache<RowData, Collection<RowData>>. Here we always
> >> >>>>>>>>>>>>>>>>> store the response of the dimension table in the cache,
> >> >>>>>>>>>>>>>>>>> even after applying the calc function. I.e. if there
> >> >>>>>>>>>>>>>>>>> are no rows left after applying the filters to the
> >> >>>>>>>>>>>>>>>>> result of the 'eval' method of the TableFunction, we
> >> >>>>>>>>>>>>>>>>> store an empty list for the lookup keys. Therefore the
> >> >>>>>>>>>>>>>>>>> cache line will still be filled, but it will require
> >> >>>>>>>>>>>>>>>>> much less memory (in bytes). I.e. we don't completely
> >> >>>>>>>>>>>>>>>>> drop the keys whose results were pruned, but we
> >> >>>>>>>>>>>>>>>>> significantly reduce the memory required to store those
> >> >>>>>>>>>>>>>>>>> results. If the user knows about this behavior, he can
> >> >>>>>>>>>>>>>>>>> increase the 'max-rows' option before the start of the
> >> >>>>>>>>>>>>>>>>> job.
> >> >>>>>>>>>>>>>>>>> But actually I came up with the idea that we can do
> >> >>>>>>>>>>>>>>>>> this automatically by using the 'maximumWeight' and
> >> >>>>>>>>>>>>>>>>> 'weigher' methods of the Guava cache [1]. The weight
> >> >>>>>>>>>>>>>>>>> can be the size of the collection of rows (the cache
> >> >>>>>>>>>>>>>>>>> value). Therefore the cache can automatically fit many
> >> >>>>>>>>>>>>>>>>> more records than before.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do filter and
> >> >>>>>>>>>>>>>>>>>> projection pushdown, i.e., SupportsFilterPushDown and
> >> >>>>>>>>>>>>>>>>>> SupportsProjectionPushDown. JDBC/Hive/HBase haven't
> >> >>>>>>>>>>>>>>>>>> implemented the interfaces, but that doesn't mean they
> >> >>>>>>>>>>>>>>>>>> are hard to implement.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> It's debatable how difficult it will be to implement
> >> >>>>>>>>>>>>>>>>> filter pushdown. But I think the fact that currently no
> >> >>>>>>>>>>>>>>>>> database connector has filter pushdown at least means
> >> >>>>>>>>>>>>>>>>> that this feature won't be supported in connectors
> >> >>>>>>>>>>>>>>>>> soon. Moreover, if we talk about other connectors (not
> >> >>>>>>>>>>>>>>>>> in the Flink repo), their databases might not support
> >> >>>>>>>>>>>>>>>>> all Flink filters (or might not support filters at
> >> >>>>>>>>>>>>>>>>> all). I think users are interested in supporting the
> >> >>>>>>>>>>>>>>>>> cache filter optimization independently of support for
> >> >>>>>>>>>>>>>>>>> other features and of solving more complex (or even
> >> >>>>>>>>>>>>>>>>> unsolvable) problems.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> 3) I agree with your third statement.
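The maximumWeight/weigher idea above (entry weight = number of cached rows, so that empty results for pruned keys are almost free) can be mimicked in plain Java to see the effect. A minimal sketch with FIFO eviction for brevity; this is an illustration of the weighting scheme, not the proposed cache implementation, and all names are made up.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Caches lookup results bounded by total row count, not by entry count. */
public class RowWeightedCache {
    private final long maxTotalRows;
    private long totalRows;
    private final Map<String, List<String>> entries = new HashMap<>();
    private final Deque<String> insertionOrder = new ArrayDeque<>(); // FIFO eviction for brevity

    public RowWeightedCache(long maxTotalRows) {
        this.maxTotalRows = maxTotalRows;
    }

    public void put(String key, List<String> rows) {
        entries.put(key, rows);
        insertionOrder.addLast(key);
        totalRows += rows.size(); // weight = number of rows; an empty result weighs 0
        while (totalRows > maxTotalRows && !insertionOrder.isEmpty()) {
            String evicted = insertionOrder.removeFirst();
            totalRows -= entries.remove(evicted).size();
        }
    }

    public List<String> get(String key) {
        return entries.get(key);
    }

    public static void main(String[] args) {
        RowWeightedCache cache = new RowWeightedCache(3);
        cache.put("hit", List.of("r1", "r2", "r3"));
        cache.put("pruned-a", List.of()); // filtered-out key: cached, but costs no weight
        cache.put("pruned-b", List.of());
        System.out.println(cache.get("hit").size());      // 3
        System.out.println(cache.get("pruned-a").size()); // 0
    }
}
```

With a byte- or entry-bounded cache, the two pruned keys above would compete with real results for space; with row weighting they are effectively free, which is the point being made.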
> >> >>>>>>>>>>>>>>>>> Actually, in our internal version I also tried to
> >> >>>>>>>>>>>>>>>>> unify the logic of scanning and reloading data from
> >> >>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find a way to
> >> >>>>>>>>>>>>>>>>> unify the logic of all ScanRuntimeProviders
> >> >>>>>>>>>>>>>>>>> (InputFormat, SourceFunction, Source, ...) and reuse it
> >> >>>>>>>>>>>>>>>>> in reloading the ALL cache. As a result I settled on
> >> >>>>>>>>>>>>>>>>> using InputFormat, because it was used for scanning in
> >> >>>>>>>>>>>>>>>>> all lookup connectors. (I didn't know that there are
> >> >>>>>>>>>>>>>>>>> plans to deprecate InputFormat in favor of the FLIP-27
> >> >>>>>>>>>>>>>>>>> Source.) IMO usage of the FLIP-27 source in ALL caching
> >> >>>>>>>>>>>>>>>>> is not a good idea, because this source was designed to
> >> >>>>>>>>>>>>>>>>> work in a distributed environment (SplitEnumerator on
> >> >>>>>>>>>>>>>>>>> the JobManager and SourceReaders on the TaskManagers),
> >> >>>>>>>>>>>>>>>>> not in one operator (the lookup join operator in our
> >> >>>>>>>>>>>>>>>>> case). There is not even a direct way to pass splits
> >> >>>>>>>>>>>>>>>>> from the SplitEnumerator to a SourceReader (this logic
> >> >>>>>>>>>>>>>>>>> works through the SplitEnumeratorContext, which
> >> >>>>>>>>>>>>>>>>> requires OperatorCoordinator.SubtaskGateway to send
> >> >>>>>>>>>>>>>>>>> AddSplitEvents). Usage of InputFormat for the ALL cache
> >> >>>>>>>>>>>>>>>>> seems much clearer and easier. But if there are plans
> >> >>>>>>>>>>>>>>>>> to refactor all connectors to FLIP-27, I have the
> >> >>>>>>>>>>>>>>>>> following idea: maybe we can abandon the lookup join
> >> >>>>>>>>>>>>>>>>> ALL cache in favor of a simple join with multiple scans
> >> >>>>>>>>>>>>>>>>> of the batch source?
> >> >>>>>>>>>>>>>>>>> The point is that the only difference between the
> >> >>>>>>>>>>>>>>>>> lookup join ALL cache and a simple join with a batch
> >> >>>>>>>>>>>>>>>>> source is that in the first case scanning is performed
> >> >>>>>>>>>>>>>>>>> multiple times, in between which the state (cache) is
> >> >>>>>>>>>>>>>>>>> cleared (correct me if I'm wrong). So what if we extend
> >> >>>>>>>>>>>>>>>>> the functionality of simple join to support state
> >> >>>>>>>>>>>>>>>>> reloading, and extend the functionality of scanning the
> >> >>>>>>>>>>>>>>>>> batch source multiple times (this should be easy with
> >> >>>>>>>>>>>>>>>>> the new FLIP-27 source, which unifies streaming/batch
> >> >>>>>>>>>>>>>>>>> reading - we would only need to change the
> >> >>>>>>>>>>>>>>>>> SplitEnumerator, which would pass the splits again
> >> >>>>>>>>>>>>>>>>> after some TTL)? WDYT? I must say that this looks like
> >> >>>>>>>>>>>>>>>>> a long-term goal and will make the scope of this FLIP
> >> >>>>>>>>>>>>>>>>> even larger than you said. Maybe we can limit ourselves
> >> >>>>>>>>>>>>>>>>> to a simpler solution now (InputFormats).
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> So to sum up, my points are as follows:
> >> >>>>>>>>>>>>>>>>> 1) There is a way to make both concise and flexible
> >> >>>>>>>>>>>>>>>>> interfaces for caching in lookup join.
> >> >>>>>>>>>>>>>>>>> 2) The cache filter optimization is important for both
> >> >>>>>>>>>>>>>>>>> the LRU and ALL caches.
> >> >>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported
> >> >>>>>>>>>>>>>>>>> in Flink connectors; some of the connectors might not
> >> >>>>>>>>>>>>>>>>> have the opportunity to support filter pushdown, and,
> >> >>>>>>>>>>>>>>>>> as far as I know, filter pushdown currently works only
> >> >>>>>>>>>>>>>>>>> for scanning (not lookup).
So the cache filters + projections optimization should be independent of other features.
4) The ALL cache is a complex topic that involves multiple aspects of how Flink is evolving. Moving away from InputFormat to the FLIP-27 Source would make the ALL cache implementation really complex and unclear, so maybe instead we can either extend the functionality of simple joins, or keep InputFormat for the lookup-join ALL cache?

Best regards,
Smirnov Alexander

[1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:

It's great to see the active discussion! I want to share my ideas:

1) Implement the cache in the framework vs. in the connector base.
I don't have a strong opinion on this. Both ways should work (e.g., cache pruning, compatibility). The framework way can provide more concise interfaces. The connector-base way can define more flexible cache strategies/implementations.
We are still investigating whether we can have both advantages. We should reach a consensus that whichever way we choose is the final state, and that we are on the path to it.

2) Filters and projections pushdown.
I agree with Alex that pushing filters down into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use the cache to reduce I/O requests to databases for better throughput. If a filter prunes 90% of the data in the cache, then 90% of lookup requests can never be served from the cache and hit the databases directly. That means the cache is meaningless in this case.

IMO, Flink SQL already provides a standard way to push down filters and projections, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. That JDBC/Hive/HBase haven't implemented these interfaces doesn't mean they are hard to implement. They should implement the pushdown interfaces to reduce I/O and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation.
I don't see why we need to duplicate the pushdown logic in caches, which would complicate the lookup join design.

3) ALL cache abstraction.
The ALL cache might be the most challenging part of this FLIP. We have never provided a public reload-lookup interface. Currently, we put the reload logic in the "eval" method of TableFunction, which is hard for some sources (e.g., Hive). Ideally, the connector implementation should share the logic of reload and scan, i.e., ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with SourceOperator. If we want to invoke the FLIP-27 source in LookupJoin, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.
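Jark's 90% figure above can be sanity-checked with a toy simulation in plain Java. All names here are illustrative and are not part of the FLIP-221 API; the model simply assumes rows that fail the filter are never admitted to the cache, so every request for such a key goes to the database.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the argument: filtering rows *before* they enter the cache
 *  starves an LRU-style cache, because filtered-out keys always miss. */
class FilteredLruModel {

    /** Fraction of lookups that must go to the database. keysTotal distinct
     *  keys are requested uniformly over several rounds; the filter keeps only
     *  keys < keysKept, and only kept rows may be cached. An unbounded map is
     *  used deliberately: even the best possible cache cannot do better. */
    static double dbHitFraction(int keysTotal, int keysKept) {
        Map<Integer, Integer> cache = new HashMap<>();
        int dbRequests = 0, total = 0;
        for (int round = 0; round < 10; round++) {
            for (int key = 0; key < keysTotal; key++) {
                total++;
                if (cache.containsKey(key)) continue;      // served from cache
                dbRequests++;                              // request hits the database
                if (key < keysKept) cache.put(key, key);   // only unfiltered rows are cached
            }
        }
        return (double) dbRequests / total;
    }

    public static void main(String[] args) {
        // Filter prunes 90% of rows: ~90% of lookups can never be answered by the cache.
        System.out.println(dbHitFraction(100, 10));   // ≈ 0.91
        // No pruning: only the first round misses.
        System.out.println(dbHitFraction(100, 100));  // = 0.10
    }
}
```

The simulation confirms the intuition: pre-cache filtering pins the database request rate near the pruned fraction, regardless of cache size.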
Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns should be done for all ScanTableSource implementations (not only for lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.
Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!

I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data, and the handiest way to do that is to apply it inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy, and other options for all lookup tables.
3) Putting the cache into the framework really deprives us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by creating more cache strategies and a wider set of configurations.

All these points are much closer to the schema proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong: can all these facilities be simply implemented in your architecture?
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but just wanted to express that I really appreciate the in-depth discussion on this topic and I hope that others will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept that consciously to achieve better performance (no one proposed to enable caching by default, etc.). Or by "users" do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in flink-table-runtime and in flink-table-common from this point of view? How does it affect breaking or not breaking the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious

If we talk about the main semantic difference between DDL options and config options ("table.exec.xxx"), isn't it about the scope of the option and its importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting the lookup cache strategy option into configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches only a few functions of ONE table (there can be multiple tables with different caches). Does it really matter for the user (or anyone else) where the logic affected by the applied option is located?
Also, I can recall the DDL option 'sink.parallelism', which in some way also "controls the behavior of the framework", and I don't see any problem there.

> introduce a new interface for this all-caching scenario and the design would become more complex

This is a subject for a separate discussion, but in our internal version we actually solved this problem quite easily: we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC, and even Hive, which uses the class PartitionReader, actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload the cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is significantly reduced (as well as the time the input stream is blocked).
I > know > >> >>> that > >> >>>>>>>>>>>>>>> usually > >> >>>>>>>>>>>>>>>>> we > >> >>>>>>>>>>>>>>>>>>>> try > >> >>>>>>>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but > >> >>> maybe > >> >>>> this > >> >>>>>>>>>>>>>>> one > >> >>>>>>>>>>>>>>>>> can > >> >>>>>>>>>>>>>>>>>>>> be > >> >>>>>>>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal > >> >>>> solution, > >> >>>>>>>>>>>>>>> maybe > >> >>>>>>>>>>>>>>>>>>>> there > >> >>>>>>>>>>>>>>>>>>>>>>> are better ones. > >> >>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>> Providing the cache in the framework might > >> >> introduce > >> >>>>>>>>>>>>>>>>> compatibility > >> >>>>>>>>>>>>>>>>>>>>>> issues > >> >>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>> It's possible only in cases when the developer > of > >> >> the > >> >>>>>>>>>>>>>>> connector > >> >>>>>>>>>>>>>>>>>>>> won't > >> >>>>>>>>>>>>>>>>>>>>>>> properly refactor his code and will use new > cache > >> >>>> options > >> >>>>>>>>>>>>>>>>>>>> incorrectly > >> >>>>>>>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same options into 2 > >> >>>> different > >> >>>>>>>>>>>>>>> code > >> >>>>>>>>>>>>>>>>>>>>>>> places). For correct behavior all he will need > to > >> >> do > >> >>>> is to > >> >>>>>>>>>>>>>>>>> redirect > >> >>>>>>>>>>>>>>>>>>>>>>> existing options to the framework's LookupConfig > >> (+ > >> >>>> maybe > >> >>>>>>>>>>>>>>> add an > >> >>>>>>>>>>>>>>>>>>>> alias > >> >>>>>>>>>>>>>>>>>>>>>>> for options, if there was different naming), > >> >>> everything > >> >>>>>>>>>>>>>>> will be > >> >>>>>>>>>>>>>>>>>>>>>>> transparent for users. If the developer won't do > >> >>>>>>>>>>>>>>> refactoring at > >> >>>>>>>>>>>>>>>>> all, > >> >>>>>>>>>>>>>>>>>>>>>>> nothing will be changed for the connector > because > >> >> of > >> >>>>>>>>>>>>>>> backward > >> >>>>>>>>>>>>>>>>>>>>>>> compatibility. 
Also, if a developer wants to use their own cache logic, they can simply refrain from passing some of the configs to the framework and instead make their own implementation with the already existing configs and metrics (but I actually think that is a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That is a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it is simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table.
For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed over the users' age. If we have the filter 'age > 30', there will be about half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2x, which gives a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users, and that doesn't sound like 'not quite useful'.

It would be great to hear other voices on this topic! We have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander
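The arithmetic behind the 'age > 30' example can be written out as a tiny back-of-the-envelope check (plain Java, hypothetical names; this is not connector code). With distinct ages uniform in [20, 40], the filter keeps ages 31..40, i.e. roughly half of the rows, so the same 'lookup.cache.max-rows' budget covers about twice as large a share of the filtered table.

```java
/** Back-of-the-envelope check: how many distinct dimension-table rows survive
 *  a predicate like 'age > threshold' when ages are uniform in [minAge, maxAge]. */
class CacheFilterEstimate {

    /** Count of distinct ages in [minAge, maxAge] that pass 'age > threshold'. */
    static int rowsKept(int minAge, int maxAge, int threshold) {
        int kept = 0;
        for (int age = minAge; age <= maxAge; age++) {
            if (age > threshold) {
                kept++;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        int total = rowsKept(20, 40, 19); // no pruning: all 21 distinct ages
        int kept = rowsKept(20, 40, 30);  // 'age > 30' keeps ages 31..40 -> 10
        System.out.println(kept + " of " + total + " distinct rows survive the filter");
    }
}
```

With 10 of 21 rows kept, a fixed cache budget effectively doubles its coverage of the rows the query can actually match.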
On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion, and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF proc_time", because the result can no longer truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.
2. If we put the cache implementation in the framework (whether in a runner or in a wrapper around TableFunction), we have to confront a situation where table options in DDL control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design, the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.

3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community, and also widely used by our internal connectors).
Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4. Providing the cache in the framework might introduce compatibility issues for existing lookup sources: there might be two caches with totally different strategies if the user configures the table incorrectly (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of to the runner with the cache.
The goal of using a cache is to reduce network I/O and the pressure on the external system, and applying these optimizations only to the cache seems not quite useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as part of the TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache. Also, I made a POC [2] for your reference.

Looking forward to your ideas!
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step:

I think these two ways (the one originally proposed by Qingsheng, and mine) are mutually exclusive, because conceptually they pursue the same goal, but the implementation details differ. If we go one way, moving to the other in the future will mean deleting existing code and once again changing the API for connectors.
So I think we should first reach a consensus with the community about that, and then work together on this FLIP, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?

> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we first must make the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
> >> >>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of > >> >>>>>>>>>>>>>>> conversations > >> >>>>>>>>>>>>>>>>> :) > >> >>>>>>>>>>>>>>>>>>>>>>>>> I have no write access to the confluence, so I > >> >>> made a > >> >>>>>>>>>>>>>>> Jira > >> >>>>>>>>>>>>>>>>> issue, > >> >>>>>>>>>>>>>>>>>>>>>>>>> where described the proposed changes in more > >> >>> details > >> >>>> - > >> >>>>>>>>>>>>>>>>>>>>>>>>> > >> >> https://issues.apache.org/jira/browse/FLINK-27411. > >> >>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>> Will happy to get more feedback! > >> >>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>> Best, > >> >>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander > >> >>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>> пн, 25 апр. 2022 г. в 19:49, Arvid Heise < > >> >>>>>>>>>>>>>>> ar...@apache.org>: > >> >>>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, > >> >>>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency > was > >> >> not > >> >>>>>>>>>>>>>>>>> satisfying > >> >>>>>>>>>>>>>>>>>>>> for > >> >>>>>>>>>>>>>>>>>>>>>> me. > >> >>>>>>>>>>>>>>>>>>>>>>>>>> > >> >>>>>>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could > also > >> >>> live > >> >>>>>>>>>>>>>>> with > >> >>>>>>>>>>>>>>>>> an > >> >>>>>>>>>>>>>>>>>>>>> easier > >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of making > >> >>>> caching > >> >>>>>>>>>>>>>>> an > >> >>>>>>>>>>>>>>>>>>>>>>> implementation > >> >>>>>>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a > >> >> caching > >> >>>>>>>>>>>>>>> layer > >> >>>>>>>>>>>>>>>>>>>> around X. 
So the proposal would be a CachingTableFunction that delegates to X on misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).

Another advantage is that all the changes of this FLIP would be limited to options, with no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.

@Alexander unfortunately, your architecture is not shared. I don't know a solution for sharing images, to be honest.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng! My name is Alexander; I'm not a committer yet, but I'd really like to become one, and this FLIP really interested me. I have actually worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on it and make the code open source.

I think there is a better alternative to introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables – very convenient to import in connectors.
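The "caching layer around X" that Arvid describes can be sketched as a wrapper that serves repeated keys from a local map and delegates to the wrapped lookup only on misses. The sketch below uses simplified stand-in types (a plain Function and HashMap), not the actual Flink TableFunction API, and ignores eviction:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a cache-on-miss wrapper around a lookup function "X".
// Not the real Flink API: real code would wrap TableFunction, bound the cache
// size (LRU/TTL), and decide whether to cache missing keys.
class CachingLookup<K, V> {
    private final Function<K, V> delegate;   // the wrapped lookup "X"
    private final Map<K, V> cache = new HashMap<>();
    private long hits = 0;                   // candidates for the FLIP's standard metrics
    private long misses = 0;

    CachingLookup(Function<K, V> delegate) {
        this.delegate = delegate;
    }

    V lookup(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            hits++;
            return cached;                   // served from cache, no request to X
        }
        misses++;
        V value = delegate.apply(key);       // miss: delegate to X
        cache.put(key, value);
        return value;
    }

    long hits() { return hits; }
    long misses() { return misses; }
}
```

Note that a null result is re-fetched on every call here, i.e. missing keys are effectively not cached; whether they should be is exactly the cacheMissingKey question raised earlier in the thread.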
In turn, CachingTableFunction contains runtime execution logic, so this class and everything connected with it should live in another module, probably flink-table-runtime. But that would require connectors to depend on a module containing a lot of runtime logic, which doesn't sound good.

I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider so that connectors only pass configuration to the planner and therefore don't depend on the runtime implementation. Based on these configs, the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module). The architecture looks like the pinned image (the LookupConfig class there is effectively your CacheConfig).
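Under that approach a connector would hand the planner a plain value object. A minimal sketch, with all names (LookupConfig, LookupConfigProvider) taken from the proposal and therefore illustrative rather than an existing Flink API:

```java
// Hypothetical sketch: the connector exposes declarative cache settings only;
// the planner uses them to build the caching lookup-join operator, so the
// connector never depends on flink-table-runtime classes.
class LookupConfig {
    private final long maxCachedRows;       // capacity bound for the LRU cache
    private final long cacheTtlMillis;      // expire entries this long after write
    private final boolean cacheMissingKey;  // cache lookups that returned no rows?

    LookupConfig(long maxCachedRows, long cacheTtlMillis, boolean cacheMissingKey) {
        this.maxCachedRows = maxCachedRows;
        this.cacheTtlMillis = cacheTtlMillis;
        this.cacheMissingKey = cacheMissingKey;
    }

    long getMaxCachedRows() { return maxCachedRows; }
    long getCacheTtlMillis() { return cacheTtlMillis; }
    boolean isCacheMissingKey() { return cacheMissingKey; }
}

// The method Alexander proposes on LookupTableSource / LookupRuntimeProvider.
interface LookupConfigProvider {
    LookupConfig getLookupConfig();
}
```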
The classes in flink-table-planner that would be responsible for this are CommonPhysicalLookupJoin and its subclasses. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc, and AsyncLookupJoinRunnerWithCalc.

I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution. If we have the caching logic on a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc is named this way because it uses the 'calc' function, which mostly consists of filters and projections.
For example, when joining table A with lookup table B under the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the cache size will be significantly reduced: filters avoid storing useless records, and projections reduce the records' size. So the user can raise the initial maximum number of records in the cache.

What do you think about it?

On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction for the lookup table cache and its standard metrics.
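The optimization Alexander describes above, running the calc's filters and projections before records enter the cache so that rows failing e.g. B.salary > 1000 are never stored, might look roughly like the following sketch. Types are simplified stand-ins (a generic Predicate and mapper in place of the generated calc function, plain lists in place of Flink's RowData):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: apply filter + projection to fetched rows before they
// are inserted into the cache, shrinking both the row count and the row size.
class FilteringCache<K, R> {
    private final Function<K, List<R>> lookup;  // fetch rows for a join key
    private final Predicate<R> filter;          // e.g. salary > 1000
    private final Function<R, R> projection;    // drop fields the query never reads
    private final Map<K, List<R>> cache = new HashMap<>();

    FilteringCache(Function<K, List<R>> lookup, Predicate<R> filter, Function<R, R> projection) {
        this.lookup = lookup;
        this.filter = filter;
        this.projection = projection;
    }

    List<R> get(K key) {
        return cache.computeIfAbsent(key, k ->
                lookup.apply(k).stream()
                        .filter(filter)      // useless rows never reach the cache
                        .map(projection)     // kept rows are stored in reduced form
                        .collect(Collectors.toList()));
    }
}
```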
Currently each lookup table source has to implement its own cache for storing lookup results, and there is no standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.

Therefore we propose some new APIs, including the cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng

--
Best Regards,

Qingsheng Ren

Real-time Computing Team
Alibaba Cloud

Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com