Thanks Qingsheng for the detailed summary and updates. The changes look good to me in general. I just have one minor improvement comment: could we add static util methods to the "FullCachingReloadTrigger" interface for quick usage?
#periodicReloadAtFixedRate(Duration) #periodicReloadWithFixedDelay(Duration) I think we can also do this for LookupCache, because users may not know where the default implementations are or how to use them. Best, Jark On Wed, 1 Jun 2022 at 18:32, Qingsheng Ren <renqs...@gmail.com> wrote: > Hi Jingsong, > > Thanks for your comments! > > > AllCache definition is not flexible, for example, PartialCache can use > any custom storage, while the AllCache can not, AllCache can also be > considered to store memory or disk, also need a flexible strategy. > > We had an offline discussion with Jark and Leonard. Basically we think > exposing the interface of full cache storage to connector developers might > limit our future optimizations. The storage of full caching shouldn’t have > too many variations for different lookup tables so making it pluggable > might not help a lot. Also I think it is not quite easy for connector > developers to implement such an optimized storage. We can keep optimizing > this storage in the future and all full caching lookup tables would benefit > from this. > > > We are more inclined to deprecate the connector `async` option when > discussing FLIP-234. Can we remove this option from this FLIP? > > Thanks for the reminder! This option has been removed in the latest > version. > > Best regards, > > Qingsheng > > > > On Jun 1, 2022, at 15:28, Jingsong Li <jingsongl...@gmail.com> wrote: > > > > Thanks Alexander for your reply. We can discuss the new interface when it > > comes out. > > > > We are more inclined to deprecate the connector `async` option when > > discussing FLIP-234 [1]. We should use hint to let planner decide. > > Although the discussion has not yet produced a conclusion, can we remove > > this option from this FLIP? It doesn't seem to be related to this FLIP, > but > > more to FLIP-234, and we can form a conclusion over there. 
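[Editor's note] The static convenience methods proposed above could look roughly like the sketch below. `FullCachingReloadTrigger` is the interface name used in this discussion; `PeriodicReloadTrigger` and all signatures here are assumptions for illustration, not the finalized Flink API.

```java
import java.time.Duration;

// Sketch of the proposed static convenience factories, so users don't need
// to know where the default implementation lives or how to construct it.
interface FullCachingReloadTrigger {

    static FullCachingReloadTrigger periodicReloadAtFixedRate(Duration interval) {
        return new PeriodicReloadTrigger(interval, true);
    }

    static FullCachingReloadTrigger periodicReloadWithFixedDelay(Duration interval) {
        return new PeriodicReloadTrigger(interval, false);
    }
}

// Hypothetical default implementation that the static methods hide from users.
class PeriodicReloadTrigger implements FullCachingReloadTrigger {
    private final Duration interval;
    private final boolean fixedRate;

    PeriodicReloadTrigger(Duration interval, boolean fixedRate) {
        this.interval = interval;
        this.fixedRate = fixedRate;
    }

    Duration getInterval() { return interval; }

    boolean isFixedRate() { return fixedRate; }
}
```

The same pattern would apply to LookupCache, e.g. a hypothetical `LookupCache.newDefault()`.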
> > > > [1] https://lists.apache.org/thread/9k1sl2519kh2n3yttwqc00p07xdfns3h > > > > Best, > > Jingsong > > > > On Wed, Jun 1, 2022 at 4:59 AM Jing Ge <j...@ververica.com> wrote: > > > >> Hi Jark, > >> > >> Thanks for clarifying it. It would be fine. as long as we could provide > the > >> no-cache solution. I was just wondering if the client side cache could > >> really help when HBase is used, since the data to look up should be > huge. > >> Depending how much data will be cached on the client side, the data that > >> should be lru in e.g. LruBlockCache will not be lru anymore. In the > worst > >> case scenario, once the cached data at client side is expired, the > request > >> will hit disk which will cause extra latency temporarily, if I am not > >> mistaken. > >> > >> Best regards, > >> Jing > >> > >> On Mon, May 30, 2022 at 9:59 AM Jark Wu <imj...@gmail.com> wrote: > >> > >>> Hi Jing Ge, > >>> > >>> What do you mean about the "impact on the block cache used by HBase"? > >>> In my understanding, the connector cache and HBase cache are totally > two > >>> things. > >>> The connector cache is a local/client cache, and the HBase cache is a > >>> server cache. > >>> > >>>> does it make sense to have a no-cache solution as one of the > >>> default solutions so that customers will have no effort for the > migration > >>> if they want to stick with Hbase cache > >>> > >>> The implementation migration should be transparent to users. Take the > >> HBase > >>> connector as > >>> an example, it already supports lookup cache but is disabled by > default. > >>> After migration, the > >>> connector still disables cache by default (i.e. no-cache solution). No > >>> migration effort for users. > >>> > >>> HBase cache and connector cache are two different things. HBase cache > >> can't > >>> simply replace > >>> connector cache. 
Because one of the most important usages for connector > >>> cache is reducing > >>> the I/O request/response and improving the throughput, which can > achieve > >>> by just using a server cache. > >>> > >>> Best, > >>> Jark > >>> > >>> > >>> > >>> > >>> On Fri, 27 May 2022 at 22:42, Jing Ge <j...@ververica.com> wrote: > >>> > >>>> Thanks all for the valuable discussion. The new feature looks very > >>>> interesting. > >>>> > >>>> According to the FLIP description: "*Currently we have JDBC, Hive and > >>> HBase > >>>> connector implemented lookup table source. All existing > implementations > >>>> will be migrated to the current design and the migration will be > >>>> transparent to end users*." I was only wondering if we should pay > >>> attention > >>>> to HBase and similar DBs. Since, commonly, the lookup data will be > huge > >>>> while using HBase, partial caching will be used in this case, if I am > >> not > >>>> mistaken, which might have an impact on the block cache used by HBase, > >>> e.g. > >>>> LruBlockCache. > >>>> Another question is that, since HBase provides a sophisticated cache > >>>> solution, does it make sense to have a no-cache solution as one of the > >>>> default solutions so that customers will have no effort for the > >> migration > >>>> if they want to stick with Hbase cache? > >>>> > >>>> Best regards, > >>>> Jing > >>>> > >>>> On Fri, May 27, 2022 at 11:19 AM Jingsong Li <jingsongl...@gmail.com> > >>>> wrote: > >>>> > >>>>> Hi all, > >>>>> > >>>>> I think the problem now is below: > >>>>> 1. AllCache and PartialCache interface on the non-uniform, one needs > >> to > >>>>> provide LookupProvider, the other needs to provide CacheBuilder. > >>>>> 2. AllCache definition is not flexible, for example, PartialCache can > >>> use > >>>>> any custom storage, while the AllCache can not, AllCache can also be > >>>>> considered to store memory or disk, also need a flexible strategy. > >>>>> 3. 
AllCache can not customize ReloadStrategy, currently only
> >>>>> ScheduledReloadStrategy.
> >>>>>
> >>>>> In order to solve the above problems, the following are my ideas.
> >>>>>
> >>>>> ## Top level cache interfaces:
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> public interface CacheLookupProvider extends
> >>>>> LookupTableSource.LookupRuntimeProvider {
> >>>>>
> >>>>> CacheBuilder createCacheBuilder();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface CacheBuilder {
> >>>>> Cache create();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface Cache {
> >>>>>
> >>>>> /**
> >>>>> * Returns the value associated with key in this cache, or null
> >> if
> >>>>> there is no cached value for
> >>>>> * key.
> >>>>> */
> >>>>> @Nullable
> >>>>> Collection<RowData> getIfPresent(RowData key);
> >>>>>
> >>>>> /** Returns the number of key-value mappings in the cache. */
> >>>>> long size();
> >>>>> }
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> ## Partial cache
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> public interface PartialCacheLookupFunction extends
> >>> CacheLookupProvider {
> >>>>>
> >>>>> @Override
> >>>>> PartialCacheBuilder createCacheBuilder();
> >>>>>
> >>>>> /** Creates an {@link LookupFunction} instance. */
> >>>>> LookupFunction createLookupFunction();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface PartialCacheBuilder extends CacheBuilder {
> >>>>>
> >>>>> PartialCache create();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface PartialCache extends Cache {
> >>>>>
> >>>>> /**
> >>>>> * Associates the specified value rows with the specified key row
> >>>>> in the cache. If the cache
> >>>>> * previously contained value associated with the key, the old
> >>>>> value is replaced by the
> >>>>> * specified value.
> >>>>> *
> >>>>> * @return the previous value rows associated with key, or null
> >> if
> >>>>> there was no mapping for key.
> >>>>> * @param key - key row with which the specified value is to be
> >>>>> associated
> >>>>> * @param value – value rows to be associated with the specified
> >>> key
> >>>>> */
> >>>>> Collection<RowData> put(RowData key, Collection<RowData> value);
> >>>>>
> >>>>> /** Discards any cached value for the specified key. */
> >>>>> void invalidate(RowData key);
> >>>>> }
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> ## All cache
> >>>>> ```
> >>>>>
> >>>>> public interface AllCacheLookupProvider extends CacheLookupProvider {
> >>>>>
> >>>>> void registerReloadStrategy(ScheduledExecutorService
> >>>>> executorService, Reloader reloader);
> >>>>>
> >>>>> ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();
> >>>>>
> >>>>> @Override
> >>>>> AllCacheBuilder createCacheBuilder();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface AllCacheBuilder extends CacheBuilder {
> >>>>>
> >>>>> AllCache create();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface AllCache extends Cache {
> >>>>>
> >>>>> void putAll(Iterator<Map<RowData, RowData>> allEntries);
> >>>>>
> >>>>> void clearAll();
> >>>>> }
> >>>>>
> >>>>>
> >>>>> public interface Reloader {
> >>>>>
> >>>>> void reload();
> >>>>> }
> >>>>>
> >>>>> ```
> >>>>>
> >>>>> Best,
> >>>>> Jingsong
> >>>>>
> >>>>> On Fri, May 27, 2022 at 11:10 AM Jingsong Li <jingsongl...@gmail.com
> >>>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks Qingsheng and all for your discussion.
> >>>>>>
> >>>>>> Very sorry to jump in so late.
> >>>>>>
> >>>>>> Maybe I missed something?
> >>>>>> My first impression when I saw the cache interface was, why don't
> >> we
> >>>>>> provide an interface similar to guava cache [1], on top of guava
> >>> cache,
> >>>>>> caffeine also makes extensions for asynchronous calls.[2]
> >>>>>> There is also the bulk load in caffeine too.
> >>>>>>
> >>>>>> I am also more confused why first from LookupCacheFactory.Builder
> >> and
> >>>>> then
> >>>>>> to Factory to create Cache.
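[Editor's note] The getIfPresent/put/invalidate contract sketched for `PartialCache` in the quoted proposal maps naturally onto a bounded, access-ordered map. Below is a toy illustration only (generic keys standing in for `RowData`; not part of the proposal, and far simpler than a production cache):

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy LRU store following the PartialCache shape from the quoted proposal:
// getIfPresent / put / invalidate / size, bounded by a max row count.
class LruPartialCache<K, V> {
    private final LinkedHashMap<K, Collection<V>> map;

    LruPartialCache(int maxRows) {
        // accessOrder = true gives least-recently-used iteration order;
        // removeEldestEntry evicts once the bound is exceeded.
        this.map = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    Collection<V> getIfPresent(K key) { return map.get(key); }

    Collection<V> put(K key, Collection<V> value) { return map.put(key, value); }

    void invalidate(K key) { map.remove(key); }

    long size() { return map.size(); }
}
```

Guava/Caffeine (mentioned below in the thread) provide the same contract plus weights, expiry, and stats, which is why the discussion compares against them.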
> >>>>>> > >>>>>> [1] https://github.com/google/guava > >>>>>> [2] https://github.com/ben-manes/caffeine/wiki/Population > >>>>>> > >>>>>> Best, > >>>>>> Jingsong > >>>>>> > >>>>>> On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote: > >>>>>> > >>>>>>> After looking at the new introduced ReloadTime and Becket's > >> comment, > >>>>>>> I agree with Becket we should have a pluggable reloading strategy. > >>>>>>> We can provide some common implementations, e.g., periodic > >>> reloading, > >>>>> and > >>>>>>> daily reloading. > >>>>>>> But there definitely be some connector- or business-specific > >>> reloading > >>>>>>> strategies, e.g. > >>>>>>> notify by a zookeeper watcher, reload once a new Hive partition is > >>>>>>> complete. > >>>>>>> > >>>>>>> Best, > >>>>>>> Jark > >>>>>>> > >>>>>>> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> > >>>> wrote: > >>>>>>> > >>>>>>>> Hi Qingsheng, > >>>>>>>> > >>>>>>>> Thanks for updating the FLIP. A few comments / questions below: > >>>>>>>> > >>>>>>>> 1. Is there a reason that we have both "XXXFactory" and > >>>> "XXXProvider". > >>>>>>>> What is the difference between them? If they are the same, can > >> we > >>>> just > >>>>>>> use > >>>>>>>> XXXFactory everywhere? > >>>>>>>> > >>>>>>>> 2. Regarding the FullCachingLookupProvider, should the reloading > >>>>> policy > >>>>>>>> also be pluggable? Periodical reloading could be sometimes be > >>> tricky > >>>>> in > >>>>>>>> practice. For example, if user uses 24 hours as the cache > >> refresh > >>>>>>> interval > >>>>>>>> and some nightly batch job delayed, the cache update may still > >> see > >>>> the > >>>>>>>> stale data. > >>>>>>>> > >>>>>>>> 3. In DefaultLookupCacheFactory, it looks like InitialCapacity > >>>> should > >>>>> be > >>>>>>>> removed. > >>>>>>>> > >>>>>>>> 4. The purpose of LookupFunctionProvider#cacheMissingKey() > >> seems a > >>>>>>> little > >>>>>>>> confusing to me. 
If Optional<LookupCacheFactory> > >> getCacheFactory() > >>>>>>> returns > >>>>>>>> a non-empty factory, doesn't that already indicates the > >> framework > >>> to > >>>>>>> cache > >>>>>>>> the missing keys? Also, why is this method returning an > >>>>>>> Optional<Boolean> > >>>>>>>> instead of boolean? > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> > >>>>>>>> Jiangjie (Becket) Qin > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren < > >> renqs...@gmail.com > >>>> > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Lincoln and Jark, > >>>>>>>>> > >>>>>>>>> Thanks for the comments! If the community reaches a consensus > >>> that > >>>> we > >>>>>>> use > >>>>>>>>> SQL hint instead of table options to decide whether to use sync > >>> or > >>>>>>> async > >>>>>>>>> mode, it’s indeed not necessary to introduce the “lookup.async” > >>>>> option. > >>>>>>>>> > >>>>>>>>> I think it’s a good idea to let the decision of async made on > >>> query > >>>>>>>>> level, which could make better optimization with more > >> infomation > >>>>>>> gathered > >>>>>>>>> by planner. Is there any FLIP describing the issue in > >>> FLINK-27625? > >>>> I > >>>>>>>>> thought FLIP-234 is only proposing adding SQL hint for retry on > >>>>> missing > >>>>>>>>> instead of the entire async mode to be controlled by hint. > >>>>>>>>> > >>>>>>>>> Best regards, > >>>>>>>>> > >>>>>>>>> Qingsheng > >>>>>>>>> > >>>>>>>>>> On May 25, 2022, at 15:13, Lincoln Lee < > >> lincoln.8...@gmail.com > >>>> > >>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>> Hi Jark, > >>>>>>>>>> > >>>>>>>>>> Thanks for your reply! > >>>>>>>>>> > >>>>>>>>>> Currently 'lookup.async' just lies in HBase connector, I have > >>> no > >>>>> idea > >>>>>>>>>> whether or when to remove it (we can discuss it in another > >>> issue > >>>>> for > >>>>>>> the > >>>>>>>>>> HBase connector after FLINK-27625 is done), just not add it > >>> into > >>>> a > >>>>>>>>> common > >>>>>>>>>> option now. 
> >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Lincoln Lee > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Jark Wu <imj...@gmail.com> 于2022年5月24日周二 20:14写道: > >>>>>>>>>> > >>>>>>>>>>> Hi Lincoln, > >>>>>>>>>>> > >>>>>>>>>>> I have taken a look at FLIP-234, and I agree with you that > >> the > >>>>>>>>> connectors > >>>>>>>>>>> can > >>>>>>>>>>> provide both async and sync runtime providers simultaneously > >>>>> instead > >>>>>>>>> of one > >>>>>>>>>>> of them. > >>>>>>>>>>> At that point, "lookup.async" looks redundant. If this > >> option > >>> is > >>>>>>>>> planned to > >>>>>>>>>>> be removed > >>>>>>>>>>> in the long term, I think it makes sense not to introduce it > >>> in > >>>>> this > >>>>>>>>> FLIP. > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Jark > >>>>>>>>>>> > >>>>>>>>>>> On Tue, 24 May 2022 at 11:08, Lincoln Lee < > >>>> lincoln.8...@gmail.com > >>>>>> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Qingsheng, > >>>>>>>>>>>> > >>>>>>>>>>>> Sorry for jumping into the discussion so late. It's a good > >>> idea > >>>>>>> that > >>>>>>>>> we > >>>>>>>>>>> can > >>>>>>>>>>>> have a common table option. I have a minor comments on > >>>>>>> 'lookup.async' > >>>>>>>>>>> that > >>>>>>>>>>>> not make it a common option: > >>>>>>>>>>>> > >>>>>>>>>>>> The table layer abstracts both sync and async lookup > >>>>> capabilities, > >>>>>>>>>>>> connectors implementers can choose one or both, in the case > >>> of > >>>>>>>>>>> implementing > >>>>>>>>>>>> only one capability(status of the most of existing builtin > >>>>>>> connectors) > >>>>>>>>>>>> 'lookup.async' will not be used. 
And when a connector has > >>> both > >>>>>>>>>>>> capabilities, I think this choice is more suitable for > >> making > >>>>>>>>> decisions > >>>>>>>>>>> at > >>>>>>>>>>>> the query level, for example, table planner can choose the > >>>>> physical > >>>>>>>>>>>> implementation of async lookup or sync lookup based on its > >>> cost > >>>>>>>>> model, or > >>>>>>>>>>>> users can give query hint based on their own better > >>>>>>> understanding. If > >>>>>>>>>>>> there is another common table option 'lookup.async', it may > >>>>> confuse > >>>>>>>>> the > >>>>>>>>>>>> users in the long run. > >>>>>>>>>>>> > >>>>>>>>>>>> So, I prefer to leave the 'lookup.async' option in private > >>>> place > >>>>>>> (for > >>>>>>>>> the > >>>>>>>>>>>> current hbase connector) and not turn it into a common > >>> option. > >>>>>>>>>>>> > >>>>>>>>>>>> WDYT? > >>>>>>>>>>>> > >>>>>>>>>>>> Best, > >>>>>>>>>>>> Lincoln Lee > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> Qingsheng Ren <renqs...@gmail.com> 于2022年5月23日周一 14:54写道: > >>>>>>>>>>>> > >>>>>>>>>>>>> Hi Alexander, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the review! We recently updated the FLIP and > >> you > >>>> can > >>>>>>> find > >>>>>>>>>>>> those > >>>>>>>>>>>>> changes from my latest email. Since some terminologies has > >>>>>>> changed so > >>>>>>>>>>>> I’ll > >>>>>>>>>>>>> use the new concept for replying your comments. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1. Builder vs ‘of’ > >>>>>>>>>>>>> I’m OK to use builder pattern if we have additional > >> optional > >>>>>>>>> parameters > >>>>>>>>>>>>> for full caching mode (“rescan” previously). The > >>>>>>> schedule-with-delay > >>>>>>>>>>> idea > >>>>>>>>>>>>> looks reasonable to me, but I think we need to redesign > >> the > >>>>>>> builder > >>>>>>>>> API > >>>>>>>>>>>> of > >>>>>>>>>>>>> full caching to make it more descriptive for developers. > >>> Would > >>>>> you > >>>>>>>>> mind > >>>>>>>>>>>>> sharing your ideas about the API? 
For accessing the FLIP > >>>>> workspace > >>>>>>>>> you > >>>>>>>>>>>> can > >>>>>>>>>>>>> just provide your account ID and ping any PMC member > >>> including > >>>>>>> Jark. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2. Common table options > >>>>>>>>>>>>> We have some discussions these days and propose to > >>> introduce 8 > >>>>>>> common > >>>>>>>>>>>>> table options about caching. It has been updated on the > >>> FLIP. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 3. Retries > >>>>>>>>>>>>> I think we are on the same page :-) > >>>>>>>>>>>>> > >>>>>>>>>>>>> For your additional concerns: > >>>>>>>>>>>>> 1) The table option has been updated. > >>>>>>>>>>>>> 2) We got “lookup.cache” back for configuring whether to > >> use > >>>>>>> partial > >>>>>>>>> or > >>>>>>>>>>>>> full caching mode. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best regards, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Qingsheng > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>>> On May 19, 2022, at 17:25, Александр Смирнов < > >>>>>>> smirale...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Also I have a few additions: > >>>>>>>>>>>>>> 1) maybe rename 'lookup.cache.maximum-size' to > >>>>>>>>>>>>>> 'lookup.cache.max-rows'? I think it will be more clear > >> that > >>>> we > >>>>>>> talk > >>>>>>>>>>>>>> not about bytes, but about the number of rows. Plus it > >> fits > >>>>> more, > >>>>>>>>>>>>>> considering my optimization with filters. > >>>>>>>>>>>>>> 2) How will users enable rescanning? Are we going to > >>> separate > >>>>>>>>> caching > >>>>>>>>>>>>>> and rescanning from the options point of view? Like > >>> initially > >>>>> we > >>>>>>> had > >>>>>>>>>>>>>> one option 'lookup.cache' with values LRU / ALL. I think > >>> now > >>>> we > >>>>>>> can > >>>>>>>>>>>>>> make a boolean option 'lookup.rescan'. RescanInterval can > >>> be > >>>>>>>>>>>>>> 'lookup.rescan.interval', etc. 
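[Editor's note] The rescan scheduling discussed in this thread — an initial delay derived from a UTC `rescanStartTime`, then periodic reloads via `ScheduledExecutorService#scheduleWithFixedDelay` — can be sketched as follows. The option names come from the discussion; the helper class itself is hypothetical.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: derive the scheduler's initialDelay from a UTC rescanStartTime,
// as suggested in the thread (names illustrative, not a final API).
final class RescanScheduling {
    private RescanScheduling() {}

    static Duration initialDelay(LocalTime rescanStartTime, LocalTime nowUtc) {
        Duration delay = Duration.between(nowUtc, rescanStartTime);
        // If the start time already passed today, wait until tomorrow's occurrence.
        return delay.isNegative() ? delay.plusDays(1) : delay;
    }

    static void scheduleReload(Runnable reloader, LocalTime rescanStartTime, Duration rescanInterval) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                reloader,
                initialDelay(rescanStartTime, LocalTime.now(ZoneOffset.UTC)).toMillis(),
                rescanInterval.toMillis(),
                TimeUnit.MILLISECONDS);
    }
}
```

This matches the "first reload delayed until a fixed UTC time, then daily by default" behavior described for `rescanStartTime`.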
> >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best regards, > >>>>>>>>>>>>>> Alexander > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> чт, 19 мая 2022 г. в 14:50, Александр Смирнов < > >>>>>>> smirale...@gmail.com > >>>>>>>>>>>> : > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Qingsheng and Jark, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 1. Builders vs 'of' > >>>>>>>>>>>>>>> I understand that builders are used when we have > >> multiple > >>>>>>>>>>> parameters. > >>>>>>>>>>>>>>> I suggested them because we could add parameters later. > >> To > >>>>>>> prevent > >>>>>>>>>>>>>>> Builder for ScanRuntimeProvider from looking redundant I > >>> can > >>>>>>>>> suggest > >>>>>>>>>>>>>>> one more config now - "rescanStartTime". > >>>>>>>>>>>>>>> It's a time in UTC (LocalTime class) when the first > >> reload > >>>> of > >>>>>>> cache > >>>>>>>>>>>>>>> starts. This parameter can be thought of as > >> 'initialDelay' > >>>>> (diff > >>>>>>>>>>>>>>> between current time and rescanStartTime) in method > >>>>>>>>>>>>>>> ScheduleExecutorService#scheduleWithFixedDelay [1] . It > >>> can > >>>> be > >>>>>>> very > >>>>>>>>>>>>>>> useful when the dimension table is updated by some other > >>>>>>> scheduled > >>>>>>>>>>> job > >>>>>>>>>>>>>>> at a certain time. Or when the user simply wants a > >> second > >>>> scan > >>>>>>>>>>> (first > >>>>>>>>>>>>>>> cache reload) be delayed. This option can be used even > >>>> without > >>>>>>>>>>>>>>> 'rescanInterval' - in this case 'rescanInterval' will be > >>> one > >>>>>>> day. > >>>>>>>>>>>>>>> If you are fine with this option, I would be very glad > >> if > >>>> you > >>>>>>> would > >>>>>>>>>>>>>>> give me access to edit FLIP page, so I could add it > >> myself > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 2. Common table options > >>>>>>>>>>>>>>> I also think that FactoryUtil would be overloaded by all > >>>> cache > >>>>>>>>>>>>>>> options. But maybe unify all suggested options, not only > >>> for > >>>>>>>>> default > >>>>>>>>>>>>>>> cache? I.e. 
class 'LookupOptions', that unifies default > >>>> cache > >>>>>>>>>>> options, > >>>>>>>>>>>>>>> rescan options, 'async', 'maxRetries'. WDYT? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> 3. Retries > >>>>>>>>>>>>>>> I'm fine with suggestion close to > >>> RetryUtils#tryTimes(times, > >>>>>>> call) > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> [1] > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>> > >>>> > >>> > >> > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best regards, > >>>>>>>>>>>>>>> Alexander > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> ср, 18 мая 2022 г. в 16:04, Qingsheng Ren < > >>>> renqs...@gmail.com > >>>>>> : > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Jark and Alexander, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for your comments! I’m also OK to introduce > >> common > >>>>> table > >>>>>>>>>>>>> options. I prefer to introduce a new > >>> DefaultLookupCacheOptions > >>>>>>> class > >>>>>>>>>>> for > >>>>>>>>>>>>> holding these option definitions because putting all > >> options > >>>>> into > >>>>>>>>>>>>> FactoryUtil would make it a bit ”crowded” and not well > >>>>>>> categorized. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> FLIP has been updated according to suggestions above: > >>>>>>>>>>>>>>>> 1. Use static “of” method for constructing > >>>>>>> RescanRuntimeProvider > >>>>>>>>>>>>> considering both arguments are required. > >>>>>>>>>>>>>>>> 2. 
Introduce new table options matching > >>>>>>> DefaultLookupCacheFactory > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>> Qingsheng > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu < > >>> imj...@gmail.com> > >>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Alex, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 1) retry logic > >>>>>>>>>>>>>>>>> I think we can extract some common retry logic into > >>>>> utilities, > >>>>>>>>>>> e.g. > >>>>>>>>>>>>> RetryUtils#tryTimes(times, call). > >>>>>>>>>>>>>>>>> This seems independent of this FLIP and can be reused > >> by > >>>>>>>>>>> DataStream > >>>>>>>>>>>>> users. > >>>>>>>>>>>>>>>>> Maybe we can open an issue to discuss this and where > >> to > >>>> put > >>>>>>> it. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> 2) cache ConfigOptions > >>>>>>>>>>>>>>>>> I'm fine with defining cache config options in the > >>>>> framework. > >>>>>>>>>>>>>>>>> A candidate place to put is FactoryUtil which also > >>>> includes > >>>>>>>>>>>>> "sink.parallelism", "format" options. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>> Jark > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов < > >>>>>>>>>>>> smirale...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Qingsheng, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thank you for considering my comments. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> there might be custom logic before making retry, > >> such > >>> as > >>>>>>>>>>>>> re-establish the connection > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Yes, I understand that. I meant that such logic can > >> be > >>>>>>> placed in > >>>>>>>>>>> a > >>>>>>>>>>>>>>>>>> separate function, that can be implemented by > >>> connectors. 
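[Editor's note] The `RetryUtils#tryTimes(times, call)` helper mentioned in this thread might look roughly like the sketch below. This is a hypothetical utility, not existing Flink code; connector-specific recovery (e.g. re-establishing a JDBC connection) would go in the catch block.

```java
import java.util.concurrent.Callable;

// Sketch of the retry helper discussed in the thread: attempt the call up
// to `times` times, rethrowing the last failure if all attempts fail.
final class RetryUtils {
    private RetryUtils() {}

    static <T> T tryTimes(int times, Callable<T> call) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // a connector could reconnect / back off here before retrying
            }
        }
        throw last; // assumes times >= 1
    }
}
```

Keeping this in a shared utility (rather than in each connector's eval method) is the deduplication Alexander argues for above.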
> >>>>>>> Just > >>>>>>>>>>>> moving > >>>>>>>>>>>>>>>>>> the retry logic would make connector's LookupFunction > >>>> more > >>>>>>>>>>> concise > >>>>>>>>>>>> + > >>>>>>>>>>>>>>>>>> avoid duplicate code. However, it's a minor change. > >> The > >>>>>>> decision > >>>>>>>>>>> is > >>>>>>>>>>>>> up > >>>>>>>>>>>>>>>>>> to you. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> We decide not to provide common DDL options and let > >>>>>>> developers > >>>>>>>>>>> to > >>>>>>>>>>>>> define their own options as we do now per connector. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> What is the reason for that? One of the main goals of > >>>> this > >>>>>>> FLIP > >>>>>>>>>>> was > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> unify the configs, wasn't it? I understand that > >> current > >>>>> cache > >>>>>>>>>>>> design > >>>>>>>>>>>>>>>>>> doesn't depend on ConfigOptions, like was before. But > >>>> still > >>>>>>> we > >>>>>>>>>>> can > >>>>>>>>>>>>> put > >>>>>>>>>>>>>>>>>> these options into the framework, so connectors can > >>> reuse > >>>>>>> them > >>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> avoid code duplication, and, what is more > >> significant, > >>>>> avoid > >>>>>>>>>>>> possible > >>>>>>>>>>>>>>>>>> different options naming. This moment can be pointed > >>> out > >>>> in > >>>>>>>>>>>>>>>>>> documentation for connector developers. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Best regards, > >>>>>>>>>>>>>>>>>> Alexander > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren < > >>>>>>> renqs...@gmail.com>: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi Alexander, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks for the review and glad to see we are on the > >>> same > >>>>>>> page! > >>>>>>>>> I > >>>>>>>>>>>>> think you forgot to cc the dev mailing list so I’m also > >>>> quoting > >>>>>>> your > >>>>>>>>>>>> reply > >>>>>>>>>>>>> under this email. 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> We can add 'maxRetryTimes' option into this class > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> In my opinion the retry logic should be implemented > >> in > >>>>>>> lookup() > >>>>>>>>>>>>> instead of in LookupFunction#eval(). Retrying is only > >>>> meaningful > >>>>>>>>> under > >>>>>>>>>>>> some > >>>>>>>>>>>>> specific retriable failures, and there might be custom > >> logic > >>>>>>> before > >>>>>>>>>>>> making > >>>>>>>>>>>>> retry, such as re-establish the connection > >>>>>>> (JdbcRowDataLookupFunction > >>>>>>>>>>> is > >>>>>>>>>>>> an > >>>>>>>>>>>>> example), so it's more handy to leave it to the connector. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I don't see DDL options, that were in previous > >>> version > >>>> of > >>>>>>>>> FLIP. > >>>>>>>>>>>> Do > >>>>>>>>>>>>> you have any special plans for them? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> We decide not to provide common DDL options and let > >>>>>>> developers > >>>>>>>>>>> to > >>>>>>>>>>>>> define their own options as we do now per connector. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> The rest of comments sound great and I’ll update the > >>>> FLIP. > >>>>>>> Hope > >>>>>>>>>>> we > >>>>>>>>>>>>> can finalize our proposal soon! > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Qingsheng > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов < > >>>>>>>>>>>> smirale...@gmail.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Qingsheng and devs! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I like the overall design of updated FLIP, however > >> I > >>>> have > >>>>>>>>>>> several > >>>>>>>>>>>>>>>>>>>> suggestions and questions. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 1) Introducing LookupFunction as a subclass of > >>>>>>> TableFunction > >>>>>>>>>>> is a > >>>>>>>>>>>>> good > >>>>>>>>>>>>>>>>>>>> idea. 
We can add 'maxRetryTimes' option into this > >>>> class. > >>>>>>>>> 'eval' > >>>>>>>>>>>>> method > >>>>>>>>>>>>>>>>>>>> of new LookupFunction is great for this purpose. > >> The > >>>> same > >>>>>>> is > >>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>> 'async' case. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 2) There might be other configs in future, such as > >>>>>>>>>>>>> 'cacheMissingKey' > >>>>>>>>>>>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in > >>>>>>>>>>>>> ScanRuntimeProvider. > >>>>>>>>>>>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider > >>> and > >>>>>>>>>>>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one > >>>>> 'build' > >>>>>>>>>>>> method > >>>>>>>>>>>>>>>>>>>> instead of many 'of' methods in future)? > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 3) What are the plans for existing > >>>> TableFunctionProvider > >>>>>>> and > >>>>>>>>>>>>>>>>>>>> AsyncTableFunctionProvider? I think they should be > >>>>>>> deprecated. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 4) Am I right that the current design does not > >> assume > >>>>>>> usage of > >>>>>>>>>>>>>>>>>>>> user-provided LookupCache in re-scanning? In this > >>> case, > >>>>> it > >>>>>>> is > >>>>>>>>>>> not > >>>>>>>>>>>>> very > >>>>>>>>>>>>>>>>>>>> clear why do we need methods such as 'invalidate' > >> or > >>>>>>> 'putAll' > >>>>>>>>>>> in > >>>>>>>>>>>>>>>>>>>> LookupCache. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 5) I don't see DDL options, that were in previous > >>>> version > >>>>>>> of > >>>>>>>>>>>> FLIP. > >>>>>>>>>>>>> Do > >>>>>>>>>>>>>>>>>>>> you have any special plans for them? > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> If you don't mind, I would be glad to be able to > >> make > >>>>> small > >>>>>>>>>>>>>>>>>>>> adjustments to the FLIP document too. I think it's > >>>> worth > >>>>>>>>>>>> mentioning > >>>>>>>>>>>>>>>>>>>> about what exactly optimizations are planning in > >> the > >>>>>>> future. 
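[Editor's note] The builder pattern suggested here for providers with optional parameters could be sketched as below; all names and defaults are illustrative, and the thread later notes the FLIP chose static `of` methods while both arguments were required.

```java
import java.time.Duration;
import java.time.LocalTime;

// Illustration of a builder for the rescan provider: new optional settings
// (like rescanStartTime) can be added later without breaking existing callers.
final class RescanRuntimeProvider {
    private final Duration rescanInterval;
    private final LocalTime rescanStartTime; // optional; null means "start immediately"

    private RescanRuntimeProvider(Duration rescanInterval, LocalTime rescanStartTime) {
        this.rescanInterval = rescanInterval;
        this.rescanStartTime = rescanStartTime;
    }

    Duration getRescanInterval() { return rescanInterval; }

    LocalTime getRescanStartTime() { return rescanStartTime; }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        private Duration rescanInterval = Duration.ofDays(1); // hypothetical default: daily
        private LocalTime rescanStartTime;

        Builder rescanInterval(Duration interval) { this.rescanInterval = interval; return this; }

        Builder rescanStartTime(LocalTime startTime) { this.rescanStartTime = startTime; return this; }

        RescanRuntimeProvider build() { return new RescanRuntimeProvider(rescanInterval, rescanStartTime); }
    }
}
```

One `build()` method stays stable as options grow, which is the flexibility argument made in point 2 above.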
Best regards,
Smirnov Alexander

On Fri, 13 May 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and devs,

Thank you very much for the in-depth discussion! As Jark mentioned, we were inspired by Alexander's idea and refactored our design. FLIP-221 [1] has been updated to reflect the new design, and we are happy to hear more suggestions from you!

Compared to the previous design:
1. The lookup cache serves at the table runtime level and is integrated as a component of LookupJoinRunner, as discussed previously.
2. Interfaces are renamed and re-designed to reflect the new design.
3. We separate the all-caching case and introduce a new RescanRuntimeProvider to reuse the ability of scanning. We are planning to support SourceFunction / InputFormat for now, considering the complexity of the FLIP-27 Source API.
4. A new interface LookupFunction is introduced to make the semantics of lookup more straightforward for developers.

Replying to Alexander:

> However I'm a little confused whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not?

Yes, you are right. InputFormat is not deprecated for now. I think it will be deprecated in the future, but we don't have a clear plan for that.

Thanks again for the discussion on this FLIP, and looking forward to cooperating with you after we finalize the design and interfaces!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,
Qingsheng

On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Jark, Qingsheng and Leonard!

Glad to see that we came to a consensus on almost all points!

However, I'm a little confused about whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not? Actually, I also think that for the first version it's OK to use InputFormat in the ALL cache realization, because supporting the rescan ability seems like a very distant prospect. But for this decision we need a consensus among all discussion participants.

In general, I don't have anything to argue with in your statements. All of them correspond to my ideas. Looking ahead, it would be nice to work on this FLIP cooperatively. I've already done a lot of work on lookup join caching, with a realization very close to the one we are discussing, and I want to share the results of this work. Anyway, looking forward to the FLIP update!

Best regards,
Smirnov Alexander

On Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:

Hi Alex,

Thanks for summarizing your points.

In the past week, Qingsheng, Leonard, and I have discussed it several times and we have totally refactored the design. I'm glad to say we have reached a consensus on many of your points! Qingsheng is still working on updating the design docs, which may be available in the next few days.
I will share some conclusions from our discussions:

1) We have refactored the design towards the "cache in framework" way.

2) There is a "LookupCache" interface for users to customize, and a default implementation with a builder for ease of use. This makes it possible to have both flexibility and conciseness.

3) Filter pushdown is important for both the ALL and LRU lookup caches, especially for reducing IO. Filter pushdown should be the final state and the unified way to support pruning both the ALL cache and the LRU cache, so I think we should make an effort in this direction. If we need to support filter pushdown for the ALL cache anyway, why not use it for the LRU cache as well? Either way, as we have decided to implement the cache in the framework, we have the chance to support filtering on the cache at any time. This is an optimization and it doesn't affect the public API. I think we can create a JIRA issue to discuss it once the FLIP is accepted.

4) The idea to support the ALL cache is similar to your proposal. In the first version, we will only support InputFormat and SourceFunction for cache-all (invoking InputFormat in the join operator). For a FLIP-27 source, we need to join a true source operator instead of calling it embedded in the join operator. However, that needs another FLIP to support the re-scan ability for FLIP-27 sources, which can be a large piece of work. In order not to block this issue, we can put the effort of FLIP-27 source integration into future work and integrate InputFormat & SourceFunction for now.

I think it's fine to use InputFormat & SourceFunction, as they are not deprecated; otherwise, we would have to introduce another function similar to them, which is meaningless. We need to plan the FLIP-27 source integration ASAP, before InputFormat & SourceFunction are deprecated.

Best,
Jark

On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Martijn!

Got it. Therefore, the realization with InputFormat is not considered. Thanks for clearing that up!

Best regards,
Smirnov Alexander

On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:

Hi,

With regards to:

> But if there are plans to refactor all connectors to FLIP-27

Yes, FLIP-27 is the target for all connectors. The old interfaces will be deprecated and connectors will either be refactored to use the new ones or dropped.

The caching should work for connectors that are using FLIP-27 interfaces; we should not introduce new features for old interfaces.

Best regards,

Martijn

On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Jark!

Sorry for the late response. I would like to make some comments and clarify my points.

1) I agree with your first statement.
I think > >>> we > >>>>> can > >>>>>>>>>>>> achieve > >>>>>>>>>>>>> both > >>>>>>>>>>>>>>>>>>>>>>>>>> advantages this way: put the Cache interface > >> in > >>>>>>>>>>>>> flink-table-common, > >>>>>>>>>>>>>>>>>>>>>>>>>> but have implementations of it in > >>>>>>> flink-table-runtime. > >>>>>>>>>>>>> Therefore if a > >>>>>>>>>>>>>>>>>>>>>>>>>> connector developer wants to use existing > >> cache > >>>>>>>>>>> strategies > >>>>>>>>>>>>> and their > >>>>>>>>>>>>>>>>>>>>>>>>>> implementations, he can just pass > >> lookupConfig > >>> to > >>>>> the > >>>>>>>>>>>>> planner, but if > >>>>>>>>>>>>>>>>>>>>>>>>>> he wants to have its own cache implementation > >>> in > >>>>> his > >>>>>>>>>>>>> TableFunction, it > >>>>>>>>>>>>>>>>>>>>>>>>>> will be possible for him to use the existing > >>>>>>> interface > >>>>>>>>>>> for > >>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>>>>>>> purpose (we can explicitly point this out in > >>> the > >>>>>>>>>>>>> documentation). In > >>>>>>>>>>>>>>>>>>>>>>>>>> this way all configs and metrics will be > >>> unified. > >>>>>>> WDYT? > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the > >>> cache, > >>>> we > >>>>>>> will > >>>>>>>>>>>>> have 90% of > >>>>>>>>>>>>>>>>>>>>>>>>>> lookup requests that can never be cached > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 2) Let me clarify the logic filters > >>> optimization > >>>> in > >>>>>>> case > >>>>>>>>>>> of > >>>>>>>>>>>>> LRU cache. > >>>>>>>>>>>>>>>>>>>>>>>>>> It looks like Cache<RowData, > >>>> Collection<RowData>>. > >>>>>>> Here > >>>>>>>>>>> we > >>>>>>>>>>>>> always > >>>>>>>>>>>>>>>>>>>>>>>>>> store the response of the dimension table in > >>>> cache, > >>>>>>> even > >>>>>>>>>>>>> after > >>>>>>>>>>>>>>>>>>>>>>>>>> applying calc function. I.e. 
if there are no > >>> rows > >>>>>>> after > >>>>>>>>>>>>> applying > >>>>>>>>>>>>>>>>>>>>>>>>>> filters to the result of the 'eval' method of > >>>>>>>>>>>> TableFunction, > >>>>>>>>>>>>> we store > >>>>>>>>>>>>>>>>>>>>>>>>>> the empty list by lookup keys. Therefore the > >>>> cache > >>>>>>> line > >>>>>>>>>>>> will > >>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>>>> filled, but will require much less memory (in > >>>>> bytes). > >>>>>>>>>>> I.e. > >>>>>>>>>>>>> we don't > >>>>>>>>>>>>>>>>>>>>>>>>>> completely filter keys, by which result was > >>>> pruned, > >>>>>>> but > >>>>>>>>>>>>> significantly > >>>>>>>>>>>>>>>>>>>>>>>>>> reduce required memory to store this result. > >> If > >>>> the > >>>>>>> user > >>>>>>>>>>>>> knows about > >>>>>>>>>>>>>>>>>>>>>>>>>> this behavior, he can increase the 'max-rows' > >>>>> option > >>>>>>>>>>> before > >>>>>>>>>>>>> the start > >>>>>>>>>>>>>>>>>>>>>>>>>> of the job. But actually I came up with the > >>> idea > >>>>>>> that we > >>>>>>>>>>>> can > >>>>>>>>>>>>> do this > >>>>>>>>>>>>>>>>>>>>>>>>>> automatically by using the 'maximumWeight' > >> and > >>>>>>> 'weigher' > >>>>>>>>>>>>> methods of > >>>>>>>>>>>>>>>>>>>>>>>>>> GuavaCache [1]. Weight can be the size of the > >>>>>>> collection > >>>>>>>>>>> of > >>>>>>>>>>>>> rows > >>>>>>>>>>>>>>>>>>>>>>>>>> (value of cache). Therefore cache can > >>>> automatically > >>>>>>> fit > >>>>>>>>>>>> much > >>>>>>>>>>>>> more > >>>>>>>>>>>>>>>>>>>>>>>>>> records than before. > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do > >>>>> filters > >>>>>>> and > >>>>>>>>>>>>> projects > >>>>>>>>>>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and > >>>>>>>>>>>>> SupportsProjectionPushDown. > >>>>>>>>>>>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the > >>>>> interfaces, > >>>>>>>>>>> don't > >>>>>>>>>>>>> mean it's > >>>>>>>>>>>>>>>>>>>>>>>> hard > >>>>>>>>>>>>>>>>>>>>>>>>>> to implement. 
> >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> It's debatable how difficult it will be to > >>>>> implement > >>>>>>>>>>> filter > >>>>>>>>>>>>> pushdown. > >>>>>>>>>>>>>>>>>>>>>>>>>> But I think the fact that currently there is > >> no > >>>>>>> database > >>>>>>>>>>>>> connector > >>>>>>>>>>>>>>>>>>>>>>>>>> with filter pushdown at least means that this > >>>>> feature > >>>>>>>>>>> won't > >>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>>>>>>>> supported soon in connectors. Moreover, if we > >>>> talk > >>>>>>> about > >>>>>>>>>>>>> other > >>>>>>>>>>>>>>>>>>>>>>>>>> connectors (not in Flink repo), their > >> databases > >>>>> might > >>>>>>>>> not > >>>>>>>>>>>>> support all > >>>>>>>>>>>>>>>>>>>>>>>>>> Flink filters (or not support filters at > >> all). > >>> I > >>>>>>> think > >>>>>>>>>>>> users > >>>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>>>>>>>>> interested in supporting cache filters > >>>> optimization > >>>>>>>>>>>>> independently of > >>>>>>>>>>>>>>>>>>>>>>>>>> supporting other features and solving more > >>>> complex > >>>>>>>>>>> problems > >>>>>>>>>>>>> (or > >>>>>>>>>>>>>>>>>>>>>>>>>> unsolvable at all). > >>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>> 3) I agree with your third statement. > >> Actually > >>> in > >>>>> our > >>>>>>>>>>>>> internal version > >>>>>>>>>>>>>>>>>>>>>>>>>> I also tried to unify the logic of scanning > >> and > >>>>>>>>> reloading > >>>>>>>>>>>>> data from > >>>>>>>>>>>>>>>>>>>>>>>>>> connectors. But unfortunately, I didn't find > >> a > >>>> way > >>>>> to > >>>>>>>>>>> unify > >>>>>>>>>>>>> the logic > >>>>>>>>>>>>>>>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat, > >>>>>>>>> SourceFunction, > >>>>>>>>>>>>> Source,...) > >>>>>>>>>>>>>>>>>>>>>>>>>> and reuse it in reloading ALL cache. 
As a result I settled on using InputFormat, because it was used for scanning in all lookup connectors. (I didn't know that there are plans to deprecate InputFormat in favor of the FLIP-27 Source.) IMO, using the FLIP-27 source in ALL caching is not a good idea, because this source was designed to work in a distributed environment (SplitEnumerator on the JobManager and SourceReaders on the TaskManagers), not in one operator (the lookup join operator in our case). There is not even a direct way to pass splits from the SplitEnumerator to a SourceReader (this logic works through SplitEnumeratorContext, which requires OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using InputFormat for the ALL cache seems much clearer and easier. But if there are plans to refactor all connectors to FLIP-27, I have the following idea: maybe we can drop the lookup join ALL cache in favor of a simple join with multiple scans of a batch source? The point is that the only difference between a lookup join ALL cache and a simple join with a batch source is that in the first case scanning is performed multiple times, in between which the state (cache) is cleared (correct me if I'm wrong). So what if we extend the functionality of the simple join to support state reloading, plus extend the functionality of scanning a batch source multiple times (this should be easy with the new FLIP-27 source, which unifies streaming/batch reading — we would only need to change the SplitEnumerator so that it passes the splits again after some TTL)? WDYT? I must say that this looks like a long-term goal and would make the scope of this FLIP even larger than you said. Maybe we can limit ourselves to a simpler solution (InputFormats) for now.

So to sum up, my points are as follows:
1) There is a way to make both concise and flexible interfaces for caching in lookup join.
2) The cache filters optimization is important for both LRU and ALL caches.
3) It is unclear when filter pushdown will be supported in Flink connectors, and some connectors might not have the opportunity to support it at all; also, as far as I know, filter pushdown currently works only for scanning (not lookup). So the cache filters + projections optimization should be independent of other features.
4) The ALL cache realization is a complex topic that involves multiple aspects of how Flink is developing. Dropping InputFormat in favor of the FLIP-27 Source would make the ALL cache realization really complex and unclear, so maybe instead we can extend the functionality of the simple join, or keep InputFormat in the case of the lookup join ALL cache?
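To make the 'maximumWeight'/'weigher' idea from point 2 concrete, here is a minimal stdlib sketch of the same mechanism that Guava's CacheBuilder#weigher [1] provides: the cache is bounded by the total number of cached rows rather than by the number of keys, so an empty result stored for a pruned key is almost free. The class and the exact weight formula are illustrative assumptions, not proposed API:

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch: bound the cache by total cached rows ("weight") instead of
// by number of entries, evicting in LRU order when over budget.
final class WeightBoundedLruCache<K, V> {
    // Access-order LinkedHashMap iterates least-recently-used entries first.
    private final LinkedHashMap<K, Collection<V>> map = new LinkedHashMap<>(16, 0.75f, true);
    private final long maxWeight;
    private long weight;

    WeightBoundedLruCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    // Weigher: the cost of an entry is its row count plus 1 for the key itself,
    // so even an empty result (a pruned lookup) has a small non-zero cost.
    private static long weigh(Collection<?> rows) {
        return rows.size() + 1;
    }

    void put(K key, Collection<V> rows) {
        Collection<V> old = map.put(key, rows);
        if (old != null) {
            weight -= weigh(old);
        }
        weight += weigh(rows);
        // Evict least-recently-used entries until back under the weight budget.
        Iterator<Map.Entry<K, Collection<V>>> it = map.entrySet().iterator();
        while (weight > maxWeight && it.hasNext()) {
            weight -= weigh(it.next().getValue());
            it.remove();
        }
    }

    Collection<V> getIfPresent(K key) {
        return map.get(key);
    }

    int size() {
        return map.size();
    }
}
```

With a weight budget of 10, for example, an empty result costs 1 while an 8-row result costs 9, so far more pruned keys fit than under a plain max-rows bound.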
Best regards,
Smirnov Alexander

[1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:

It's great to see the active discussion! I want to share my ideas:

1) Implement the cache in the framework vs. in the connector base.
I don't have a strong opinion on this. Both ways should work (e.g., for cache pruning and compatibility). The framework way can provide more concise interfaces. The connector base way can define more flexible cache strategies/implementations. We are still investigating whether we can have both advantages. We should reach a consensus that the chosen way should be a final state, and that we are on the path to it.

2) Filters and projections pushdown.
I agree with Alex that filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use the cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and that hit the databases directly. That means the cache is meaningless in this case.

IMO, Flink SQL has provided a standard way to do filter and projection pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. That Jdbc/Hive/HBase haven't implemented the interfaces doesn't mean they are hard to implement. They should implement the pushdown interfaces to reduce IO and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation. I don't see why we need to duplicate the pushdown logic in caches, which would complicate the lookup join design.

3) ALL cache abstraction.
The ALL cache might be the most challenging part of this FLIP. We have never provided a public reload-lookup interface. Currently, we put the reload logic in the 'eval' method of TableFunction. That's hard for some sources (e.g., Hive). Ideally, connector implementations should share the logic of reload and scan, i.e., ScanTableSource with an InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with SourceOperator. If we want to invoke the FLIP-27 source in LookupJoin, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.

Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns should be done for all ScanTableSource implementations (not only for the lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns?
I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!

I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway — no matter how it is implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data. And the handiest way to do that is to apply it inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set them through DDL rather than have the same TTL, strategy, and other options for all lookup tables.
3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache). But most probably this can be solved by creating more cache strategies and a wider set of configurations.

All these points are much closer to the scheme proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong and all these facilities can be simply implemented in your architecture?

Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but I just wanted to express that I really appreciate the in-depth discussion on this topic, and I hope that others will join the conversation.
Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).

> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept this consciously to achieve better performance (no one proposed enabling caching by default, etc.). Or by "users" do you mean other developers of connectors? In that case developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one forces them to do so if they don't want to. So what exactly is the difference between implementing caching in flink-table-runtime and in flink-table-common from this point of view? How does it affect breaking or not breaking the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> confront a situation that allows table options in DDL to control the
> behavior of the framework, which has never happened previously and
> should be cautious

If we talk about the main difference in semantics between DDL options and config options ("table.exec.xxx"), isn't it about the scope of the options and their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy into configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter for the user (or someone else) where the logic affected by the applied option is located? Also I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.
> introduce a new interface for this all-caching scenario and the design
> would become more complex

This is a subject for a separate discussion, but in our internal version we actually solved this problem quite easily: we reused the InputFormat class, so there is no need for a new API. The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive, which uses the PartitionReader class that is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is significantly reduced (as well as the time the input stream is blocked). I know that we usually try to avoid concurrency in Flink code, but maybe this case can be an exception. BTW, I don't claim it's an ideal solution; maybe there are better ones.

> Providing the cache in the framework might introduce compatibility issues

That is possible only if the developer of the connector doesn't properly refactor their code and uses the new cache options incorrectly (i.e. explicitly provides the same options in two different places in the code).
For correct behavior, all they need to do is redirect the existing options to the framework's LookupConfig (plus maybe add aliases for options if the naming differed); everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector thanks to backward compatibility. Also, if a developer wants to use their own cache logic, they can simply refuse to pass some of the configs to the framework and instead provide their own implementation with the already existing configs and metrics (but I think that is a rare case).
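Returning to the InputFormat-based parallel reload mentioned earlier in this mail: it can be sketched minimally like this. The `InputSplit` record and the row-producing loop are toy stand-ins for Flink's actual InputFormat machinery (not real API), and the thread cap mirrors the "upper limit" described above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of reloading an "ALL" lookup cache in parallel, one task per split.
 * InputSplit here is a hypothetical stand-in for Flink's InputFormat splits.
 */
class ParallelCacheReloader {

    /** Hypothetical split: a contiguous key range of the dimension table. */
    record InputSplit(int from, int to) {}

    static Map<Integer, String> reload(List<InputSplit> splits, int maxThreads)
            throws InterruptedException {
        Map<Integer, String> cache = new ConcurrentHashMap<>();
        // Thread count follows the number of splits, capped by an upper limit.
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.min(splits.size(), maxThreads));
        for (InputSplit split : splits) {
            pool.submit(() -> {
                // Stand-in for InputFormat#open / #nextRecord over one split.
                for (int key = split.from(); key < split.to(); key++) {
                    cache.put(key, "row-" + key);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return cache;
    }
}
```

With several splits the reload time drops roughly with the number of workers, which is the effect described above for the internal implementation.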
> filters and projections should be pushed all the way down to the table
> function, like what we do in the scan source

That is a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it is simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table.
For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by user age. If we have the filter 'age > 30', there will be about half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2x, which gains a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users, and it doesn't sound like 'not quite useful'. It would be great to hear other voices on this topic!
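The age-filter arithmetic above can be made concrete with a toy sketch (the row shape, the `FilteredCache` name and the `maxRows` handling are illustrative assumptions, not FLIP-221 API): rows rejected by the filter never occupy cache capacity, so the same max-rows budget holds roughly twice as many useful entries under `age > 30`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;

/**
 * Toy sketch: apply the join's filter before rows enter the lookup cache,
 * so rows the query would discard anyway never consume cache capacity.
 * The row shape (userId -> age) is illustrative only.
 */
class FilteredCache {
    private final Map<Integer, Integer> cache = new HashMap<>();
    private final IntPredicate ageFilter;
    private final int maxRows; // analogous to 'lookup.cache.max-rows'

    FilteredCache(IntPredicate ageFilter, int maxRows) {
        this.ageFilter = ageFilter;
        this.maxRows = maxRows;
    }

    /** Returns true if the row passed the filter and was admitted. */
    boolean put(int userId, int age) {
        if (!ageFilter.test(age) || cache.size() >= maxRows) {
            return false;
        }
        cache.put(userId, age);
        return true;
    }

    int size() {
        return cache.size();
    }
}
```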
We have quite a lot of controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because the result can't truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.
2. If we put the cache implementation in the framework (whether in a runner or in a wrapper around TableFunction), we have to confront a situation where table options in DDL control the behavior of the framework, which has never happened previously and should be treated with caution. Under the current design, the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply such general configs to a specific table.

3. We have use cases where the lookup source loads all records into memory and refreshes them periodically to achieve high lookup performance (like the Hive connector in the community; this is also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4. Providing the cache in the framework might introduce compatibility issues to existing lookup sources: there might be two caches with totally different strategies if the user configures the table incorrectly (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of to the runner with the cache.
The goal of using a cache is to reduce network I/O and the pressure on the external system, and applying these optimizations only to the cache seems not quite useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as part of the TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache. Also, I made a POC [2] for your reference.

Looking forward to your ideas!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step

I think these two approaches (the one originally proposed by Qingsheng and mine) are mutually exclusive, because conceptually they pursue the same goal, but the implementation details are different.
If we go one way, moving to the other in the future will mean deleting existing code and once again changing the API for connectors. So I think we should reach a consensus with the community about that and then work on this FLIP together, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?

> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we first must perform the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know
> the solution to share images to be honest.
Sorry for that, I'm a bit new to these kinds of conversations :) I have no write access to Confluence, so I created a Jira issue where I described the proposed changes in more detail: https://issues.apache.org/jira/browse/FLINK-27411.

I'll be happy to get more feedback!

Best,
Smirnov Alexander

On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.

I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of a TableFunction X, devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).
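The delegation pattern sketched here — serve from the cache, call through to X only on a miss — can be illustrated with a minimal LRU wrapper. All names below are hypothetical, not the CachingTableFunction from the FLIP or the POC.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Sketch of a caching layer around a lookup function "X": delegate only on
 * misses, keep at most maxRows entries, evict least-recently-used first.
 */
class CachingLookup<K, V> {
    private final Function<K, V> delegate; // "X": the real lookup, e.g. a JDBC query
    private final Map<K, V> cache;
    int delegateCalls = 0; // exposed for the usage example below

    CachingLookup(Function<K, V> delegate, int maxRows) {
        this.delegate = delegate;
        // LinkedHashMap in access order gives a minimal LRU eviction policy.
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxRows;
            }
        };
    }

    V lookup(K key) {
        return cache.computeIfAbsent(key, k -> {
            delegateCalls++;
            return delegate.apply(k); // cache miss: delegate to X
        });
    }
}
```

A production version would also need TTL handling and the cache metrics discussed elsewhere in this thread; the point here is only the delegate-on-miss shape of the wrapper.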
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the > >>>> changes > >>>>> of > >>>>>>>>>>> this > >>>>>>>>>>>>> FLIP > >>>>>>>>>>>>>>>>>>>>>>>>>> would be > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> limited to > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> options, no need for new public > >>>>> interfaces. > >>>>>>>>>>>>> Everything > >>>>>>>>>>>>>>>>>>>>>>>> else > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remains > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> an > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation of Table runtime. > >> That > >>>>> means > >>>>>>> we > >>>>>>>>>>> can > >>>>>>>>>>>>>>>>>>>>>>>> easily > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> incorporate > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> optimization potential that > >> Alexander > >>>>>>> pointed > >>>>>>>>>>> out > >>>>>>>>>>>>>>>>>>>>>>>> later. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your > >>>>> architecture > >>>>>>> is > >>>>>>>>>>> not > >>>>>>>>>>>>>>>>>>>>>>>> shared. > >>>>>>>>>>>>>>>>>>>>>>>>>> I > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> don't > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> know the > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be > >> honest. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM > >>>> Александр > >>>>>>>>>>> Смирнов > >>>>>>>>>>>> < > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, > >>> I'm > >>>>>>> not a > >>>>>>>>>>>>>>>>>>>>>>>> committer > >>>>>>>>>>>>>>>>>>>>>>>>>> yet, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'd > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> really like to become one. 
And this FLIP really interested me. Actually I have worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.

I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables – that makes it very convenient to import in connectors.
In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But that would require connectors to depend on a module containing a lot of runtime logic, which doesn't sound good.

I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider, allowing connectors to only pass configuration to the planner, so that they don't depend on the runtime implementation. Based on these configs the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module).
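To make the proposed split concrete, here is a rough sketch. All names are hypothetical: 'getLookupConfig', the LookupConfig fields, and the way the planner picks a runner mirror the discussion, but none of this is actual Flink API.

```java
// Hypothetical sketch of the proposed direction: the connector only
// declares caching preferences through a small config object, and the
// planner maps that declaration to a runtime runner. Class, field and
// runner names are illustrative, not actual Flink API.
class LookupConfig {
    enum CacheStrategy { NONE, LRU, ALL }

    final CacheStrategy strategy;
    final long maxCachedRows;
    final long ttlMillis;

    LookupConfig(CacheStrategy strategy, long maxCachedRows, long ttlMillis) {
        this.strategy = strategy;
        this.maxCachedRows = maxCachedRows;
        this.ttlMillis = ttlMillis;
    }

    // Planner-side decision: no runtime classes leak into the connector;
    // the planner chooses the runner from the declared config alone.
    static String chooseRunner(LookupConfig config, boolean hasCalc) {
        if (config.strategy == CacheStrategy.NONE) {
            return hasCalc ? "LookupJoinRunnerWithCalc" : "LookupJoinRunner";
        }
        return hasCalc ? "LookupJoinCachingRunnerWithCalc" : "LookupJoinCachingRunner";
    }
}
```

With this split, a connector would compile against flink-table-common only, while the runner classes stay private to flink-table-runtime.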
The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).

The classes in flink-table-planner that would be responsible for this are CommonPhysicalLookupJoin and its subclasses. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc. I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution: if the caching logic sits at a lower level, we can apply some optimizations to it.
LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which mostly consists of filters and projections.

For example, when joining table A with lookup table B under the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: the filters avoid storing useless records in the cache, and the projections reduce the records' size.
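Applying the calc step before rows enter the cache could be sketched as follows; the row representation (List&lt;Object&gt;) and the class name CalcBeforeCache are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Illustrative sketch of the "calc before cache" optimization: the filter
// and projection run before rows are stored, so filtered-out rows never
// enter the cache and cached rows carry only the needed columns.
class CalcBeforeCache {
    private final Map<Object, List<List<Object>>> cache = new HashMap<>();

    List<List<Object>> lookup(
            Object key,
            Function<Object, List<List<Object>>> source,  // raw lookup into table B
            Predicate<List<Object>> filter,               // e.g. B.salary > 1000
            UnaryOperator<List<Object>> projection) {     // keep only needed columns
        List<List<Object>> rows = cache.get(key);
        if (rows == null) {
            rows = new ArrayList<>();
            for (List<Object> raw : source.apply(key)) {
                if (filter.test(raw)) {                   // useless rows never cached
                    rows.add(projection.apply(raw));      // stored rows are smaller
                }
            }
            cache.put(key, rows);
        }
        return rows;
    }

    int cachedRows(Object key) {
        return cache.getOrDefault(key, List.of()).size();
    }
}
```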
So the initial maximum number of records in the cache can be increased by the user.

What do you think about it?

On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction for the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there is no standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.
Therefore we propose some new APIs including a cache, metrics, wrapper classes of TableFunction and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng

--
Best Regards,

Qingsheng Ren

Real-time Computing Team
Alibaba Cloud

Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com