Thanks Alexander for your reply. We can discuss the new interface when it comes out.
We are more inclined to deprecate the connector `async` option when discussing FLIP-234 [1]. We should use hints to let the planner decide. Although the discussion has not yet produced a conclusion, can we remove this option from this FLIP? It doesn't seem to be related to this FLIP, but more to FLIP-234, and we can form a conclusion over there. [1] https://lists.apache.org/thread/9k1sl2519kh2n3yttwqc00p07xdfns3h Best, Jingsong On Wed, Jun 1, 2022 at 4:59 AM Jing Ge <j...@ververica.com> wrote: Hi Jark, Thanks for clarifying it. It would be fine, as long as we could provide the no-cache solution. I was just wondering if the client-side cache could really help when HBase is used, since the data to look up should be huge. Depending on how much data will be cached on the client side, the data that should be LRU in e.g. LruBlockCache will not be LRU anymore. In the worst-case scenario, once the cached data at the client side is expired, the request will hit disk, which will cause extra latency temporarily, if I am not mistaken. Best regards, Jing On Mon, May 30, 2022 at 9:59 AM Jark Wu <imj...@gmail.com> wrote: Hi Jing Ge, What do you mean by the "impact on the block cache used by HBase"? In my understanding, the connector cache and the HBase cache are two totally different things. The connector cache is a local/client cache, and the HBase cache is a server cache. > does it make sense to have a no-cache solution as one of the default solutions so that customers will have no effort for the migration if they want to stick with HBase cache The implementation migration should be transparent to users. Take the HBase connector as an example: it already supports a lookup cache, but it is disabled by default. After the migration, the connector still disables the cache by default (i.e. the no-cache solution). No migration effort for users. HBase cache and connector cache are two different things. 
HBase cache can't simply replace connector cache, because one of the most important usages for connector cache is reducing the I/O requests/responses and improving the throughput, which can't be achieved by just using a server cache. Best, Jark On Fri, 27 May 2022 at 22:42, Jing Ge <j...@ververica.com> wrote: Thanks all for the valuable discussion. The new feature looks very interesting. According to the FLIP description: "*Currently we have JDBC, Hive and HBase connector implemented lookup table source. All existing implementations will be migrated to the current design and the migration will be transparent to end users*." I was only wondering if we should pay attention to HBase and similar DBs. Since, commonly, the lookup data will be huge while using HBase, partial caching will be used in this case, if I am not mistaken, which might have an impact on the block cache used by HBase, e.g. LruBlockCache. Another question is that, since HBase provides a sophisticated cache solution, does it make sense to have a no-cache solution as one of the default solutions so that customers will have no effort for the migration if they want to stick with HBase cache? Best regards, Jing On Fri, May 27, 2022 at 11:19 AM Jingsong Li <jingsongl...@gmail.com> wrote: Hi all, I think the problems now are as below: 1. The AllCache and PartialCache interfaces are non-uniform: one needs to provide a LookupProvider, the other needs to provide a CacheBuilder. 2. The AllCache definition is not flexible: for example, PartialCache can use any custom storage while AllCache cannot; AllCache could also store to memory or disk, which also needs a flexible strategy. 3. AllCache cannot customize its ReloadStrategy; currently there is only 
AllCache can not customize ReloadStrategy, currently only > > > > ScheduledReloadStrategy. > > > > > > > > In order to solve the above problems, the following are my ideas. > > > > > > > > ## Top level cache interfaces: > > > > > > > > ``` > > > > > > > > public interface CacheLookupProvider extends > > > > LookupTableSource.LookupRuntimeProvider { > > > > > > > > CacheBuilder createCacheBuilder(); > > > > } > > > > > > > > > > > > public interface CacheBuilder { > > > > Cache create(); > > > > } > > > > > > > > > > > > public interface Cache { > > > > > > > > /** > > > > * Returns the value associated with key in this cache, or null > if > > > > there is no cached value for > > > > * key. > > > > */ > > > > @Nullable > > > > Collection<RowData> getIfPresent(RowData key); > > > > > > > > /** Returns the number of key-value mappings in the cache. */ > > > > long size(); > > > > } > > > > > > > > ``` > > > > > > > > ## Partial cache > > > > > > > > ``` > > > > > > > > public interface PartialCacheLookupFunction extends > > CacheLookupProvider { > > > > > > > > @Override > > > > PartialCacheBuilder createCacheBuilder(); > > > > > > > > /** Creates an {@link LookupFunction} instance. */ > > > > LookupFunction createLookupFunction(); > > > > } > > > > > > > > > > > > public interface PartialCacheBuilder extends CacheBuilder { > > > > > > > > PartialCache create(); > > > > } > > > > > > > > > > > > public interface PartialCache extends Cache { > > > > > > > > /** > > > > * Associates the specified value rows with the specified key row > > > > in the cache. If the cache > > > > * previously contained value associated with the key, the old > > > > value is replaced by the > > > > * specified value. > > > > * > > > > * @return the previous value rows associated with key, or null > if > > > > there was no mapping for key. 
* @param key - key row with which the specified value is to be associated
     * @param value - value rows to be associated with the specified key
     */
    Collection<RowData> put(RowData key, Collection<RowData> value);

    /** Discards any cached value for the specified key. */
    void invalidate(RowData key);
}
```

## All cache

```
public interface AllCacheLookupProvider extends CacheLookupProvider {

    void registerReloadStrategy(ScheduledExecutorService executorService, Reloader reloader);

    ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider();

    @Override
    AllCacheBuilder createCacheBuilder();
}

public interface AllCacheBuilder extends CacheBuilder {

    AllCache create();
}

public interface AllCache extends Cache {

    void putAll(Iterator<Map<RowData, RowData>> allEntries);

    void clearAll();
}

public interface Reloader {

    void reload();
}
```

Best, Jingsong

On Fri, May 27, 2022 at 11:10 AM Jingsong Li <jingsongl...@gmail.com> wrote: Thanks Qingsheng and all for your discussion. Very sorry to jump in so late. Maybe I missed something? My first impression when I saw the cache interface was: why don't we provide an interface similar to the Guava cache [1]? On top of the Guava cache, Caffeine also adds extensions for asynchronous calls [2], and there is also bulk loading in Caffeine. I am also confused about why we first go from a LookupCacheFactory.Builder and then to a Factory to create the Cache. 
[1] https://github.com/google/guava [2] https://github.com/ben-manes/caffeine/wiki/Population Best, Jingsong On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote: After looking at the newly introduced ReloadTime and Becket's comment, I agree with Becket that we should have a pluggable reloading strategy. We can provide some common implementations, e.g., periodic reloading and daily reloading. But there will definitely be some connector- or business-specific reloading strategies, e.g. notifying via a ZooKeeper watcher, or reloading once a new Hive partition is complete. Best, Jark On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote: Hi Qingsheng, Thanks for updating the FLIP. A few comments / questions below: 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"? What is the difference between them? If they are the same, can we just use XXXFactory everywhere? 2. Regarding the FullCachingLookupProvider, should the reloading policy also be pluggable? Periodic reloading can sometimes be tricky in practice. For example, if a user uses 24 hours as the cache refresh interval and some nightly batch job is delayed, the cache update may still see stale data. 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be removed. 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little confusing to me. 
If Optional<LookupCacheFactory> getCacheFactory() returns a non-empty factory, doesn't that already indicate to the framework that it should cache the missing keys? Also, why is this method returning an Optional<Boolean> instead of a boolean? Thanks, Jiangjie (Becket) Qin On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> wrote: Hi Lincoln and Jark, Thanks for the comments! If the community reaches a consensus that we use SQL hints instead of table options to decide whether to use sync or async mode, it's indeed not necessary to introduce the "lookup.async" option. I think it's a good idea to let the decision on async be made at the query level, which could enable better optimization with more information gathered by the planner. Is there any FLIP describing the issue in FLINK-27625? I thought FLIP-234 was only proposing adding a SQL hint for retry on missing, instead of the entire async mode being controlled by a hint. Best regards, Qingsheng On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> wrote: Hi Jark, Thanks for your reply! 
> > > > >> >> > > > > > >> >> > Currently 'lookup.async' just lies in HBase connector, I have > > no > > > > idea > > > > >> >> > whether or when to remove it (we can discuss it in another > > issue > > > > for > > > > >> the > > > > >> >> > HBase connector after FLINK-27625 is done), just not add it > > into > > > a > > > > >> >> common > > > > >> >> > option now. > > > > >> >> > > > > > >> >> > Best, > > > > >> >> > Lincoln Lee > > > > >> >> > > > > > >> >> > > > > > >> >> > Jark Wu <imj...@gmail.com> 于2022年5月24日周二 20:14写道: > > > > >> >> > > > > > >> >> >> Hi Lincoln, > > > > >> >> >> > > > > >> >> >> I have taken a look at FLIP-234, and I agree with you that > the > > > > >> >> connectors > > > > >> >> >> can > > > > >> >> >> provide both async and sync runtime providers simultaneously > > > > instead > > > > >> >> of one > > > > >> >> >> of them. > > > > >> >> >> At that point, "lookup.async" looks redundant. If this > option > > is > > > > >> >> planned to > > > > >> >> >> be removed > > > > >> >> >> in the long term, I think it makes sense not to introduce it > > in > > > > this > > > > >> >> FLIP. > > > > >> >> >> > > > > >> >> >> Best, > > > > >> >> >> Jark > > > > >> >> >> > > > > >> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee < > > > lincoln.8...@gmail.com > > > > > > > > > >> >> wrote: > > > > >> >> >> > > > > >> >> >>> Hi Qingsheng, > > > > >> >> >>> > > > > >> >> >>> Sorry for jumping into the discussion so late. It's a good > > idea > > > > >> that > > > > >> >> we > > > > >> >> >> can > > > > >> >> >>> have a common table option. 
I have a minor comment on 'lookup.async', namely that we should not make it a common option: The table layer abstracts both sync and async lookup capabilities; connector implementers can choose one or both. In the case of implementing only one capability (the status of most of the existing builtin connectors), 'lookup.async' will not be used. And when a connector has both capabilities, I think this choice is more suitable for decisions at the query level: for example, the table planner can choose the physical implementation of async or sync lookup based on its cost model, or users can give a query hint based on their own better understanding. If there is another common table option 'lookup.async', it may confuse users in the long run. So, I prefer to leave the 'lookup.async' option in a private place (for the current HBase connector) and not turn it into a common option. WDYT? Best, Lincoln Lee Qingsheng Ren <renqs...@gmail.com> wrote on Mon, 23 May 2022 at 14:54: Hi Alexander, Thanks for the review! We recently updated the FLIP and you can find those changes in my latest email. Since some terminology has changed, I'll use the new concepts when replying to your comments. 1. 
Builder vs ‘of’ > > > > >> >> >>>> I’m OK to use builder pattern if we have additional > optional > > > > >> >> parameters > > > > >> >> >>>> for full caching mode (“rescan” previously). The > > > > >> schedule-with-delay > > > > >> >> >> idea > > > > >> >> >>>> looks reasonable to me, but I think we need to redesign > the > > > > >> builder > > > > >> >> API > > > > >> >> >>> of > > > > >> >> >>>> full caching to make it more descriptive for developers. > > Would > > > > you > > > > >> >> mind > > > > >> >> >>>> sharing your ideas about the API? For accessing the FLIP > > > > workspace > > > > >> >> you > > > > >> >> >>> can > > > > >> >> >>>> just provide your account ID and ping any PMC member > > including > > > > >> Jark. > > > > >> >> >>>> > > > > >> >> >>>> 2. Common table options > > > > >> >> >>>> We have some discussions these days and propose to > > introduce 8 > > > > >> common > > > > >> >> >>>> table options about caching. It has been updated on the > > FLIP. > > > > >> >> >>>> > > > > >> >> >>>> 3. Retries > > > > >> >> >>>> I think we are on the same page :-) > > > > >> >> >>>> > > > > >> >> >>>> For your additional concerns: > > > > >> >> >>>> 1) The table option has been updated. > > > > >> >> >>>> 2) We got “lookup.cache” back for configuring whether to > use > > > > >> partial > > > > >> >> or > > > > >> >> >>>> full caching mode. > > > > >> >> >>>> > > > > >> >> >>>> Best regards, > > > > >> >> >>>> > > > > >> >> >>>> Qingsheng > > > > >> >> >>>> > > > > >> >> >>>> > > > > >> >> >>>> > > > > >> >> >>>>> On May 19, 2022, at 17:25, Александр Смирнов < > > > > >> smirale...@gmail.com> > > > > >> >> >>>> wrote: > > > > >> >> >>>>> > > > > >> >> >>>>> Also I have a few additions: > > > > >> >> >>>>> 1) maybe rename 'lookup.cache.maximum-size' to > > > > >> >> >>>>> 'lookup.cache.max-rows'? I think it will be more clear > that > > > we > > > > >> talk > > > > >> >> >>>>> not about bytes, but about the number of rows. 
Plus it fits better, considering my optimization with filters. 2) How will users enable rescanning? Are we going to separate caching and rescanning from the options point of view? Like initially we had one option 'lookup.cache' with values LRU / ALL. I think now we can make a boolean option 'lookup.rescan'. RescanInterval can be 'lookup.rescan.interval', etc. Best regards, Alexander Alexander Smirnov <smirale...@gmail.com> wrote on Thu, 19 May 2022 at 14:50: Hi Qingsheng and Jark, 1. Builders vs 'of' I understand that builders are used when we have multiple parameters. I suggested them because we could add parameters later. To prevent the Builder for ScanRuntimeProvider from looking redundant I can suggest one more config now - "rescanStartTime". It's a time in UTC (the LocalTime class) at which the first reload of the cache starts. This parameter can be thought of as the 'initialDelay' (the diff between the current time and rescanStartTime) in the method ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very useful when the dimension table is updated by some other scheduled job at a certain time. Or when the user simply wants the second scan (the first cache reload) to be delayed. 
This option can be used even > > > without > > > > >> >> >>>>>> 'rescanInterval' - in this case 'rescanInterval' will be > > one > > > > >> day. > > > > >> >> >>>>>> If you are fine with this option, I would be very glad > if > > > you > > > > >> would > > > > >> >> >>>>>> give me access to edit FLIP page, so I could add it > myself > > > > >> >> >>>>>> > > > > >> >> >>>>>> 2. Common table options > > > > >> >> >>>>>> I also think that FactoryUtil would be overloaded by all > > > cache > > > > >> >> >>>>>> options. But maybe unify all suggested options, not only > > for > > > > >> >> default > > > > >> >> >>>>>> cache? I.e. class 'LookupOptions', that unifies default > > > cache > > > > >> >> >> options, > > > > >> >> >>>>>> rescan options, 'async', 'maxRetries'. WDYT? > > > > >> >> >>>>>> > > > > >> >> >>>>>> 3. Retries > > > > >> >> >>>>>> I'm fine with suggestion close to > > RetryUtils#tryTimes(times, > > > > >> call) > > > > >> >> >>>>>> > > > > >> >> >>>>>> [1] > > > > >> >> >>>> > > > > >> >> >>> > > > > >> >> >> > > > > >> >> > > > > >> > > > > > > > > > > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- > > > > >> >> >>>>>> > > > > >> >> >>>>>> Best regards, > > > > >> >> >>>>>> Alexander > > > > >> >> >>>>>> > > > > >> >> >>>>>> ср, 18 мая 2022 г. в 16:04, Qingsheng Ren < > > > renqs...@gmail.com > > > > >: > > > > >> >> >>>>>>> > > > > >> >> >>>>>>> Hi Jark and Alexander, > > > > >> >> >>>>>>> > > > > >> >> >>>>>>> Thanks for your comments! I’m also OK to introduce > common > > > > table > > > > >> >> >>>> options. I prefer to introduce a new > > DefaultLookupCacheOptions > > > > >> class > > > > >> >> >> for > > > > >> >> >>>> holding these option definitions because putting all > options > > > > into > > > > >> >> >>>> FactoryUtil would make it a bit ”crowded” and not well > > > > >> categorized. 
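As an aside, the 'rescanStartTime' idea discussed above (the initial delay is the diff between the current UTC time and the configured start time, handed to ScheduledExecutorService#scheduleWithFixedDelay) could be sketched roughly as follows. This is only an illustration; the class and method names are made up here, not taken from the FLIP:

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: turning a "rescanStartTime" (a LocalTime in UTC)
// into the initialDelay argument of scheduleWithFixedDelay.
class RescanScheduleSketch {

    /** Millis to wait until the next occurrence of rescanStart, given the current UTC time. */
    static long initialDelayMillis(LocalTime nowUtc, LocalTime rescanStart) {
        long diff = Duration.between(nowUtc, rescanStart).toMillis();
        // If the start time has already passed today, wait for tomorrow's occurrence.
        return diff >= 0 ? diff : diff + Duration.ofDays(1).toMillis();
    }

    /** Schedules the cache reload task: first at rescanStart (UTC), then every rescanInterval. */
    static ScheduledExecutorService scheduleReload(
            Runnable reloadTask, LocalTime rescanStart, Duration rescanInterval) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long initialDelay = initialDelayMillis(LocalTime.now(ZoneOffset.UTC), rescanStart);
        executor.scheduleWithFixedDelay(
                reloadTask, initialDelay, rescanInterval.toMillis(), TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

With this shape, leaving 'rescanInterval' unset and defaulting it to one day gives exactly the "reload daily at rescanStartTime" behavior described above.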
> > > > >> >> >>>>>>> > > > > >> >> >>>>>>> FLIP has been updated according to suggestions above: > > > > >> >> >>>>>>> 1. Use static “of” method for constructing > > > > >> RescanRuntimeProvider > > > > >> >> >>>> considering both arguments are required. > > > > >> >> >>>>>>> 2. Introduce new table options matching > > > > >> DefaultLookupCacheFactory > > > > >> >> >>>>>>> > > > > >> >> >>>>>>> Best, > > > > >> >> >>>>>>> Qingsheng > > > > >> >> >>>>>>> > > > > >> >> >>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu < > > imj...@gmail.com> > > > > >> wrote: > > > > >> >> >>>>>>>> > > > > >> >> >>>>>>>> Hi Alex, > > > > >> >> >>>>>>>> > > > > >> >> >>>>>>>> 1) retry logic > > > > >> >> >>>>>>>> I think we can extract some common retry logic into > > > > utilities, > > > > >> >> >> e.g. > > > > >> >> >>>> RetryUtils#tryTimes(times, call). > > > > >> >> >>>>>>>> This seems independent of this FLIP and can be reused > by > > > > >> >> >> DataStream > > > > >> >> >>>> users. > > > > >> >> >>>>>>>> Maybe we can open an issue to discuss this and where > to > > > put > > > > >> it. > > > > >> >> >>>>>>>> > > > > >> >> >>>>>>>> 2) cache ConfigOptions > > > > >> >> >>>>>>>> I'm fine with defining cache config options in the > > > > framework. > > > > >> >> >>>>>>>> A candidate place to put is FactoryUtil which also > > > includes > > > > >> >> >>>> "sink.parallelism", "format" options. > > > > >> >> >>>>>>>> > > > > >> >> >>>>>>>> Best, > > > > >> >> >>>>>>>> Jark > > > > >> >> >>>>>>>> > > > > >> >> >>>>>>>> > > > > >> >> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов < > > > > >> >> >>> smirale...@gmail.com> > > > > >> >> >>>> wrote: > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>> Hi Qingsheng, > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>> Thank you for considering my comments. 
> > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>>> there might be custom logic before making retry, > such > > as > > > > >> >> >>>> re-establish the connection > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>> Yes, I understand that. I meant that such logic can > be > > > > >> placed in > > > > >> >> >> a > > > > >> >> >>>>>>>>> separate function, that can be implemented by > > connectors. > > > > >> Just > > > > >> >> >>> moving > > > > >> >> >>>>>>>>> the retry logic would make connector's LookupFunction > > > more > > > > >> >> >> concise > > > > >> >> >>> + > > > > >> >> >>>>>>>>> avoid duplicate code. However, it's a minor change. > The > > > > >> decision > > > > >> >> >> is > > > > >> >> >>>> up > > > > >> >> >>>>>>>>> to you. > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>>> We decide not to provide common DDL options and let > > > > >> developers > > > > >> >> >> to > > > > >> >> >>>> define their own options as we do now per connector. > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>> What is the reason for that? One of the main goals of > > > this > > > > >> FLIP > > > > >> >> >> was > > > > >> >> >>>> to > > > > >> >> >>>>>>>>> unify the configs, wasn't it? I understand that > current > > > > cache > > > > >> >> >>> design > > > > >> >> >>>>>>>>> doesn't depend on ConfigOptions, like was before. But > > > still > > > > >> we > > > > >> >> >> can > > > > >> >> >>>> put > > > > >> >> >>>>>>>>> these options into the framework, so connectors can > > reuse > > > > >> them > > > > >> >> >> and > > > > >> >> >>>>>>>>> avoid code duplication, and, what is more > significant, > > > > avoid > > > > >> >> >>> possible > > > > >> >> >>>>>>>>> different options naming. This moment can be pointed > > out > > > in > > > > >> >> >>>>>>>>> documentation for connector developers. > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>> Best regards, > > > > >> >> >>>>>>>>> Alexander > > > > >> >> >>>>>>>>> > > > > >> >> >>>>>>>>> вт, 17 мая 2022 г. 
в 17:11, Qingsheng Ren < > > > > >> renqs...@gmail.com>: > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> Hi Alexander, > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> Thanks for the review and glad to see we are on the > > same > > > > >> page! > > > > >> >> I > > > > >> >> >>>> think you forgot to cc the dev mailing list so I’m also > > > quoting > > > > >> your > > > > >> >> >>> reply > > > > >> >> >>>> under this email. > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>>> We can add 'maxRetryTimes' option into this class > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> In my opinion the retry logic should be implemented > in > > > > >> lookup() > > > > >> >> >>>> instead of in LookupFunction#eval(). Retrying is only > > > meaningful > > > > >> >> under > > > > >> >> >>> some > > > > >> >> >>>> specific retriable failures, and there might be custom > logic > > > > >> before > > > > >> >> >>> making > > > > >> >> >>>> retry, such as re-establish the connection > > > > >> (JdbcRowDataLookupFunction > > > > >> >> >> is > > > > >> >> >>> an > > > > >> >> >>>> example), so it's more handy to leave it to the connector. > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>>> I don't see DDL options, that were in previous > > version > > > of > > > > >> >> FLIP. > > > > >> >> >>> Do > > > > >> >> >>>> you have any special plans for them? > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> We decide not to provide common DDL options and let > > > > >> developers > > > > >> >> >> to > > > > >> >> >>>> define their own options as we do now per connector. > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> The rest of comments sound great and I’ll update the > > > FLIP. > > > > >> Hope > > > > >> >> >> we > > > > >> >> >>>> can finalize our proposal soon! 
> > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> Best, > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> Qingsheng > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>> > > > > >> >> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов < > > > > >> >> >>> smirale...@gmail.com> > > > > >> >> >>>> wrote: > > > > >> >> >>>>>>>>>>> > > > > >> >> >>>>>>>>>>> Hi Qingsheng and devs! > > > > >> >> >>>>>>>>>>> > > > > >> >> >>>>>>>>>>> I like the overall design of updated FLIP, however > I > > > have > > > > >> >> >> several > > > > >> >> >>>>>>>>>>> suggestions and questions. > > > > >> >> >>>>>>>>>>> > > > > >> >> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of > > > > >> TableFunction > > > > >> >> >> is a > > > > >> >> >>>> good > > > > >> >> >>>>>>>>>>> idea. We can add 'maxRetryTimes' option into this > > > class. > > > > >> >> 'eval' > > > > >> >> >>>> method > > > > >> >> >>>>>>>>>>> of new LookupFunction is great for this purpose. > The > > > same > > > > >> is > > > > >> >> >> for > > > > >> >> >>>>>>>>>>> 'async' case. > > > > >> >> >>>>>>>>>>> > > > > >> >> >>>>>>>>>>> 2) There might be other configs in future, such as > > > > >> >> >>>> 'cacheMissingKey' > > > > >> >> >>>>>>>>>>> in LookupFunctionProvider or 'rescanInterval' in > > > > >> >> >>>> ScanRuntimeProvider. > > > > >> >> >>>>>>>>>>> Maybe use Builder pattern in LookupFunctionProvider > > and > > > > >> >> >>>>>>>>>>> RescanRuntimeProvider for more flexibility (use one > > > > 'build' > > > > >> >> >>> method > > > > >> >> >>>>>>>>>>> instead of many 'of' methods in future)? > > > > >> >> >>>>>>>>>>> > > > > >> >> >>>>>>>>>>> 3) What are the plans for existing > > > TableFunctionProvider > > > > >> and > > > > >> >> >>>>>>>>>>> AsyncTableFunctionProvider? I think they should be > > > > >> deprecated. 
4) Am I right that the current design does not assume usage of a user-provided LookupCache in re-scanning? In this case, it is not very clear why we need methods such as 'invalidate' or 'putAll' in LookupCache. 5) I don't see the DDL options that were in the previous version of the FLIP. Do you have any special plans for them? If you don't mind, I would be glad to be able to make small adjustments to the FLIP document too. I think it's worth mentioning what optimizations exactly are planned for the future. Best regards, Smirnov Alexander Qingsheng Ren <renqs...@gmail.com> wrote on Fri, 13 May 2022 at 20:27: Hi Alexander and devs, Thank you very much for the in-depth discussion! As Jark mentioned, we were inspired by Alexander's idea and refactored our design. FLIP-221 [1] has been updated to reflect our design now and we are happy to hear more suggestions from you! Compared to the previous design: 1. 
The lookup cache serves at table runtime level > > and > > > is > > > > >> >> >>>> integrated as a component of LookupJoinRunner as discussed > > > > >> >> previously. > > > > >> >> >>>>>>>>>>>> 2. Interfaces are renamed and re-designed to > reflect > > > the > > > > >> new > > > > >> >> >>>> design. > > > > >> >> >>>>>>>>>>>> 3. We separate the all-caching case individually > and > > > > >> >> >> introduce a > > > > >> >> >>>> new RescanRuntimeProvider to reuse the ability of > scanning. > > We > > > > are > > > > >> >> >>> planning > > > > >> >> >>>> to support SourceFunction / InputFormat for now > considering > > > the > > > > >> >> >>> complexity > > > > >> >> >>>> of FLIP-27 Source API. > > > > >> >> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to > > > make > > > > >> the > > > > >> >> >>>> semantic of lookup more straightforward for developers. > > > > >> >> >>>>>>>>>>>> > > > > >> >> >>>>>>>>>>>> For replying to Alexander: > > > > >> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat > > is > > > > >> >> >> deprecated > > > > >> >> >>>> or not. Am I right that it will be so in the future, but > > > > currently > > > > >> >> it's > > > > >> >> >>> not? > > > > >> >> >>>>>>>>>>>> Yes you are right. InputFormat is not deprecated > for > > > > now. > > > > >> I > > > > >> >> >>> think > > > > >> >> >>>> it will be deprecated in the future but we don't have a > > clear > > > > plan > > > > >> >> for > > > > >> >> >>> that. > > > > >> >> >>>>>>>>>>>> > > > > >> >> >>>>>>>>>>>> Thanks again for the discussion on this FLIP and > > > looking > > > > >> >> >> forward > > > > >> >> >>>> to cooperating with you after we finalize the design and > > > > >> interfaces! 
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric Best regards, Qingsheng On Fri, May 13, 2022 at 12:12 AM Alexander Smirnov <smirale...@gmail.com> wrote: Hi Jark, Qingsheng and Leonard! Glad to see that we came to a consensus on almost all points! However I'm a little confused about whether InputFormat is deprecated or not. Am I right that it will be so in the future, but currently it's not? Actually I also think that for the first version it's OK to use InputFormat in the ALL cache implementation, because supporting the rescan ability seems like a very distant prospect. But for this decision we need a consensus among all discussion participants. In general, I don't have anything to argue with in your statements. All of them correspond to my ideas. Looking ahead, it would be nice to work on this FLIP cooperatively. 
I've already done a lot of work on lookup join caching, with a realization very close to the one we are discussing, and I want to share the results of this work. Anyway, looking forward to the FLIP update!

Best regards,
Smirnov Alexander

On Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:

Hi Alex,

Thanks for summarizing your points.

In the past week, Qingsheng, Leonard, and I have discussed it several times, and we have totally refactored the design. I'm glad to say we have reached a consensus on many of your points! Qingsheng is still working on updating the design docs, which may be available in the next few days. I will share some conclusions from our discussions:

1) We have refactored the design towards the "cache in framework" way.

2) A "LookupCache" interface for users to customize, and a default implementation with a builder for easy use.
This makes it possible to have both flexibility and conciseness.

3) Filter pushdown is important for the ALL and LRU lookup caches, especially for reducing IO. Filter pushdown should be the final state and the unified way to support pruning both the ALL cache and the LRU cache, so I think we should make an effort in this direction. If we need to support filter pushdown for the ALL cache anyway, why not use it for the LRU cache as well? Either way, as we decided to implement the cache in the framework, we have the chance to support filters on the cache at any time. This is an optimization, and it doesn't affect the public API. I think we can create a JIRA issue to discuss it when the FLIP is accepted.

4) The idea to support the ALL cache is similar to your proposal. In the first version, we will only support InputFormat and SourceFunction for cache-all (invoking the InputFormat in the join operator). For the FLIP-27 source, we need to join a true source operator instead of calling it embedded in the join operator.
However, this needs another FLIP to support the re-scan ability for the FLIP-27 Source, and this can be a large work. In order not to block this issue, we can put the effort of FLIP-27 source integration into future work and integrate InputFormat & SourceFunction for now.

I think it's fine to use InputFormat & SourceFunction, as they are not deprecated; otherwise, we would have to introduce another function similar to them, which is meaningless. We need to plan the FLIP-27 source integration ASAP, before InputFormat & SourceFunction are deprecated.

Best,
Jark

On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Martijn!

Got it. Therefore, the realization with InputFormat is not considered. Thanks for clearing that up!

Best regards,
Smirnov Alexander

On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:

Hi,

With regards to:

> But if there are plans to refactor all connectors to FLIP-27

Yes, FLIP-27 is the target for all connectors. The old interfaces will be deprecated, and connectors will either be refactored to use the new ones or dropped.

The caching should work for connectors that are using the FLIP-27 interfaces; we should not introduce new features for old interfaces.

Best regards,

Martijn

On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Jark!

Sorry for the late response. I would like to make some comments and clarify my points.

1) I agree with your first statement.
I think we can achieve both advantages this way: put the Cache interface in flink-table-common, but have implementations of it in flink-table-runtime. Therefore, if a connector developer wants to use existing cache strategies and their implementations, he can just pass the lookupConfig to the planner; but if he wants to have his own cache implementation in his TableFunction, it will be possible for him to use the existing interface for this purpose (we can explicitly point this out in the documentation). In this way all configs and metrics will be unified. WDYT?

> If a filter can prune 90% of data in the cache, we will have 90% of lookup requests that can never be cached

2) Let me clarify the logic of the filters optimization in the case of the LRU cache. It looks like Cache<RowData, Collection<RowData>>. Here we always store the response of the dimension table in the cache, even after applying the calc function. I.e., if there are no rows after applying filters to the result of the 'eval' method of the TableFunction, we store the empty list by the lookup keys. Therefore the cache line will be filled, but it will require much less memory (in bytes). I.e., we don't completely filter out keys by which the result was pruned, but we significantly reduce the memory required to store this result. If the user knows about this behavior, he can increase the 'max-rows' option before the start of the job. But actually I came up with the idea that we can do this automatically by using the 'maximumWeight' and 'weigher' methods of the Guava cache [1]. The weight can be the size of the collection of rows (the value in the cache). Therefore the cache can automatically fit many more records than before.
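The weigher idea above can be illustrated without Guava: bound the cache by the total number of cached rows instead of the number of entries, so that keys mapping to empty (fully filtered) results cost almost nothing. A rough stdlib-only sketch, using FIFO eviction for brevity where a real cache would use LRU (Guava's `CacheBuilder.maximumWeight`/`weigher`, linked as [1] below, provides this out of the box):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: a lookup cache bounded by total cached rows, not entry count. */
class WeightedCacheSketch<K, R> {

    private final long maxTotalRows;
    private long totalRows = 0;
    private final Map<K, List<R>> entries = new HashMap<>();
    private final Deque<K> insertionOrder = new ArrayDeque<>(); // FIFO eviction for brevity

    WeightedCacheSketch(long maxTotalRows) {
        this.maxTotalRows = maxTotalRows;
    }

    /** Empty results are cached too ("negative caching"), at weight 0. */
    void put(K key, List<R> rows) {
        List<R> prev = entries.put(key, rows);
        if (prev == null) {
            insertionOrder.addLast(key);
        } else {
            totalRows -= prev.size();
        }
        totalRows += rows.size();
        // Evict oldest entries until the total row count fits the bound again.
        while (totalRows > maxTotalRows && !insertionOrder.isEmpty()) {
            K evicted = insertionOrder.pollFirst();
            List<R> gone = entries.remove(evicted);
            if (gone != null) {
                totalRows -= gone.size();
            }
        }
    }

    List<R> getIfPresent(K key) {
        return entries.get(key);
    }

    int entryCount() {
        return entries.size();
    }
}
```

With this bound, a key whose looked-up rows were all pruned by the calc function occupies an entry but contributes zero weight, which is exactly the "the cache can fit many more records" effect described above.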
> Flink SQL has provided a standard way to do filters and projects pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. Jdbc/hive/HBase haven't implemented the interfaces; that doesn't mean it's hard to implement.

It's debatable how difficult it will be to implement filter pushdown. But I think the fact that currently there is no database connector with filter pushdown at least means that this feature won't be supported soon in connectors. Moreover, if we talk about other connectors (not in the Flink repo), their databases might not support all Flink filters (or not support filters at all). I think users are interested in supporting the cache filters optimization independently of supporting other features and solving more complex problems (or problems that are unsolvable at all).

3) I agree with your third statement. Actually, in our internal version I also tried to unify the logic of scanning and reloading data from connectors.
But unfortunately, I didn't find a way to unify the logic of all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...) and reuse it in reloading the ALL cache. As a result I settled on using InputFormat, because it was used for scanning in all lookup connectors. (I didn't know that there are plans to deprecate InputFormat in favor of the FLIP-27 Source.) IMO, the usage of the FLIP-27 source in ALL caching is not a good idea, because this source was designed to work in a distributed environment (SplitEnumerator on the JobManager and SourceReaders on the TaskManagers), not in one operator (the lookup join operator in our case). There is even no direct way to pass splits from the SplitEnumerator to a SourceReader (this logic works through the SplitEnumeratorContext, which requires OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage of InputFormat for the ALL cache seems much clearer and easier.

But if there are plans to refactor all connectors to FLIP-27, I have the following idea: maybe we can drop the lookup join ALL cache in favor of a simple join with multiple scanning of the batch source? The point is that the only difference between the lookup join ALL cache and a simple join with a batch source is that in the first case scanning is performed multiple times, in between which the state (cache) is cleared (correct me if I'm wrong). So what if we extend the functionality of the simple join to support state reloading, plus extend the functionality of scanning the batch source multiple times (this one should be easy with the new FLIP-27 source, which unifies streaming/batch reading; we would only need to change the SplitEnumerator, which would pass the splits again after some TTL). WDYT? I must say that this looks like a long-term goal and will make the scope of this FLIP even larger than you said. Maybe we can limit ourselves to a simpler solution now (InputFormats).
So to sum up, my points are like this:
1) There is a way to make both concise and flexible interfaces for caching in lookup join.
2) The cache filters optimization is important both in LRU and ALL caches.
3) It is unclear when filter pushdown will be supported in Flink connectors; some of the connectors might not have the opportunity to support filter pushdown. Plus, as far as I know, currently filter pushdown works only for scanning (not lookup). So the cache filters + projections optimization should be independent from other features.
4) The ALL cache realization is a complex topic that involves multiple aspects of how Flink is developing. Dropping InputFormat in favor of the FLIP-27 Source will make the ALL cache realization really complex and unclear, so maybe instead of that we can extend the functionality of the simple join, or keep InputFormat in the case of the lookup join ALL cache?
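The "scan everything, then clear the state and scan again after a TTL" behavior being debated in point 4 can be sketched independently of any particular source API. A hypothetical, stdlib-only illustration, where the `Scanner` interface stands in for whatever InputFormat-like full-table reader the connector exposes (all names here are placeholders, not Flink APIs):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Sketch: an ALL cache that drops its state and re-scans the table after a TTL. */
class AllCacheSketch<K, R> {

    /** Stand-in for an InputFormat-like full-table scan. */
    interface Scanner<K, R> {
        Map<K, List<R>> scanAll();
    }

    private final Scanner<K, R> scanner;
    private final long ttlMillis;
    private final Supplier<Long> clock; // injectable clock, so the TTL is testable
    private Map<K, List<R>> snapshot = new HashMap<>();
    private boolean loaded = false;
    private long loadedAt = 0;

    AllCacheSketch(Scanner<K, R> scanner, long ttlMillis, Supplier<Long> clock) {
        this.scanner = scanner;
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    List<R> lookup(K key) {
        long now = clock.get();
        if (!loaded || now - loadedAt >= ttlMillis) {
            // Clear the old state and scan the source again: the behavior the
            // discussion compares to a simple join with periodic re-scanning.
            snapshot = scanner.scanAll();
            loaded = true;
            loadedAt = now;
        }
        return snapshot.getOrDefault(key, List.of());
    }
}
```

The re-scan-after-TTL step is the part Alexander suggests could live in a SplitEnumerator instead of a join operator; the lookup path itself is unchanged either way.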
Best regards,
Smirnov Alexander

[1]
https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:

It's great to see the active discussion! I want to share my ideas:

1) Implement the cache in the framework vs. in the connector base:
I don't have a strong opinion on this. Both ways should work (e.g., cache pruning, compatibility). The framework way can provide more concise interfaces. The connector base way can define more flexible cache strategies/implementations. We are still investigating a way to see if we can have both advantages.
We should reach a consensus that the chosen way should be a final state, and that we are on the path to it.

2) Filters and projections pushdown:
I agree with Alex that filter pushdown into the cache can benefit the ALL cache a lot. However, this is not true for the LRU cache. Connectors use a cache to reduce IO requests to databases for better throughput. If a filter can prune 90% of the data in the cache, we will have 90% of lookup requests that can never be cached and that hit the databases directly. That means the cache is meaningless in this case.

IMO, Flink SQL has provided a standard way to do filters and projects pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown. Jdbc/hive/HBase haven't implemented the interfaces; that doesn't mean it's hard to implement. They should implement the pushdown interfaces to reduce IO and the cache size. The final state should be that the scan source and the lookup source share the exact same pushdown implementation. I don't see why we need to duplicate the pushdown logic in caches, which would complicate the lookup join design.

3) ALL cache abstraction:
The ALL cache might be the most challenging part of this FLIP. We have never provided a reload-lookup public interface. Currently, we put the reload logic in the "eval" method of TableFunction. That's hard for some sources (e.g., Hive). Ideally, a connector implementation should share the logic of reload and scan, i.e., ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source. However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source is deeply coupled with SourceOperator.
If we want to invoke the FLIP-27 source in LookupJoin, this may make the scope of this FLIP much larger. We are still investigating how to abstract the ALL cache logic and reuse the existing source interfaces.

Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies out of the scope of this improvement, because such pushdowns would have to be done for all ScanTableSource implementations (not only the Lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!

I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.
Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut off the cache size by simply filtering unnecessary data. And the most handy way to do it is to apply it inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important. So I would prefer to set it through DDL rather than have the same ttl, strategy and other options for all lookup tables.
3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache). But most probably this might be solved by creating more different cache strategies and a wider set of configurations.

All these points are much closer to the schema proposed by Alexander. Qingsheng Ren, please correct me if I'm not right; might all these facilities be simply implemented in your architecture?
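Roman's point 3, recovering extensibility through a richer set of configurations, is essentially what a builder-style cache API gives. A hypothetical, stdlib-only sketch of what such a default implementation could look like (the class, method names, and default value here are illustrative, not the interface the FLIP will ship):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a builder-configured LRU lookup cache (hypothetical API). */
class DefaultCacheSketch<K, V> {

    private final Map<K, V> entries;

    private DefaultCacheSketch(int maximumSize) {
        // An access-order LinkedHashMap evicts the least recently used entry.
        this.entries = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maximumSize;
            }
        };
    }

    V getIfPresent(K key) {
        return entries.get(key);
    }

    void put(K key, V value) {
        entries.put(key, value);
    }

    int size() {
        return entries.size();
    }

    static <K, V> Builder<K, V> newBuilder() {
        return new Builder<>();
    }

    static class Builder<K, V> {
        private int maximumSize = 10_000; // illustrative default, not a Flink default

        Builder<K, V> maximumSize(int n) {
            this.maximumSize = n;
            return this;
        }

        DefaultCacheSketch<K, V> build() {
            return new DefaultCacheSketch<>(maximumSize);
        }
    }
}
```

Each new knob (TTL, weight bound, strategy) becomes one more builder method, which is how the "wider set of configurations" can substitute for user-supplied cache implementations.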
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but I just wanted to express that I really appreciate the in-depth discussion on this topic, and I hope that others will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept that consciously to achieve better performance (no one proposed to enable caching by default, etc.). Or by "users" do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one makes them do that if they don't want to. So what exactly is the difference between implementing caching in flink-table-runtime versus flink-table-common from this point of view? How does it affect breaking or not breaking the semantics of "FOR SYSTEM_TIME AS OF proc_time"?

> confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious

If we talk about the main difference in semantics between DDL options and config options ("table.exec.xxx"), isn't it about the scope of the options and their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting the lookup cache strategy into configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches only several functions of ONE table (there can be multiple tables with different caches). Does it really matter for the user (or anyone else) where the logic affected by the option is located? Also I can recall the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.

> introduce a new interface for this all-caching scenario and the design would become more complex

This is a subject for a separate discussion, but in our internal version we actually solved this problem quite easily - we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive - it uses the class PartitionReader, which is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is reduced significantly (as well as the time the input stream is blocked). I know that we usually try to avoid concurrency in Flink code, but maybe this one can be an exception. BTW I don't claim it's an ideal solution; maybe there are better ones.

> Providing the cache in the framework might introduce compatibility issues

That's possible only if the developer of the connector doesn't properly refactor the code and uses the new cache options incorrectly (i.e. explicitly provides the same options in two different places). For correct behavior, all they need to do is redirect the existing options to the framework's LookupConfig (plus maybe add aliases for options if the naming differed); everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector because of backward compatibility. Also, if a developer wants to use their own cache logic, they can simply decline to pass some of the configs into the framework and instead provide their own implementation with the already existing configs and metrics (but I think that's a rare case).
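The parallel "ALL"-cache reload described above can be sketched roughly as follows. This is not Flink's actual InputFormat/InputSplit API; a split is modeled as a plain list of rows, and the `ParallelCacheReload` and `reloadAll` names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of a parallel all-cache reload: each input split is read by its own
// task and the rows are merged into a shared map. Names are illustrative,
// not Flink API.
class ParallelCacheReload {

    /** One "split" is modeled as a list of key/value rows for this sketch. */
    static <K, V> Map<K, V> reloadAll(List<List<Map.Entry<K, V>>> splits, int maxThreads) {
        ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
        // Parallelism follows the number of splits but has an upper limit,
        // mirroring the behavior described in the thread.
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, Math.min(splits.size(), maxThreads)));
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<Map.Entry<K, V>> split : splits) {
                pending.add(pool.submit(() -> {
                    for (Map.Entry<K, V> row : split) {
                        cache.put(row.getKey(), row.getValue());
                    }
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // block until every split is loaded; propagates failures
            }
        } catch (Exception e) {
            throw new RuntimeException("Cache reload failed", e);
        } finally {
            pool.shutdown();
        }
        return cache;
    }
}
```

The input stream would only be blocked while `reloadAll` runs, which is the point of reading the splits concurrently instead of sequentially.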
> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That's a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector supports it currently). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table. For a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by user age. With the filter 'age > 30' there will be half as much data in the cache, which means the user can increase 'lookup.cache.max-rows' by almost 2x - a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users, and it doesn't sound like 'not quite useful'.

It would be great to hear other voices regarding this topic!
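To make the arithmetic above concrete, here is a tiny sketch of filtering rows before they enter an LRU cache, so that rejected rows never consume cache slots. `FilteredLruCache` is a hypothetical helper written for this example, not a class from the FLIP:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of "filter before caching": rows rejected by the lookup filter
// (e.g. age > 30) never occupy a slot, so the same 'lookup.cache.max-rows'
// budget holds more useful rows. Class and method names are illustrative.
class FilteredLruCache<K, V> {
    private final Map<K, V> lru;
    private final Predicate<V> filter;

    FilteredLruCache(int maxRows, Predicate<V> filter) {
        this.filter = filter;
        // Access-ordered LinkedHashMap evicting the eldest entry: a minimal LRU.
        this.lru = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Cache the row only if it passes the filter. */
    void put(K key, V row) {
        if (filter.test(row)) {
            lru.put(key, row);
        }
    }

    V get(K key) {
        return lru.get(key);
    }

    int size() {
        return lru.size();
    }
}
```

With a uniform age distribution over 20..40 and the filter 'age > 30', roughly half the looked-up rows are never cached, which is the effect the example describes.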
Because we have quite a lot of controversial points, I think that with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

On Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response! We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because the result can no longer truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.

2. If we put the cache implementation in the framework (whether in a runner or a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.

3. We have use cases where the lookup source loads and refreshes all records periodically into memory to achieve high lookup performance (like the Hive connector in the community, and this is also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4. Providing the cache in the framework might introduce compatibility issues to existing lookup sources: there might exist two caches with totally different strategies if the user configures the table incorrectly (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of into the runner with the cache. The goal of using a cache is to reduce the network I/O and the pressure on the external system, and applying these optimizations only to the cache seems not quite useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and regulate the metrics of the cache. Also, I made a POC [2] for your reference.

Looking forward to your ideas!
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step:

I think these two approaches (the one originally proposed by Qingsheng, and mine) are mutually exclusive, because conceptually they pursue the same goal but differ in implementation details. If we go one way, moving to the other in the future will mean deleting existing code and once again changing the API for connectors. So I think we should first reach a consensus with the community about that and then work together on this FLIP, i.e. divide the work into tasks for different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?

> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we first have to make the requests, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.

Sorry for that, I'm a bit new to these kinds of conversations :) I have no write access to the Confluence, so I made a Jira issue where I described the proposed changes in more detail - https://issues.apache.org/jira/browse/FLINK-27411.

Will be happy to get more feedback!

Best,
Smirnov Alexander

On Mon, Apr 25, 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.

I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache. Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying projection may be more interesting to save memory).

Another advantage is that all the changes of this FLIP would be limited to options - no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.
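The delegate-on-miss layering described above might look roughly like this sketch. The `CachingLookup` class, its `lookup` method, and the plain `Function` shape are assumptions made for illustration, not the FLIP's proposed API (which wraps a TableFunction):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of a caching layer around a lookup function X: lookups are served
// from an LRU cache and delegated to X only on a miss. Names are illustrative.
class CachingLookup<K, V> {
    private final Function<K, V> delegate; // the underlying lookup function X
    private final Map<K, V> cache;
    private long hits;
    private long misses;

    CachingLookup(Function<K, V> delegate, int maxRows) {
        this.delegate = delegate;
        // Access-ordered LinkedHashMap evicting the eldest entry: a minimal LRU.
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Returns the cached row if present, otherwise asks X and caches the result. */
    V lookup(K key) {
        V row = cache.get(key);
        if (row != null) {
            hits++;
            return row;
        }
        misses++;
        row = delegate.apply(key); // request to the external system
        if (row != null) {
            cache.put(key, row);
        }
        return row;
    }

    long hits() { return hits; }
    long misses() { return misses; }
}
```

Because the wrapper owns the cache and its hit/miss counters, metrics can be regulated in one place regardless of which connector supplies X, which is one of the stated goals of the FLIP.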
@Alexander unfortunately, your architecture image did not come through. I don't know of a good way to share images on this list, to be honest.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng! My name is Alexander; I'm not a committer yet, but I'd really like to become one, and this FLIP really interested me. I have actually worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.
I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables, so it is very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But that would require connectors to depend on a module that contains a lot of runtime logic, which doesn't sound good.
I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider so that connectors only pass configurations to the planner and therefore don't depend on the runtime implementation. Based on these configs, the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the flink-table-runtime module). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).

The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its inheritors.
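A minimal sketch of what this "config only" handover might look like. Every name here (LookupConfig, its fields, getLookupConfig, MyJdbcLookupSource) is an assumption made for illustration, not the actual FLIP-221 interface; it only shows that the connector declares *what* caching it wants while the runtime classes stay out of its dependency graph.

```java
// Hypothetical sketch: the connector exposes a declarative LookupConfig; the
// planner reads it and chooses the caching runner, so the connector never
// depends on flink-table-runtime. All names are illustrative, not Flink API.
final class LookupConfig {
    final boolean cacheEnabled;
    final long maxCachedRows;
    final long cacheTtlMillis;

    private LookupConfig(boolean cacheEnabled, long maxCachedRows, long cacheTtlMillis) {
        this.cacheEnabled = cacheEnabled;
        this.maxCachedRows = maxCachedRows;
        this.cacheTtlMillis = cacheTtlMillis;
    }

    static LookupConfig disabled() {
        return new LookupConfig(false, 0, 0);
    }

    static LookupConfig lru(long maxCachedRows, long cacheTtlMillis) {
        return new LookupConfig(true, maxCachedRows, cacheTtlMillis);
    }
}

interface LookupTableSource {
    // Connectors that want caching override this; the default keeps the
    // current no-cache behavior, so existing sources need no change.
    default LookupConfig getLookupConfig() {
        return LookupConfig.disabled();
    }
}

// Example connector declaring an LRU cache of 10_000 rows with a 10-minute TTL.
class MyJdbcLookupSource implements LookupTableSource {
    @Override
    public LookupConfig getLookupConfig() {
        return LookupConfig.lru(10_000, 10 * 60 * 1000);
    }
}
```

With a default method returning "cache disabled", existing connectors compile unchanged, which matches the migration-transparency point made earlier in the thread.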
The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc. I suggest adding classes such as LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution. If we have the caching logic on a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc is named like this because it uses the 'calc' function, which mostly consists of filters and projections.
For example, when joining table A with lookup table B under the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: the filters avoid storing useless records, and the projections reduce each record's size. So the user can raise the maximum number of records kept in the cache.

What do you think about it?
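The effect of pushing the calc (filter + projection) in front of the cache can be sketched as follows. This is an editorial illustration with made-up class and field names, not the real runner classes: rows are plain field maps, and the sketch only demonstrates that rows failing the filter never enter the cache and that cached rows keep only the projected columns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative sketch: the "calc" (a filter plus a projection) is applied to
// lookup results *before* they are stored in the cache, shrinking the cache
// both in row count and in row width. Names are hypothetical, not Flink API.
class CalcBeforeCache {
    private final Predicate<Map<String, Object>> filter;
    private final Function<Map<String, Object>, Map<String, Object>> projection;
    private final Map<String, List<Map<String, Object>>> cache = new HashMap<>();

    CalcBeforeCache(Predicate<Map<String, Object>> filter,
                    Function<Map<String, Object>, Map<String, Object>> projection) {
        this.filter = filter;
        this.projection = projection;
    }

    List<Map<String, Object>> lookup(String key,
            Function<String, List<Map<String, Object>>> backend) {
        return cache.computeIfAbsent(key, k -> {
            List<Map<String, Object>> kept = new ArrayList<>();
            for (Map<String, Object> row : backend.apply(k)) {
                if (filter.test(row)) {              // rows failing e.g. B.salary > 1000
                    kept.add(projection.apply(row)); // store only the needed columns
                }                                    // are never cached at all
            }
            return kept;
        });
    }
}
```

Under the example condition above, a row with salary 500 would be dropped before caching, and a surviving row would be stored without the columns the join does not read.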
On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction for the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there is no standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.
Therefore we propose some new APIs including a cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng

--
Best Regards,

Qingsheng Ren
Real-time Computing Team
Alibaba Cloud
Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com