Hi Jing Ge,

What do you mean by the "impact on the block cache used by HBase"? In my understanding, the connector cache and the HBase cache are two entirely different things: the connector cache is a local (client-side) cache, and the HBase cache is a server-side cache.
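To make the client/server distinction concrete, here is a minimal sketch of a client-side cache, assuming a plain LRU policy. The class name `ClientSideLookupCache` and the `remoteLookup` function (which stands in for a request to the HBase server) are hypothetical and are not part of the Flink or HBase APIs. The point is only that a hit in the local cache never leaves the client, whereas a hit in the server's block cache still costs a network round trip.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical client-side LRU cache; an illustration only, not the Flink or HBase API. */
final class ClientSideLookupCache<K, V> {

    /** Stands in for a remote request to the server (assumption for this sketch). */
    private final Function<K, V> remoteLookup;

    private final Map<K, V> cache;

    /** Counts how many lookups actually reached the "server". */
    private int remoteCalls = 0;

    ClientSideLookupCache(int maxRows, Function<K, V> remoteLookup) {
        this.remoteLookup = remoteLookup;
        // accessOrder = true keeps entries ordered from least to most recently used.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Evict the least recently used entry once the row limit is exceeded.
                return size() > maxRows;
            }
        };
    }

    V lookup(K key) {
        V cached = cache.get(key);
        if (cached != null) {
            return cached; // served locally, no request/response is sent
        }
        remoteCalls++;
        V loaded = remoteLookup.apply(key);
        if (loaded != null) {
            cache.put(key, loaded);
        }
        return loaded;
    }

    int remoteCalls() {
        return remoteCalls;
    }
}
```

With a capacity of two rows, looking up the same key twice in a row results in a single remote call; the server-side cache cannot save that second round trip on its own.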

> does it make sense to have a no-cache solution as one of the default solutions so that customers will have no effort for the migration if they want to stick with Hbase cache

The implementation migration should be transparent to users. Take the HBase connector as an example: it already supports a lookup cache, but the cache is disabled by default. After the migration, the connector still disables the cache by default (i.e. the no-cache solution), so there is no migration effort for users.

The HBase cache and the connector cache are two different things, and the HBase cache can't simply replace the connector cache. One of the most important uses of the connector cache is reducing I/O requests/responses and improving throughput, which can't be achieved by just using a server cache.

Best,
Jark

On Fri, 27 May 2022 at 22:42, Jing Ge <j...@ververica.com> wrote:

> Thanks all for the valuable discussion. The new feature looks very
> interesting.
>
> According to the FLIP description: "*Currently we have JDBC, Hive and HBase
> connector implemented lookup table source. All existing implementations
> will be migrated to the current design and the migration will be
> transparent to end users*." I was only wondering if we should pay attention
> to HBase and similar DBs. Since, commonly, the lookup data will be huge
> while using HBase, partial caching will be used in this case, if I am not
> mistaken, which might have an impact on the block cache used by HBase, e.g.
> LruBlockCache.
> Another question is that, since HBase provides a sophisticated cache
> solution, does it make sense to have a no-cache solution as one of the
> default solutions so that customers will have no effort for the migration
> if they want to stick with Hbase cache?
>
> Best regards,
> Jing
>
> On Fri, May 27, 2022 at 11:19 AM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I think the problems now are as follows:
> > 1. The AllCache and PartialCache interfaces are not uniform: one needs to
> > provide a LookupProvider, the other needs to provide a CacheBuilder.
> > 2. The AllCache definition is not flexible. For example, PartialCache can
> > use any custom storage while AllCache cannot. AllCache could also store
> > its data in memory or on disk, so it needs a flexible strategy as well.
> > 3. AllCache cannot customize the ReloadStrategy; currently there is only
> > ScheduledReloadStrategy.
> >
> > In order to solve the above problems, the following are my ideas.
> >
> > ## Top level cache interfaces:
> >
> > ```
> >
> > public interface CacheLookupProvider extends
> >         LookupTableSource.LookupRuntimeProvider {
> >
> >     CacheBuilder createCacheBuilder();
> > }
> >
> >
> > public interface CacheBuilder {
> >     Cache create();
> > }
> >
> >
> > public interface Cache {
> >
> >     /**
> >      * Returns the value associated with key in this cache, or null if
> >      * there is no cached value for key.
> >      */
> >     @Nullable
> >     Collection<RowData> getIfPresent(RowData key);
> >
> >     /** Returns the number of key-value mappings in the cache. */
> >     long size();
> > }
> >
> > ```
> >
> > ## Partial cache
> >
> > ```
> >
> > public interface PartialCacheLookupFunction extends CacheLookupProvider {
> >
> >     @Override
> >     PartialCacheBuilder createCacheBuilder();
> >
> >     /** Creates a {@link LookupFunction} instance. */
> >     LookupFunction createLookupFunction();
> > }
> >
> >
> > public interface PartialCacheBuilder extends CacheBuilder {
> >
> >     PartialCache create();
> > }
> >
> >
> > public interface PartialCache extends Cache {
> >
> >     /**
> >      * Associates the specified value rows with the specified key row
> >      * in the cache. If the cache previously contained a value associated
> >      * with the key, the old value is replaced by the specified value.
> >      *
> >      * @param key - key row with which the specified value is to be
> >      *     associated
> >      * @param value - value rows to be associated with the specified key
> >      * @return the previous value rows associated with key, or null if
> >      *     there was no mapping for key.
> >      */
> >     Collection<RowData> put(RowData key, Collection<RowData> value);
> >
> >     /** Discards any cached value for the specified key. */
> >     void invalidate(RowData key);
> > }
> >
> > ```
> >
> > ## All cache
> > ```
> >
> > public interface AllCacheLookupProvider extends CacheLookupProvider {
> >
> >     void registerReloadStrategy(
> >             ScheduledExecutorService executorService, Reloader reloader);
> >
> >     ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
> >
> >     @Override
> >     AllCacheBuilder createCacheBuilder();
> > }
> >
> >
> > public interface AllCacheBuilder extends CacheBuilder {
> >
> >     AllCache create();
> > }
> >
> >
> > public interface AllCache extends Cache {
> >
> >     void putAll(Iterator<Map<RowData, RowData>> allEntries);
> >
> >     void clearAll();
> > }
> >
> >
> > public interface Reloader {
> >
> >     void reload();
> > }
> >
> > ```
> >
> > Best,
> > Jingsong
> >
> > On Fri, May 27, 2022 at 11:10 AM Jingsong Li <jingsongl...@gmail.com>
> > wrote:
> >
> > > Thanks Qingsheng and all for your discussion.
> > >
> > > Very sorry to jump in so late.
> > >
> > > Maybe I missed something?
> > > My first impression when I saw the cache interface was: why don't we
> > > provide an interface similar to Guava cache [1]? On top of Guava cache,
> > > Caffeine also adds extensions for asynchronous calls [2].
> > > Caffeine also supports bulk loading.
> > >
> > > I am also confused about why we go first from
> > > LookupCacheFactory.Builder and then to the Factory to create a Cache.
> > >
> > > [1] https://github.com/google/guava
> > > [2] https://github.com/ben-manes/caffeine/wiki/Population
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote:
> > >
> > >> After looking at the newly introduced ReloadTime and Becket's comment,
> > >> I agree with Becket that we should have a pluggable reloading strategy.
> > >> We can provide some common implementations, e.g., periodic reloading
> > >> and daily reloading.
> > >> But there will definitely be some connector- or business-specific
> > >> reloading strategies, e.g. being notified by a ZooKeeper watcher, or
> > >> reloading once a new Hive partition is complete.
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote:
> > >>
> > >> > Hi Qingsheng,
> > >> >
> > >> > Thanks for updating the FLIP. A few comments / questions below:
> > >> >
> > >> > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"?
> > >> > What is the difference between them? If they are the same, can we just
> > >> > use XXXFactory everywhere?
> > >> >
> > >> > 2. Regarding the FullCachingLookupProvider, should the reloading policy
> > >> > also be pluggable? Periodic reloading can sometimes be tricky in
> > >> > practice. For example, if a user uses 24 hours as the cache refresh
> > >> > interval and a nightly batch job is delayed, the cache update may still
> > >> > see the stale data.
> > >> >
> > >> > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be
> > >> > removed.
> > >> >
> > >> > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a
> > >> > little confusing to me. If Optional<LookupCacheFactory> getCacheFactory()
> > >> > returns a non-empty factory, doesn't that already indicate that the
> > >> > framework should cache the missing keys?
Also, why is this method returning an > > >> Optional<Boolean> > > >> > instead of boolean? > > >> > > > >> > Thanks, > > >> > > > >> > Jiangjie (Becket) Qin > > >> > > > >> > > > >> > > > >> > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> > > >> wrote: > > >> > > > >> >> Hi Lincoln and Jark, > > >> >> > > >> >> Thanks for the comments! If the community reaches a consensus that > we > > >> use > > >> >> SQL hint instead of table options to decide whether to use sync or > > >> async > > >> >> mode, it’s indeed not necessary to introduce the “lookup.async” > > option. > > >> >> > > >> >> I think it’s a good idea to let the decision of async made on query > > >> >> level, which could make better optimization with more infomation > > >> gathered > > >> >> by planner. Is there any FLIP describing the issue in FLINK-27625? > I > > >> >> thought FLIP-234 is only proposing adding SQL hint for retry on > > missing > > >> >> instead of the entire async mode to be controlled by hint. > > >> >> > > >> >> Best regards, > > >> >> > > >> >> Qingsheng > > >> >> > > >> >> > On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> > > >> wrote: > > >> >> > > > >> >> > Hi Jark, > > >> >> > > > >> >> > Thanks for your reply! > > >> >> > > > >> >> > Currently 'lookup.async' just lies in HBase connector, I have no > > idea > > >> >> > whether or when to remove it (we can discuss it in another issue > > for > > >> the > > >> >> > HBase connector after FLINK-27625 is done), just not add it into > a > > >> >> common > > >> >> > option now. > > >> >> > > > >> >> > Best, > > >> >> > Lincoln Lee > > >> >> > > > >> >> > > > >> >> > Jark Wu <imj...@gmail.com> 于2022年5月24日周二 20:14写道: > > >> >> > > > >> >> >> Hi Lincoln, > > >> >> >> > > >> >> >> I have taken a look at FLIP-234, and I agree with you that the > > >> >> connectors > > >> >> >> can > > >> >> >> provide both async and sync runtime providers simultaneously > > instead > > >> >> of one > > >> >> >> of them. 
> > >> >> >> At that point, "lookup.async" looks redundant. If this option is > > >> >> planned to > > >> >> >> be removed > > >> >> >> in the long term, I think it makes sense not to introduce it in > > this > > >> >> FLIP. > > >> >> >> > > >> >> >> Best, > > >> >> >> Jark > > >> >> >> > > >> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee < > lincoln.8...@gmail.com > > > > > >> >> wrote: > > >> >> >> > > >> >> >>> Hi Qingsheng, > > >> >> >>> > > >> >> >>> Sorry for jumping into the discussion so late. It's a good idea > > >> that > > >> >> we > > >> >> >> can > > >> >> >>> have a common table option. I have a minor comments on > > >> 'lookup.async' > > >> >> >> that > > >> >> >>> not make it a common option: > > >> >> >>> > > >> >> >>> The table layer abstracts both sync and async lookup > > capabilities, > > >> >> >>> connectors implementers can choose one or both, in the case of > > >> >> >> implementing > > >> >> >>> only one capability(status of the most of existing builtin > > >> connectors) > > >> >> >>> 'lookup.async' will not be used. And when a connector has both > > >> >> >>> capabilities, I think this choice is more suitable for making > > >> >> decisions > > >> >> >> at > > >> >> >>> the query level, for example, table planner can choose the > > physical > > >> >> >>> implementation of async lookup or sync lookup based on its cost > > >> >> model, or > > >> >> >>> users can give query hint based on their own better > > >> understanding. If > > >> >> >>> there is another common table option 'lookup.async', it may > > confuse > > >> >> the > > >> >> >>> users in the long run. > > >> >> >>> > > >> >> >>> So, I prefer to leave the 'lookup.async' option in private > place > > >> (for > > >> >> the > > >> >> >>> current hbase connector) and not turn it into a common option. > > >> >> >>> > > >> >> >>> WDYT? 
> > >> >> >>> > > >> >> >>> Best, > > >> >> >>> Lincoln Lee > > >> >> >>> > > >> >> >>> > > >> >> >>> Qingsheng Ren <renqs...@gmail.com> 于2022年5月23日周一 14:54写道: > > >> >> >>> > > >> >> >>>> Hi Alexander, > > >> >> >>>> > > >> >> >>>> Thanks for the review! We recently updated the FLIP and you > can > > >> find > > >> >> >>> those > > >> >> >>>> changes from my latest email. Since some terminologies has > > >> changed so > > >> >> >>> I’ll > > >> >> >>>> use the new concept for replying your comments. > > >> >> >>>> > > >> >> >>>> 1. Builder vs ‘of’ > > >> >> >>>> I’m OK to use builder pattern if we have additional optional > > >> >> parameters > > >> >> >>>> for full caching mode (“rescan” previously). The > > >> schedule-with-delay > > >> >> >> idea > > >> >> >>>> looks reasonable to me, but I think we need to redesign the > > >> builder > > >> >> API > > >> >> >>> of > > >> >> >>>> full caching to make it more descriptive for developers. Would > > you > > >> >> mind > > >> >> >>>> sharing your ideas about the API? For accessing the FLIP > > workspace > > >> >> you > > >> >> >>> can > > >> >> >>>> just provide your account ID and ping any PMC member including > > >> Jark. > > >> >> >>>> > > >> >> >>>> 2. Common table options > > >> >> >>>> We have some discussions these days and propose to introduce 8 > > >> common > > >> >> >>>> table options about caching. It has been updated on the FLIP. > > >> >> >>>> > > >> >> >>>> 3. Retries > > >> >> >>>> I think we are on the same page :-) > > >> >> >>>> > > >> >> >>>> For your additional concerns: > > >> >> >>>> 1) The table option has been updated. > > >> >> >>>> 2) We got “lookup.cache” back for configuring whether to use > > >> partial > > >> >> or > > >> >> >>>> full caching mode. 
> > >> >> >>>> > > >> >> >>>> Best regards, > > >> >> >>>> > > >> >> >>>> Qingsheng > > >> >> >>>> > > >> >> >>>> > > >> >> >>>> > > >> >> >>>>> On May 19, 2022, at 17:25, Александр Смирнов < > > >> smirale...@gmail.com> > > >> >> >>>> wrote: > > >> >> >>>>> > > >> >> >>>>> Also I have a few additions: > > >> >> >>>>> 1) maybe rename 'lookup.cache.maximum-size' to > > >> >> >>>>> 'lookup.cache.max-rows'? I think it will be more clear that > we > > >> talk > > >> >> >>>>> not about bytes, but about the number of rows. Plus it fits > > more, > > >> >> >>>>> considering my optimization with filters. > > >> >> >>>>> 2) How will users enable rescanning? Are we going to separate > > >> >> caching > > >> >> >>>>> and rescanning from the options point of view? Like initially > > we > > >> had > > >> >> >>>>> one option 'lookup.cache' with values LRU / ALL. I think now > we > > >> can > > >> >> >>>>> make a boolean option 'lookup.rescan'. RescanInterval can be > > >> >> >>>>> 'lookup.rescan.interval', etc. > > >> >> >>>>> > > >> >> >>>>> Best regards, > > >> >> >>>>> Alexander > > >> >> >>>>> > > >> >> >>>>> чт, 19 мая 2022 г. в 14:50, Александр Смирнов < > > >> smirale...@gmail.com > > >> >> >>> : > > >> >> >>>>>> > > >> >> >>>>>> Hi Qingsheng and Jark, > > >> >> >>>>>> > > >> >> >>>>>> 1. Builders vs 'of' > > >> >> >>>>>> I understand that builders are used when we have multiple > > >> >> >> parameters. > > >> >> >>>>>> I suggested them because we could add parameters later. To > > >> prevent > > >> >> >>>>>> Builder for ScanRuntimeProvider from looking redundant I can > > >> >> suggest > > >> >> >>>>>> one more config now - "rescanStartTime". > > >> >> >>>>>> It's a time in UTC (LocalTime class) when the first reload > of > > >> cache > > >> >> >>>>>> starts. This parameter can be thought of as 'initialDelay' > > (diff > > >> >> >>>>>> between current time and rescanStartTime) in method > > >> >> >>>>>> ScheduleExecutorService#scheduleWithFixedDelay [1] . 
It can > be > > >> very > > >> >> >>>>>> useful when the dimension table is updated by some other > > >> scheduled > > >> >> >> job > > >> >> >>>>>> at a certain time. Or when the user simply wants a second > scan > > >> >> >> (first > > >> >> >>>>>> cache reload) be delayed. This option can be used even > without > > >> >> >>>>>> 'rescanInterval' - in this case 'rescanInterval' will be one > > >> day. > > >> >> >>>>>> If you are fine with this option, I would be very glad if > you > > >> would > > >> >> >>>>>> give me access to edit FLIP page, so I could add it myself > > >> >> >>>>>> > > >> >> >>>>>> 2. Common table options > > >> >> >>>>>> I also think that FactoryUtil would be overloaded by all > cache > > >> >> >>>>>> options. But maybe unify all suggested options, not only for > > >> >> default > > >> >> >>>>>> cache? I.e. class 'LookupOptions', that unifies default > cache > > >> >> >> options, > > >> >> >>>>>> rescan options, 'async', 'maxRetries'. WDYT? > > >> >> >>>>>> > > >> >> >>>>>> 3. Retries > > >> >> >>>>>> I'm fine with suggestion close to RetryUtils#tryTimes(times, > > >> call) > > >> >> >>>>>> > > >> >> >>>>>> [1] > > >> >> >>>> > > >> >> >>> > > >> >> >> > > >> >> > > >> > > > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- > > >> >> >>>>>> > > >> >> >>>>>> Best regards, > > >> >> >>>>>> Alexander > > >> >> >>>>>> > > >> >> >>>>>> ср, 18 мая 2022 г. в 16:04, Qingsheng Ren < > renqs...@gmail.com > > >: > > >> >> >>>>>>> > > >> >> >>>>>>> Hi Jark and Alexander, > > >> >> >>>>>>> > > >> >> >>>>>>> Thanks for your comments! I’m also OK to introduce common > > table > > >> >> >>>> options. 
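The 'rescanStartTime' proposal quoted above boils down to computing an initial delay for ScheduledExecutorService#scheduleWithFixedDelay. A minimal sketch of that arithmetic follows, assuming that a start time which has already passed today rolls over to the next day, and that a missing interval defaults to one day as described in the proposal. The class `RescanScheduleSketch` and its method names are hypothetical; nothing here is taken from the FLIP itself.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating the 'rescanStartTime' idea; not part of Flink. */
final class RescanScheduleSketch {

    /** The proposal suggests one day when no 'rescanInterval' is configured. */
    static final Duration DEFAULT_INTERVAL = Duration.ofDays(1);

    /**
     * Difference between the current time and the configured start time.
     * Assumption: if the start time has already passed today, wait until tomorrow.
     */
    static Duration initialDelay(LocalTime now, LocalTime rescanStartTime) {
        Duration delay = Duration.between(now, rescanStartTime);
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }

    /**
     * Schedules the reload task. The caller is expected to pass the current time in UTC,
     * e.g. LocalTime.now(ZoneOffset.UTC), since the proposal defines the start time in UTC.
     */
    static ScheduledFuture<?> schedule(
            ScheduledExecutorService executor,
            Runnable reload,
            LocalTime now,
            LocalTime rescanStartTime,
            Duration rescanInterval) {
        Duration interval = rescanInterval != null ? rescanInterval : DEFAULT_INTERVAL;
        return executor.scheduleWithFixedDelay(
                reload,
                initialDelay(now, rescanStartTime).toMillis(),
                interval.toMillis(),
                TimeUnit.MILLISECONDS);
    }
}
```

Passing `now` explicitly rather than reading the clock inside the helper keeps the delay computation easy to test.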
I prefer to introduce a new DefaultLookupCacheOptions > > >> class > > >> >> >> for > > >> >> >>>> holding these option definitions because putting all options > > into > > >> >> >>>> FactoryUtil would make it a bit ”crowded” and not well > > >> categorized. > > >> >> >>>>>>> > > >> >> >>>>>>> FLIP has been updated according to suggestions above: > > >> >> >>>>>>> 1. Use static “of” method for constructing > > >> RescanRuntimeProvider > > >> >> >>>> considering both arguments are required. > > >> >> >>>>>>> 2. Introduce new table options matching > > >> DefaultLookupCacheFactory > > >> >> >>>>>>> > > >> >> >>>>>>> Best, > > >> >> >>>>>>> Qingsheng > > >> >> >>>>>>> > > >> >> >>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> > > >> wrote: > > >> >> >>>>>>>> > > >> >> >>>>>>>> Hi Alex, > > >> >> >>>>>>>> > > >> >> >>>>>>>> 1) retry logic > > >> >> >>>>>>>> I think we can extract some common retry logic into > > utilities, > > >> >> >> e.g. > > >> >> >>>> RetryUtils#tryTimes(times, call). > > >> >> >>>>>>>> This seems independent of this FLIP and can be reused by > > >> >> >> DataStream > > >> >> >>>> users. > > >> >> >>>>>>>> Maybe we can open an issue to discuss this and where to > put > > >> it. > > >> >> >>>>>>>> > > >> >> >>>>>>>> 2) cache ConfigOptions > > >> >> >>>>>>>> I'm fine with defining cache config options in the > > framework. > > >> >> >>>>>>>> A candidate place to put is FactoryUtil which also > includes > > >> >> >>>> "sink.parallelism", "format" options. > > >> >> >>>>>>>> > > >> >> >>>>>>>> Best, > > >> >> >>>>>>>> Jark > > >> >> >>>>>>>> > > >> >> >>>>>>>> > > >> >> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов < > > >> >> >>> smirale...@gmail.com> > > >> >> >>>> wrote: > > >> >> >>>>>>>>> > > >> >> >>>>>>>>> Hi Qingsheng, > > >> >> >>>>>>>>> > > >> >> >>>>>>>>> Thank you for considering my comments. 
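The RetryUtils#tryTimes(times, call) utility mentioned above is only a suggestion in this thread. A minimal sketch of what such a helper could look like is given below, assuming it retries on any exception and rethrows the last failure once the attempts are used up. The class name `RetryUtilsSketch` and the exact signature are assumptions for illustration; connector-specific steps such as re-establishing a connection would still have to live in the connector.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper in the spirit of the suggested RetryUtils#tryTimes. */
final class RetryUtilsSketch {

    /**
     * Invokes the call up to {@code times} times and returns the first successful result.
     * Assumption: every exception is treated as retriable; a real utility would likely
     * let the caller decide which failures are worth retrying.
     */
    static <T> T tryTimes(int times, Callable<T> call) throws Exception {
        if (times < 1) {
            throw new IllegalArgumentException("times must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last; // all attempts failed
    }
}
```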
> > >> >> >>>>>>>>> > > >> >> >>>>>>>>>> there might be custom logic before making retry, such as > > >> >> >>>> re-establish the connection > > >> >> >>>>>>>>> > > >> >> >>>>>>>>> Yes, I understand that. I meant that such logic can be > > >> placed in > > >> >> >> a > > >> >> >>>>>>>>> separate function, that can be implemented by connectors. > > >> Just > > >> >> >>> moving > > >> >> >>>>>>>>> the retry logic would make connector's LookupFunction > more > > >> >> >> concise > > >> >> >>> + > > >> >> >>>>>>>>> avoid duplicate code. However, it's a minor change. The > > >> decision > > >> >> >> is > > >> >> >>>> up > > >> >> >>>>>>>>> to you. > > >> >> >>>>>>>>> > > >> >> >>>>>>>>>> We decide not to provide common DDL options and let > > >> developers > > >> >> >> to > > >> >> >>>> define their own options as we do now per connector. > > >> >> >>>>>>>>> > > >> >> >>>>>>>>> What is the reason for that? One of the main goals of > this > > >> FLIP > > >> >> >> was > > >> >> >>>> to > > >> >> >>>>>>>>> unify the configs, wasn't it? I understand that current > > cache > > >> >> >>> design > > >> >> >>>>>>>>> doesn't depend on ConfigOptions, like was before. But > still > > >> we > > >> >> >> can > > >> >> >>>> put > > >> >> >>>>>>>>> these options into the framework, so connectors can reuse > > >> them > > >> >> >> and > > >> >> >>>>>>>>> avoid code duplication, and, what is more significant, > > avoid > > >> >> >>> possible > > >> >> >>>>>>>>> different options naming. This moment can be pointed out > in > > >> >> >>>>>>>>> documentation for connector developers. > > >> >> >>>>>>>>> > > >> >> >>>>>>>>> Best regards, > > >> >> >>>>>>>>> Alexander > > >> >> >>>>>>>>> > > >> >> >>>>>>>>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren < > > >> renqs...@gmail.com>: > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> Hi Alexander, > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> Thanks for the review and glad to see we are on the same > > >> page! 
> > >> >> I > > >> >> >>>> think you forgot to cc the dev mailing list so I’m also > quoting > > >> your > > >> >> >>> reply > > >> >> >>>> under this email. > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>>> We can add 'maxRetryTimes' option into this class > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> In my opinion the retry logic should be implemented in > > >> lookup() > > >> >> >>>> instead of in LookupFunction#eval(). Retrying is only > meaningful > > >> >> under > > >> >> >>> some > > >> >> >>>> specific retriable failures, and there might be custom logic > > >> before > > >> >> >>> making > > >> >> >>>> retry, such as re-establish the connection > > >> (JdbcRowDataLookupFunction > > >> >> >> is > > >> >> >>> an > > >> >> >>>> example), so it's more handy to leave it to the connector. > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>>> I don't see DDL options, that were in previous version > of > > >> >> FLIP. > > >> >> >>> Do > > >> >> >>>> you have any special plans for them? > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> We decide not to provide common DDL options and let > > >> developers > > >> >> >> to > > >> >> >>>> define their own options as we do now per connector. > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> The rest of comments sound great and I’ll update the > FLIP. > > >> Hope > > >> >> >> we > > >> >> >>>> can finalize our proposal soon! > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> Best, > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> Qingsheng > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>> > > >> >> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов < > > >> >> >>> smirale...@gmail.com> > > >> >> >>>> wrote: > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> Hi Qingsheng and devs! > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> I like the overall design of updated FLIP, however I > have > > >> >> >> several > > >> >> >>>>>>>>>>> suggestions and questions. 
> > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of > > >> TableFunction > > >> >> >> is a > > >> >> >>>> good > > >> >> >>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this > class. > > >> >> 'eval' > > >> >> >>>> method > > >> >> >>>>>>>>>>> of new LookupFunction is great for this purpose. The > same > > >> is > > >> >> >> for > > >> >> >>>>>>>>>>> 'async' case. > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> 2) There might be other configs in future, such as > > >> >> >>>> 'cacheMissingKey' > > >> >> >>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in > > >> >> >>>> ScanRuntimeProvider. > > >> >> >>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider and > > >> >> >>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one > > 'build' > > >> >> >>> method > > >> >> >>>>>>>>>>> instead of many 'of' methods in future)? > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> 3) What are the plans for existing > TableFunctionProvider > > >> and > > >> >> >>>>>>>>>>> AsyncTableFunctionProvider? I think they should be > > >> deprecated. > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> 4) Am I right that the current design does not assume > > >> usage of > > >> >> >>>>>>>>>>> user-provided LookupCache in re-scanning? In this case, > > it > > >> is > > >> >> >> not > > >> >> >>>> very > > >> >> >>>>>>>>>>> clear why do we need methods such as 'invalidate' or > > >> 'putAll' > > >> >> >> in > > >> >> >>>>>>>>>>> LookupCache. > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> 5) I don't see DDL options, that were in previous > version > > >> of > > >> >> >>> FLIP. > > >> >> >>>> Do > > >> >> >>>>>>>>>>> you have any special plans for them? > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> If you don't mind, I would be glad to be able to make > > small > > >> >> >>>>>>>>>>> adjustments to the FLIP document too. 
I think it's > worth > > >> >> >>> mentioning > > >> >> >>>>>>>>>>> about what exactly optimizations are planning in the > > >> future. > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> Best regards, > > >> >> >>>>>>>>>>> Smirnov Alexander > > >> >> >>>>>>>>>>> > > >> >> >>>>>>>>>>> пт, 13 мая 2022 г. в 20:27, Qingsheng Ren < > > >> renqs...@gmail.com > > >> >> >>> : > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> Hi Alexander and devs, > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> Thank you very much for the in-depth discussion! As > Jark > > >> >> >>>> mentioned we were inspired by Alexander's idea and made a > > >> refactor on > > >> >> >> our > > >> >> >>>> design. FLIP-221 [1] has been updated to reflect our design > now > > >> and > > >> >> we > > >> >> >>> are > > >> >> >>>> happy to hear more suggestions from you! > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> Compared to the previous design: > > >> >> >>>>>>>>>>>> 1. The lookup cache serves at table runtime level and > is > > >> >> >>>> integrated as a component of LookupJoinRunner as discussed > > >> >> previously. > > >> >> >>>>>>>>>>>> 2. Interfaces are renamed and re-designed to reflect > the > > >> new > > >> >> >>>> design. > > >> >> >>>>>>>>>>>> 3. We separate the all-caching case individually and > > >> >> >> introduce a > > >> >> >>>> new RescanRuntimeProvider to reuse the ability of scanning. We > > are > > >> >> >>> planning > > >> >> >>>> to support SourceFunction / InputFormat for now considering > the > > >> >> >>> complexity > > >> >> >>>> of FLIP-27 Source API. > > >> >> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to > make > > >> the > > >> >> >>>> semantic of lookup more straightforward for developers. > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> For replying to Alexander: > > >> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is > > >> >> >> deprecated > > >> >> >>>> or not. 
Am I right that it will be so in the future, but > > currently > > >> >> it's > > >> >> >>> not? > > >> >> >>>>>>>>>>>> Yes you are right. InputFormat is not deprecated for > > now. > > >> I > > >> >> >>> think > > >> >> >>>> it will be deprecated in the future but we don't have a clear > > plan > > >> >> for > > >> >> >>> that. > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> Thanks again for the discussion on this FLIP and > looking > > >> >> >> forward > > >> >> >>>> to cooperating with you after we finalize the design and > > >> interfaces! > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> [1] > > >> >> >>>> > > >> >> >>> > > >> >> >> > > >> >> > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> Best regards, > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> Qingsheng > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> > > >> >> >>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов < > > >> >> >>>> smirale...@gmail.com> wrote: > > >> >> >>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard! > > >> >> >>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>> Glad to see that we came to a consensus on almost all > > >> >> points! > > >> >> >>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is > > >> >> >> deprecated > > >> >> >>>> or > > >> >> >>>>>>>>>>>>> not. Am I right that it will be so in the future, but > > >> >> >> currently > > >> >> >>>> it's > > >> >> >>>>>>>>>>>>> not? Actually I also think that for the first version > > >> it's > > >> >> OK > > >> >> >>> to > > >> >> >>>> use > > >> >> >>>>>>>>>>>>> InputFormat in ALL cache realization, because > > supporting > > >> >> >> rescan > > >> >> >>>>>>>>>>>>> ability seems like a very distant prospect. But for > > this > > >> >> >>>> decision we > > >> >> >>>>>>>>>>>>> need a consensus among all discussion participants. 
> > >> >> >>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>> In general, I don't have something to argue with your > > >> >> >>>> statements. All > > >> >> >>>>>>>>>>>>> of them correspond my ideas. Looking ahead, it would > be > > >> nice > > >> >> >> to > > >> >> >>>> work > > >> >> >>>>>>>>>>>>> on this FLIP cooperatively. I've already done a lot > of > > >> work > > >> >> >> on > > >> >> >>>> lookup > > >> >> >>>>>>>>>>>>> join caching with realization very close to the one > we > > >> are > > >> >> >>>> discussing, > > >> >> >>>>>>>>>>>>> and want to share the results of this work. Anyway > > >> looking > > >> >> >>>> forward for > > >> >> >>>>>>>>>>>>> the FLIP update! > > >> >> >>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>> Best regards, > > >> >> >>>>>>>>>>>>> Smirnov Alexander > > >> >> >>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>> чт, 12 мая 2022 г. в 17:38, Jark Wu < > imj...@gmail.com > > >: > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> Hi Alex, > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> Thanks for summarizing your points. > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have > > >> discussed > > >> >> >> it > > >> >> >>>> several times > > >> >> >>>>>>>>>>>>>> and we have totally refactored the design. > > >> >> >>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many > of > > >> your > > >> >> >>>> points! > > >> >> >>>>>>>>>>>>>> Qingsheng is still working on updating the design > docs > > >> and > > >> >> >>>> maybe can be > > >> >> >>>>>>>>>>>>>> available in the next few days. > > >> >> >>>>>>>>>>>>>> I will share some conclusions from our discussions: > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> 1) we have refactored the design towards to "cache > in > > >> >> >>>> framework" way. > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> 2) a "LookupCache" interface for users to customize > > and > > >> a > > >> >> >>>> default > > >> >> >>>>>>>>>>>>>> implementation with builder for users to easy-use. 
> > >> >> >>>>>>>>>>>>>> This can both make it possible to both have > > flexibility > > >> and > > >> >> >>>> conciseness. > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> 3) Filter pushdown is important for ALL and LRU > lookup > > >> >> >> cache, > > >> >> >>>> esp reducing > > >> >> >>>>>>>>>>>>>> IO. > > >> >> >>>>>>>>>>>>>> Filter pushdown should be the final state and the > > >> unified > > >> >> >> way > > >> >> >>>> to both > > >> >> >>>>>>>>>>>>>> support pruning ALL cache and LRU cache, > > >> >> >>>>>>>>>>>>>> so I think we should make effort in this direction. > If > > >> we > > >> >> >> need > > >> >> >>>> to support > > >> >> >>>>>>>>>>>>>> filter pushdown for ALL cache anyway, why not use > > >> >> >>>>>>>>>>>>>> it for LRU cache as well? Either way, as we decide > to > > >> >> >>> implement > > >> >> >>>> the cache > > >> >> >>>>>>>>>>>>>> in the framework, we have the chance to support > > >> >> >>>>>>>>>>>>>> filter on cache anytime. This is an optimization and > > it > > >> >> >>> doesn't > > >> >> >>>> affect the > > >> >> >>>>>>>>>>>>>> public API. I think we can create a JIRA issue to > > >> >> >>>>>>>>>>>>>> discuss it when the FLIP is accepted. > > >> >> >>>>>>>>>>>>>> > > >> >> >>>>>>>>>>>>>> 4) The idea to support ALL cache is similar to your > > >> >> >> proposal. > > >> >> >>>>>>>>>>>>>> In the first version, we will only support > > InputFormat, > > >> >> >>>> SourceFunction for > > >> >> >>>>>>>>>>>>>> cache all (invoke InputFormat in join operator). > > >> >> >>>>>>>>>>>>>> For FLIP-27 source, we need to join a true source > > >> operator > > >> >> >>>> instead of > > >> >> >>>>>>>>>>>>>> calling it embedded in the join operator. > > >> >> >>>>>>>>>>>>>> However, this needs another FLIP to support the > > re-scan > > >> >> >>> ability > > >> >> >>>> for FLIP-27 > > >> >> >>>>>>>>>>>>>> Source, and this can be a large work. 
In order to not block this issue, we can put the effort of FLIP-27
source integration into future work and integrate
InputFormat & SourceFunction for now.

I think it's fine to use InputFormat & SourceFunction, as they are not
deprecated; otherwise, we would have to introduce another function
similar to them, which is meaningless. We need to plan FLIP-27 source
integration ASAP before InputFormat & SourceFunction are deprecated.

Best,
Jark

On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com>
wrote:

Hi Martijn!

Got it. Therefore, the implementation with InputFormat is not being
considered. Thanks for clearing that up!

Best regards,
Smirnov Alexander

On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com>
wrote:

Hi,

With regards to:

> But if there are plans to refactor all connectors to FLIP-27

Yes, FLIP-27 is the target for all connectors.
The old interfaces will be deprecated and connectors will either be
refactored to use the new ones or dropped.

The caching should work for connectors that are using FLIP-27
interfaces, we should not introduce new features for old interfaces.

Best regards,

Martijn

On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com>
wrote:

Hi Jark!

Sorry for the late response. I would like to make some comments and
clarify my points.

1) I agree with your first statement. I think we can achieve both
advantages this way: put the Cache interface in flink-table-common,
but have implementations of it in flink-table-runtime.
Therefore, if a connector developer wants to use existing cache
strategies and their implementations, he can just pass lookupConfig to
the planner, but if he wants to have his own cache implementation in
his TableFunction, it will be possible for him to use the existing
interface for this purpose (we can explicitly point this out in the
documentation). In this way all configs and metrics will be unified.
WDYT?

> If a filter can prune 90% of data in the cache, we will have 90% of
> lookup requests that can never be cached

2) Let me clarify the logic of the filter optimization in the case of
LRU cache. It looks like Cache<RowData, Collection<RowData>>. Here we
always store the response of the dimension table in the cache, even
after applying the calc function. I.e. if there are no rows after
applying filters to the result of the 'eval' method of TableFunction,
we store the empty list by lookup keys. Therefore the cache line will
be filled, but will require much less memory (in bytes). I.e. we don't
completely drop the keys whose results were pruned, but significantly
reduce the memory required to store those results. If the user knows
about this behavior, he can increase the 'max-rows' option before the
start of the job. But actually I came up with the idea that we can do
this automatically by using the 'maximumWeight' and 'weigher' methods
of GuavaCache [1]. Weight can be the size of the collection of rows
(value of cache). Therefore the cache can automatically fit many more
records than before.

> Flink SQL has provided a standard way to do filters and projects
> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's
> hard to implement.

It's debatable how difficult it will be to implement filter pushdown.
But I think the fact that currently there is no database connector
with filter pushdown at least means that this feature won't be
supported soon in connectors.
Moreover, if we talk about other connectors (not in Flink repo), their
databases might not support all Flink filters (or not support filters
at all). I think users are interested in supporting cache filters
optimization independently of supporting other features and solving
more complex problems (or unsolvable at all).

3) I agree with your third statement. Actually in our internal version
I also tried to unify the logic of scanning and reloading data from
connectors. But unfortunately, I didn't find a way to unify the logic
of all ScanRuntimeProviders (InputFormat, SourceFunction, Source,...)
and reuse it in reloading ALL cache. As a result I settled on using
InputFormat, because it was used for scanning in all lookup
connectors. (I didn't know that there are plans to deprecate
InputFormat in favor of FLIP-27 Source). IMO usage of FLIP-27 source
in ALL caching is not good idea, because this source was designed to
work in distributed environment (SplitEnumerator on JobManager and
SourceReaders on TaskManagers), not in one operator (lookup join
operator in our case).
There is not even a direct way to pass splits from SplitEnumerator to
SourceReader (this logic works through SplitEnumeratorContext, which
requires OperatorCoordinator.SubtaskGateway to send AddSplitEvents).
Usage of InputFormat for ALL cache seems much clearer and easier. But
if there are plans to refactor all connectors to FLIP-27, I have the
following idea: maybe we can give up the lookup join ALL cache in
favor of a simple join with multiple scans of a batch source? The
point is that the only difference between lookup join ALL cache and
simple join with batch source is that in the first case scanning is
performed multiple times, in between which state (cache) is cleared
(correct me if I'm wrong). So what if we extend the functionality of
simple join to support state reloading + extend the functionality of
scanning batch source multiple times (this one should be easy with the
new FLIP-27 source, which unifies streaming/batch reading - we will
need to change only SplitEnumerator, which will pass splits again
after some TTL). WDYT?
I must say that this looks like a long-term goal and will make the
scope of this FLIP even larger than you said. Maybe we can limit
ourselves to a simpler solution now (InputFormats).

So to sum up, my points are as follows:
1) There is a way to make both concise and flexible interfaces for
caching in lookup join.
2) Cache filters optimization is important both in LRU and ALL caches.
3) It is unclear when filter pushdown will be supported in Flink
connectors, and some of the connectors might not have the opportunity
to support filter pushdown + as far as I know, filter pushdown
currently works only for scanning (not lookup). So cache filters +
projections optimization should be independent from other features.
4) ALL cache implementation is a complex topic that involves multiple
aspects of how Flink is developing. Dropping InputFormat in favor of
FLIP-27 Source will make the ALL cache implementation really complex
and unclear, so maybe instead of that we can extend the functionality
of simple join, or keep InputFormat in the case of lookup join ALL
cache?
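
The weight-based idea from point 2 of this message (bound the cache by the number of cached rows rather than by the number of keys, so that pruned or empty results stay cheap) can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses a plain-Java `LinkedHashMap` instead of Guava's `CacheBuilder`, `String` stands in for `RowData`, and the class name `WeightedLruCache` and the "every entry costs at least 1" weigher are hypothetical choices, not part of the FLIP.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToIntFunction;

/** Hypothetical weight-bounded LRU cache (stand-in for a Guava cache with a weigher). */
final class WeightedLruCache<K, V> {
    private final long maxWeight;
    private final ToIntFunction<V> weigher;
    // accessOrder = true: iteration runs from least to most recently used.
    private final LinkedHashMap<K, V> entries = new LinkedHashMap<>(16, 0.75f, true);
    private long totalWeight;

    WeightedLruCache(long maxWeight, ToIntFunction<V> weigher) {
        this.maxWeight = maxWeight;
        this.weigher = weigher;
    }

    V getIfPresent(K key) {
        return entries.get(key);
    }

    void put(K key, V value) {
        V old = entries.remove(key);
        if (old != null) {
            totalWeight -= weigher.applyAsInt(old);
        }
        entries.put(key, value);
        totalWeight += weigher.applyAsInt(value);
        // Evict least recently used entries until the weight budget is respected.
        Iterator<Map.Entry<K, V>> it = entries.entrySet().iterator();
        while (totalWeight > maxWeight && it.hasNext()) {
            Map.Entry<K, V> eldest = it.next();
            totalWeight -= weigher.applyAsInt(eldest.getValue());
            it.remove();
        }
    }

    int size() {
        return entries.size();
    }

    long totalWeight() {
        return totalWeight;
    }
}

final class WeightedLruCacheDemo {
    public static void main(String[] args) {
        // Assumption: weight = number of rows, but at least 1 so empty results are bounded too.
        WeightedLruCache<String, Collection<String>> cache =
                new WeightedLruCache<>(4, rows -> Math.max(1, rows.size()));
        cache.put("k1", Collections.<String>emptyList()); // all rows filtered out: weight 1
        cache.put("k2", Collections.<String>emptyList()); // weight 1
        cache.put("k3", Arrays.asList("r1", "r2"));       // weight 2, total 4
        cache.put("k4", Arrays.asList("r3"));             // total would be 5, so k1 is evicted
        System.out.println(cache.size() + " entries, weight " + cache.totalWeight());
        // prints "3 entries, weight 4"
    }
}
```

With a key-count limit, the two empty results would occupy as many slots as the two-row result; with a row-based weight they take only as much budget as the weigher assigns them.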
Best regards,
Smirnov Alexander

[1]
https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:

It's great to see the active discussion! I want to share my ideas:

1) implement the cache in framework vs. connectors base
I don't have a strong opinion on this. Both ways should work (e.g.,
cache pruning, compatibility).
The framework way can provide more concise interfaces.
The connector base way can define more flexible cache
strategies/implementations.
We are still investigating a way to see if we can have both advantages.
We should reach a consensus that the way should be a final state, and
we are on the path to it.

2) filters and projections pushdown:
I agree with Alex that the filter pushdown into cache can benefit a lot
for ALL cache.
However, this is not true for LRU cache. Connectors use cache to reduce
IO requests to databases for better throughput.
If a filter can prune 90% of data in the cache, we will have 90% of
lookup requests that can never be cached and hit directly to the
databases. That means the cache is meaningless in this case.

IMO, Flink SQL has provided a standard way to do filters and projects
pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's
hard to implement.
They should implement the pushdown interfaces to reduce IO and the
cache size.
That should be a final state that the scan source and lookup source
share the exact pushdown implementation.
I don't see why we need to duplicate the pushdown logic in caches,
which will complicate the lookup join design.

3) ALL cache abstraction
All cache might be the most challenging part of this FLIP. We have
never provided a reload-lookup public interface.
Currently, we put the reload logic in the "eval" method of
TableFunction. That's hard for some sources (e.g., Hive).
Ideally, connector implementation should share the logic of reload and
scan, i.e. ScanTableSource with InputFormat/SourceFunction/FLIP-27
Source.
However, InputFormat/SourceFunction are deprecated, and the FLIP-27
source is deeply coupled with SourceOperator.
If we want to invoke the FLIP-27 source in LookupJoin, this may make
the scope of this FLIP much larger.
We are still investigating how to abstract the ALL cache logic and
reuse the existing source interfaces.
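
The 90% argument in point 2 of this message can be made concrete with a small simulation. It counts backend lookups for two policies: skipping keys whose rows were all filtered out, and caching an empty result for them (the alternative raised elsewhere in this thread). Everything here is a hypothetical stand-in: `FilteredLookupSimulation`, `lookupAndFilter`, and the "only every 10th key survives the filter" rule are assumptions for illustration, and no Flink API is used.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of lookup-cache behaviour when a filter prunes 90% of keys. */
final class FilteredLookupSimulation {

    /** Counts backend lookups for `rounds` passes over the keys 0..keys-1. */
    static int countBackendCalls(int keys, int rounds, boolean cacheEmptyResults) {
        Map<Integer, List<String>> cache = new HashMap<>();
        int backendCalls = 0;
        for (int round = 0; round < rounds; round++) {
            for (int key = 0; key < keys; key++) {
                if (cache.containsKey(key)) {
                    continue; // cache hit, no IO
                }
                backendCalls++;
                List<String> rows = lookupAndFilter(key);
                // Without negative caching, pruned keys are never remembered.
                if (!rows.isEmpty() || cacheEmptyResults) {
                    cache.put(key, rows);
                }
            }
        }
        return backendCalls;
    }

    /** Stand-in for the external lookup plus filter: only every 10th key survives. */
    static List<String> lookupAndFilter(int key) {
        return key % 10 == 0
                ? Collections.singletonList("row-" + key)
                : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        System.out.println("skip empty results:  " + countBackendCalls(100, 5, false));
        System.out.println("cache empty results: " + countBackendCalls(100, 5, true));
    }
}
```

Under these assumptions the first policy issues 460 backend calls (10 for the surviving keys plus 90 pruned keys on each of 5 rounds) and the second issues 100, one per key. The sketch ignores eviction and TTL, so it only illustrates the difference between the two policies, not real cache sizing.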

Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies out of the scope of this
improvement. Because such pushdowns should be done for all
ScanTableSource implementations (not only for Lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org>
wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter
pushdown still is not implemented for jdbc/hive/hbase." -> Would an
alternative solution be to actually implement these filter pushdowns?
I can imagine that there are many more benefits to doing that, outside
of lookup caching and metrics.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!

I do think that single cache implementation would be a nice opportunity
for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time"
semantics anyway - doesn't matter how it will be implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut off the cache size by
simply filtering unnecessary data. And the most handy way to do it is
apply it inside LookupRunners. It would be a bit harder to pass it
through the LookupJoin node to TableFunction.
And Alexander correctly mentioned that filter pushdown still is not
implemented for jdbc/hive/hbase.
2) The ability to set the different caching parameters for different
tables is quite important. So I would prefer to set it through DDL
rather than have similar ttla, strategy and other options for all
lookup tables.
3) Providing the cache into the framework really deprives us of
extensibility (users won't be able to implement their own cache). But
most probably it might be solved by creating more different cache
strategies and a wider set of configurations.

All these points are much closer to the schema proposed by Alexander.
Qingshen Ren, please correct me if I'm not right and all these
facilities might be simply implemented in your architecture?

Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org>
wrote:

Hi everyone,

I don't have much to chip in, but just wanted to express that I really
appreciate the in-depth discussion on this topic and I hope that others
will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com>
wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about
some of your statements (maybe I didn't get something?).

> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not
fully implemented with caching, but as you said, users go on it
consciously to achieve better performance (no one proposed to enable
caching by default, etc.). Or by users do you mean other developers of
connectors? In this case developers explicitly specify whether their
connector supports caching or not (in the list of supported options),
no one makes them do that if they don't want to. So what exactly is
the difference between implementing caching in modules
flink-table-runtime and in flink-table-common from the considered
point of view? How does it affect on breaking/non-breaking the
semantics of "FOR SYSTEM_TIME AS OF proc_time"?

> confront a situation that allows table options in DDL to control the
> behavior of the framework, which has never happened previously and
> should be cautious

If we talk about main differences of semantics of DDL options and
config options ("table.exec.xxx"), isn't it about limiting the scope of
the options + importance for the user business logic rather than
specific location of corresponding logic in the framework? I mean that
in my design, for example, putting an option with lookup cache
strategy in configurations would be the wrong decision, because it
directly affects the user's business logic (not just performance
optimization) + touches just several functions of ONE table (there can
be multiple tables with different caches).
Does it really matter for the user (or someone else) where the logic is
located, which is affected by the applied option?
Also I can remember DDL option 'sink.parallelism', which in some way
"controls the behavior of the framework" and I don't see any problem
here.

> introduce a new interface for this all-caching scenario and the
> design would become more complex

This is a subject for a separate discussion, but actually in our
internal version we solved this problem quite easily - we reused
InputFormat class (so there is no need for a new API). The point is
that currently all lookup connectors use InputFormat for scanning the
data in batch mode: HBase, JDBC and even Hive - it uses class
PartitionReader, that is actually just a wrapper around InputFormat.
The advantage of this solution is the ability to reload cache data in
parallel (number of threads depends on number of InputSplits, but has
an upper limit). As a result cache reload time significantly reduces
(as well as time of input stream blocking). I know that usually we try
to avoid usage of concurrency in Flink code, but maybe this one can be
an exception. BTW I don't say that it's an ideal solution, maybe there
are better ones.

> Providing the cache in the framework might introduce compatibility
> issues

It's possible only in cases when the developer of the connector won't
properly refactor his code and will use new cache options incorrectly
(i.e. explicitly provide the same options into 2 different code
places). For correct behavior all he will need to do is to redirect
existing options to the framework's LookupConfig (+ maybe add an alias
for options, if there was different naming), everything will be
transparent for users. If the developer won't do refactoring at all,
nothing will be changed for the connector because of backward
compatibility. Also if a developer wants to use his own cache logic,
he just can refuse to pass some of the configs into the framework, and
instead make his own implementation with already existing configs and
metrics (but actually I think that it's a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

It's a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table. As a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by user age. If we have the filter 'age > 30', there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2 times, which gives a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users. And this doesn't sound like 'not quite useful'.

It would be great to hear other voices on this topic! We have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
>
> Hi Alexander and Arvid,
>
> Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:
>
> 1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF proc_time", because it can't truly reflect the content of the lookup table at the moment of querying.
> If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.
>
> 2. If we put the cache implementation in the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened before and which we should be cautious about. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.
>
> 3. We have use cases in which the lookup source periodically loads and refreshes all records in memory to achieve high lookup performance (like the Hive connector in the community, and this is also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.
>
> 4. Providing the cache in the framework might introduce compatibility issues for existing lookup sources: there might be two caches with totally different strategies if the user configures the table incorrectly (one in the framework and another implemented by the lookup source).
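
To make the all-caching scenario from point 3 a bit more concrete, here is a minimal Python sketch of a cache that reloads the whole table once a time-to-live has passed. It is only an illustration under stated assumptions: the names `FullReloadCache`, `scan`, `ttl_seconds` and `clock` are hypothetical and are not part of Flink or of the FLIP.

```python
import time
from typing import Any, Callable, Dict, Hashable, Iterable, List, Optional, Tuple

Row = Dict[str, Any]


class FullReloadCache:
    """Keeps the full lookup table in memory and reloads it after a TTL.

    Hypothetical sketch: `scan` stands in for whatever reads the whole
    external table and yields (key, row) pairs.
    """

    def __init__(
        self,
        scan: Callable[[], Iterable[Tuple[Hashable, Row]]],
        ttl_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._scan = scan
        self._ttl = ttl_seconds
        self._clock = clock
        self._index: Dict[Hashable, List[Row]] = {}
        self._loaded_at: Optional[float] = None

    def _reload(self) -> None:
        # Build the new index first, then swap it in, so a failed scan
        # does not leave a half-filled cache behind.
        index: Dict[Hashable, List[Row]] = {}
        for key, row in self._scan():
            index.setdefault(key, []).append(row)
        self._index = index
        self._loaded_at = self._clock()

    def lookup(self, key: Hashable) -> List[Row]:
        expired = (
            self._loaded_at is None
            or self._clock() - self._loaded_at >= self._ttl
        )
        if expired:
            self._reload()
        # Every lookup is served from memory; unknown keys return no rows.
        return self._index.get(key, [])
```

Unlike an LRU cache, a miss here never triggers a request to the external system, which is why this scenario does not fit naturally into a wrapper that only delegates on misses.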
>
> As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of into the runner with the cache. The goal of using a cache is to reduce the network I/O and the pressure on the external system, and only applying these optimizations to the cache seems not quite useful.
>
> I made some updates to the FLIP[1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and regulate the metrics of the cache. Also, I made a POC[2] for your reference.
>
> Looking forward to your ideas!
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>
> Best regards,
>
> Qingsheng
>
> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
>>
>> Thanks for the response, Arvid!
>>
>> I have a few comments on your message.
>>
>>> but could also live with an easier solution as the first step:
>>
>> I think that these 2 ways (the one originally proposed by Qingsheng and mine) are mutually exclusive, because conceptually they follow the same goal, but the implementation details are different.
>> If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?
>>
>>> as the source will only receive the requests after filter
>>
>> Actually, if filters are applied to fields of the lookup table, we first have to make the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.
>>
>>> @Alexander unfortunately, your architecture is not shared. I don't know the
>>> solution to share images to be honest.
>>
>> Sorry for that, I'm a bit new to this kind of conversation :)
>> I have no write access to the confluence, so I created a Jira issue where I described the proposed changes in more detail: https://issues.apache.org/jira/browse/FLINK-27411.
>>
>> I will be happy to get more feedback!
>>
>> Best,
>> Smirnov Alexander
>>
>> Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
>>>
>>> Hi Qingsheng,
>>>
>>> Thanks for driving this; the inconsistency was not satisfying for me.
>>>
>>> I second Alexander's idea though but could also live with an easier solution as the first step: Instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and else manages the cache. Lifting it into the operator model as proposed would be even better but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after filter; applying projection may be more interesting to save memory).
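
The idea of a caching layer around X that delegates to X only on misses can be sketched roughly as follows. This is a minimal Python illustration, not Flink code: `CachingLookup`, `delegate` and `max_entries` are hypothetical names, and the LRU eviction policy is an assumption made for the example.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, Hashable, List

Row = Dict[str, Any]


class CachingLookup:
    """LRU cache wrapped around a lookup function (hypothetical sketch).

    `delegate` plays the role of the wrapped table function X: it is only
    called when the requested key is not in the cache.
    """

    def __init__(
        self,
        delegate: Callable[[Hashable], List[Row]],
        max_entries: int,
    ) -> None:
        self._delegate = delegate
        self._max_entries = max_entries
        self._cache: "OrderedDict[Hashable, List[Row]]" = OrderedDict()
        # Simple counters, standing in for standardized cache metrics.
        self.hits = 0
        self.misses = 0

    def lookup(self, key: Hashable) -> List[Row]:
        if key in self._cache:
            self.hits += 1
            # Mark the key as most recently used.
            self._cache.move_to_end(key)
            return self._cache[key]
        self.misses += 1
        rows = self._delegate(key)
        self._cache[key] = rows
        if len(self._cache) > self._max_entries:
            # Evict the least recently used key.
            self._cache.popitem(last=False)
        return rows
```

The wrapped function needs no knowledge of the cache, which is the point of the proposal: the cache and its counters live in one place and the connector only supplies the lookup itself.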
>>>
>>> Another advantage is that all the changes of this FLIP would be limited to options, no need for new public interfaces. Everything else remains an implementation of Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.
>>>
>>> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
>>>
>>> On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:
>>>
>>>> Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd really like to become one. And this FLIP really interested me.
>>>> Actually, I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.
>>>>
>>>> I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables, so it's very convenient to import in connectors. CachingTableFunction, in turn, contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But this would require connectors to depend on another module that contains a lot of runtime logic, which doesn't sound good.
>>>>
>>>> I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to only pass configurations to the planner, so they won't depend on the runtime implementation. Based on these configs the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in module flink-table-runtime). The architecture looks like the one in the pinned image (the LookupConfig class there is actually your CacheConfig).
>>>>
>>>> The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its inheritors.
>>>> The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc.
>>>>
>>>> I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.
>>>>
>>>> And here comes another, more powerful advantage of such a solution. If we have the caching logic on a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which mostly consists of filters and projections.
>>>>
>>>> For example, when joining table A with lookup table B on the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.
>>>>
>>>> If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the records' size. So the user can increase the initial max number of records in the cache.
>>>>
>>>> What do you think about it?
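
The effect of applying filters and projections before records are stored in the cache can be illustrated with a small Python sketch. It is a simplification under stated assumptions: `prepare_for_cache` and the sample rows are hypothetical, and only the condition that depends on table B alone (B.salary > 1000) is applied here, because this sketch has no row from A available at caching time.

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def prepare_for_cache(
    rows: Iterable[Row],
    predicate: Callable[[Row], bool],
    keep_columns: List[str],
) -> List[Row]:
    """Drop rows that fail the filter, then keep only the needed columns."""
    return [
        {column: row[column] for column in keep_columns}
        for row in rows
        if predicate(row)
    ]


# Rows fetched from lookup table B for one key (made-up data).
fetched = [
    {"id": 7, "age": 25, "salary": 900, "notes": "x" * 100},
    {"id": 7, "age": 31, "salary": 1500, "notes": "y" * 100},
]

# Filter: B.salary > 1000. Projection: only the columns the join still needs.
cached = prepare_for_cache(
    fetched,
    lambda row: row["salary"] > 1000,
    ["id", "age"],
)
```

Here only one of the two fetched rows is stored, and it carries two columns instead of four, so the same cache budget can cover more keys.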
>>>>
>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>>>>> Hi devs,
>>>>>
>>>>> Yuan and I would like to start a discussion about FLIP-221[1], which introduces an abstraction of lookup table cache and its standard metrics.
>>>>>
>>>>> Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is quite a common use case in Flink table / SQL.
>>>>>
>>>>> Therefore we propose some new APIs including cache, metrics, wrapper classes of TableFunction and new table options. Please take a look at the FLIP page [1] to get more details. Any suggestions and comments would be appreciated!
>>>>>
>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Qingsheng
>
> --
> Best Regards,
>
> Qingsheng Ren
>
> Real-time Computing Team
> Alibaba Cloud
>
> Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

--
Best Regards,

Qingsheng Ren

Real-time Computing Team
Alibaba Cloud

Email: renqs...@gmail.com