Hi Jark, Thanks for your reply!
Currently 'lookup.async' exists only in the HBase connector, and I have no idea whether or when it will be removed (we can discuss that in a separate issue for the HBase connector after FLINK-27625 is done); let's just not add it to the common options for now.

Best,
Lincoln Lee

On Tue, 24 May 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:

> Hi Lincoln,
>
> I have taken a look at FLIP-234, and I agree with you that the connectors
> can provide both async and sync runtime providers simultaneously instead
> of one of them. At that point, "lookup.async" looks redundant. If this
> option is planned to be removed in the long term, I think it makes sense
> not to introduce it in this FLIP.
>
> Best,
> Jark
>
> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hi Qingsheng,
> >
> > Sorry for jumping into the discussion so late. It's a good idea that we
> > can have a common table option. I have a minor comment on 'lookup.async':
> > let's not make it a common option.
> >
> > The table layer abstracts both sync and async lookup capabilities, and
> > connector implementers can choose one or both. In the case of
> > implementing only one capability (the status of most existing builtin
> > connectors), 'lookup.async' will not be used. And when a connector has
> > both capabilities, I think this choice is more suitable for a decision
> > at the query level; for example, the table planner can choose the
> > physical implementation of async or sync lookup based on its cost model,
> > or users can give a query hint based on their own better understanding.
> > If there is another common table option 'lookup.async', it may confuse
> > users in the long run.
> >
> > So, I prefer to leave the 'lookup.async' option in a private place (for
> > the current HBase connector) and not turn it into a common option.
> >
> > WDYT?
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, 23 May 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
> >
> > > Hi Alexander,
> > >
> > > Thanks for the review!
We recently updated the FLIP and you can find > > those > > > changes from my latest email. Since some terminologies has changed so > > I’ll > > > use the new concept for replying your comments. > > > > > > 1. Builder vs ‘of’ > > > I’m OK to use builder pattern if we have additional optional parameters > > > for full caching mode (“rescan” previously). The schedule-with-delay > idea > > > looks reasonable to me, but I think we need to redesign the builder API > > of > > > full caching to make it more descriptive for developers. Would you mind > > > sharing your ideas about the API? For accessing the FLIP workspace you > > can > > > just provide your account ID and ping any PMC member including Jark. > > > > > > 2. Common table options > > > We have some discussions these days and propose to introduce 8 common > > > table options about caching. It has been updated on the FLIP. > > > > > > 3. Retries > > > I think we are on the same page :-) > > > > > > For your additional concerns: > > > 1) The table option has been updated. > > > 2) We got “lookup.cache” back for configuring whether to use partial or > > > full caching mode. > > > > > > Best regards, > > > > > > Qingsheng > > > > > > > > > > > > > On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> > > > wrote: > > > > > > > > Also I have a few additions: > > > > 1) maybe rename 'lookup.cache.maximum-size' to > > > > 'lookup.cache.max-rows'? I think it will be more clear that we talk > > > > not about bytes, but about the number of rows. Plus it fits more, > > > > considering my optimization with filters. > > > > 2) How will users enable rescanning? Are we going to separate caching > > > > and rescanning from the options point of view? Like initially we had > > > > one option 'lookup.cache' with values LRU / ALL. I think now we can > > > > make a boolean option 'lookup.rescan'. RescanInterval can be > > > > 'lookup.rescan.interval', etc. 
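[Editor's note: the schedule-with-delay idea discussed above can be sketched with plain JDK scheduling. This is only a sketch of the proposal, not the FLIP's API; `rescanStartTime` (a UTC `LocalTime` for the first cache reload) and the one-day default interval are taken from Alexander's suggestion later in this thread.]

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RescanScheduleSketch {

    // Delay from 'nowUtc' until the next occurrence of 'rescanStartTime';
    // if that time of day has already passed, wait until tomorrow.
    public static Duration initialDelay(LocalTime rescanStartTime, LocalTime nowUtc) {
        Duration d = Duration.between(nowUtc, rescanStartTime);
        return d.isNegative() ? d.plusDays(1) : d;
    }

    public static void main(String[] args) {
        Duration start = initialDelay(LocalTime.of(2, 0), LocalTime.now(ZoneOffset.UTC));
        Duration interval = Duration.ofDays(1); // default when no rescan interval is set
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        // First cache reload at rescanStartTime, then repeatedly with a fixed delay.
        exec.scheduleWithFixedDelay(
                () -> System.out.println("reloading cache..."),
                start.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
        exec.shutdownNow();
    }
}
```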
> > > > > > > > Best regards, > > > > Alexander > > > > > > > > чт, 19 мая 2022 г. в 14:50, Александр Смирнов <smirale...@gmail.com > >: > > > >> > > > >> Hi Qingsheng and Jark, > > > >> > > > >> 1. Builders vs 'of' > > > >> I understand that builders are used when we have multiple > parameters. > > > >> I suggested them because we could add parameters later. To prevent > > > >> Builder for ScanRuntimeProvider from looking redundant I can suggest > > > >> one more config now - "rescanStartTime". > > > >> It's a time in UTC (LocalTime class) when the first reload of cache > > > >> starts. This parameter can be thought of as 'initialDelay' (diff > > > >> between current time and rescanStartTime) in method > > > >> ScheduleExecutorService#scheduleWithFixedDelay [1] . It can be very > > > >> useful when the dimension table is updated by some other scheduled > job > > > >> at a certain time. Or when the user simply wants a second scan > (first > > > >> cache reload) be delayed. This option can be used even without > > > >> 'rescanInterval' - in this case 'rescanInterval' will be one day. > > > >> If you are fine with this option, I would be very glad if you would > > > >> give me access to edit FLIP page, so I could add it myself > > > >> > > > >> 2. Common table options > > > >> I also think that FactoryUtil would be overloaded by all cache > > > >> options. But maybe unify all suggested options, not only for default > > > >> cache? I.e. class 'LookupOptions', that unifies default cache > options, > > > >> rescan options, 'async', 'maxRetries'. WDYT? > > > >> > > > >> 3. Retries > > > >> I'm fine with suggestion close to RetryUtils#tryTimes(times, call) > > > >> > > > >> [1] > > > > > > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit- > > > >> > > > >> Best regards, > > > >> Alexander > > > >> > > > >> ср, 18 мая 2022 г. 
в 16:04, Qingsheng Ren <renqs...@gmail.com>: > > > >>> > > > >>> Hi Jark and Alexander, > > > >>> > > > >>> Thanks for your comments! I’m also OK to introduce common table > > > options. I prefer to introduce a new DefaultLookupCacheOptions class > for > > > holding these option definitions because putting all options into > > > FactoryUtil would make it a bit ”crowded” and not well categorized. > > > >>> > > > >>> FLIP has been updated according to suggestions above: > > > >>> 1. Use static “of” method for constructing RescanRuntimeProvider > > > considering both arguments are required. > > > >>> 2. Introduce new table options matching DefaultLookupCacheFactory > > > >>> > > > >>> Best, > > > >>> Qingsheng > > > >>> > > > >>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote: > > > >>>> > > > >>>> Hi Alex, > > > >>>> > > > >>>> 1) retry logic > > > >>>> I think we can extract some common retry logic into utilities, > e.g. > > > RetryUtils#tryTimes(times, call). > > > >>>> This seems independent of this FLIP and can be reused by > DataStream > > > users. > > > >>>> Maybe we can open an issue to discuss this and where to put it. > > > >>>> > > > >>>> 2) cache ConfigOptions > > > >>>> I'm fine with defining cache config options in the framework. > > > >>>> A candidate place to put is FactoryUtil which also includes > > > "sink.parallelism", "format" options. > > > >>>> > > > >>>> Best, > > > >>>> Jark > > > >>>> > > > >>>> > > > >>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов < > > smirale...@gmail.com> > > > wrote: > > > >>>>> > > > >>>>> Hi Qingsheng, > > > >>>>> > > > >>>>> Thank you for considering my comments. > > > >>>>> > > > >>>>>> there might be custom logic before making retry, such as > > > re-establish the connection > > > >>>>> > > > >>>>> Yes, I understand that. I meant that such logic can be placed in > a > > > >>>>> separate function, that can be implemented by connectors. 
Just > > moving > > > >>>>> the retry logic would make connector's LookupFunction more > concise > > + > > > >>>>> avoid duplicate code. However, it's a minor change. The decision > is > > > up > > > >>>>> to you. > > > >>>>> > > > >>>>>> We decide not to provide common DDL options and let developers > to > > > define their own options as we do now per connector. > > > >>>>> > > > >>>>> What is the reason for that? One of the main goals of this FLIP > was > > > to > > > >>>>> unify the configs, wasn't it? I understand that current cache > > design > > > >>>>> doesn't depend on ConfigOptions, like was before. But still we > can > > > put > > > >>>>> these options into the framework, so connectors can reuse them > and > > > >>>>> avoid code duplication, and, what is more significant, avoid > > possible > > > >>>>> different options naming. This moment can be pointed out in > > > >>>>> documentation for connector developers. > > > >>>>> > > > >>>>> Best regards, > > > >>>>> Alexander > > > >>>>> > > > >>>>> вт, 17 мая 2022 г. в 17:11, Qingsheng Ren <renqs...@gmail.com>: > > > >>>>>> > > > >>>>>> Hi Alexander, > > > >>>>>> > > > >>>>>> Thanks for the review and glad to see we are on the same page! I > > > think you forgot to cc the dev mailing list so I’m also quoting your > > reply > > > under this email. > > > >>>>>> > > > >>>>>>> We can add 'maxRetryTimes' option into this class > > > >>>>>> > > > >>>>>> In my opinion the retry logic should be implemented in lookup() > > > instead of in LookupFunction#eval(). Retrying is only meaningful under > > some > > > specific retriable failures, and there might be custom logic before > > making > > > retry, such as re-establish the connection (JdbcRowDataLookupFunction > is > > an > > > example), so it's more handy to leave it to the connector. > > > >>>>>> > > > >>>>>>> I don't see DDL options, that were in previous version of FLIP. > > Do > > > you have any special plans for them? 
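[Editor's note: the `RetryUtils#tryTimes(times, call)` helper mentioned in this thread does not exist in Flink at this point; a minimal sketch of what such a utility could look like (name and signature are assumptions taken from the discussion). Connector-specific recovery, such as re-establishing a connection the way JdbcRowDataLookupFunction does, would run inside the `Callable`.]

```java
import java.util.concurrent.Callable;

final class RetryUtils {
    private RetryUtils() {}

    // Invokes 'call' up to 'times' times, rethrowing the last failure if
    // every attempt fails.
    public static <T> T tryTimes(int times, Callable<T> call) throws Exception {
        if (times <= 0) {
            throw new IllegalArgumentException("times must be positive");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
            }
        }
        throw last;
    }
}
```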
> > > >>>>>> > > > >>>>>> We decide not to provide common DDL options and let developers > to > > > define their own options as we do now per connector. > > > >>>>>> > > > >>>>>> The rest of comments sound great and I’ll update the FLIP. Hope > we > > > can finalize our proposal soon! > > > >>>>>> > > > >>>>>> Best, > > > >>>>>> > > > >>>>>> Qingsheng > > > >>>>>> > > > >>>>>> > > > >>>>>>> On May 17, 2022, at 13:46, Александр Смирнов < > > smirale...@gmail.com> > > > wrote: > > > >>>>>>> > > > >>>>>>> Hi Qingsheng and devs! > > > >>>>>>> > > > >>>>>>> I like the overall design of updated FLIP, however I have > several > > > >>>>>>> suggestions and questions. > > > >>>>>>> > > > >>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction > is a > > > good > > > >>>>>>> idea. We can add 'maxRetryTimes' option into this class. 'eval' > > > method > > > >>>>>>> of new LookupFunction is great for this purpose. The same is > for > > > >>>>>>> 'async' case. > > > >>>>>>> > > > >>>>>>> 2) There might be other configs in future, such as > > > 'cacheMissingKey' > > > >>>>>>> in LookupFunctionProvider or 'rescanInterval' in > > > ScanRuntimeProvider. > > > >>>>>>> Maybe use Builder pattern in LookupFunctionProvider and > > > >>>>>>> RescanRuntimeProvider for more flexibility (use one 'build' > > method > > > >>>>>>> instead of many 'of' methods in future)? > > > >>>>>>> > > > >>>>>>> 3) What are the plans for existing TableFunctionProvider and > > > >>>>>>> AsyncTableFunctionProvider? I think they should be deprecated. > > > >>>>>>> > > > >>>>>>> 4) Am I right that the current design does not assume usage of > > > >>>>>>> user-provided LookupCache in re-scanning? In this case, it is > not > > > very > > > >>>>>>> clear why do we need methods such as 'invalidate' or 'putAll' > in > > > >>>>>>> LookupCache. > > > >>>>>>> > > > >>>>>>> 5) I don't see DDL options, that were in previous version of > > FLIP. > > > Do > > > >>>>>>> you have any special plans for them? 
> > > >>>>>>> > > > >>>>>>> If you don't mind, I would be glad to be able to make small > > > >>>>>>> adjustments to the FLIP document too. I think it's worth > > mentioning > > > >>>>>>> about what exactly optimizations are planning in the future. > > > >>>>>>> > > > >>>>>>> Best regards, > > > >>>>>>> Smirnov Alexander > > > >>>>>>> > > > >>>>>>> пт, 13 мая 2022 г. в 20:27, Qingsheng Ren <renqs...@gmail.com > >: > > > >>>>>>>> > > > >>>>>>>> Hi Alexander and devs, > > > >>>>>>>> > > > >>>>>>>> Thank you very much for the in-depth discussion! As Jark > > > mentioned we were inspired by Alexander's idea and made a refactor on > our > > > design. FLIP-221 [1] has been updated to reflect our design now and we > > are > > > happy to hear more suggestions from you! > > > >>>>>>>> > > > >>>>>>>> Compared to the previous design: > > > >>>>>>>> 1. The lookup cache serves at table runtime level and is > > > integrated as a component of LookupJoinRunner as discussed previously. > > > >>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new > > > design. > > > >>>>>>>> 3. We separate the all-caching case individually and > introduce a > > > new RescanRuntimeProvider to reuse the ability of scanning. We are > > planning > > > to support SourceFunction / InputFormat for now considering the > > complexity > > > of FLIP-27 Source API. > > > >>>>>>>> 4. A new interface LookupFunction is introduced to make the > > > semantic of lookup more straightforward for developers. > > > >>>>>>>> > > > >>>>>>>> For replying to Alexander: > > > >>>>>>>>> However I'm a little confused whether InputFormat is > deprecated > > > or not. Am I right that it will be so in the future, but currently it's > > not? > > > >>>>>>>> Yes you are right. InputFormat is not deprecated for now. I > > think > > > it will be deprecated in the future but we don't have a clear plan for > > that. 
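[Editor's note: the new LookupFunction mentioned above separates a framework-facing `eval()` from a connector-implemented `lookup()`. The dependency-free sketch below shows only that shape, using plain lists instead of Flink's `RowData`; the actual interface in the FLIP differs in types and details.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

abstract class LookupFunctionSketch {
    private final List<List<Object>> collected = new ArrayList<>();

    // Connector-specific synchronous lookup by key; retry/reconnect logic
    // stays inside the implementation.
    public abstract Collection<List<Object>> lookup(List<Object> keyRow) throws Exception;

    // Framework-facing entry point: wraps the keys, delegates to lookup(),
    // and collects the matched rows (a cache layer could sit in between).
    public final void eval(Object... keys) {
        try {
            collected.addAll(lookup(Arrays.asList(keys)));
        } catch (Exception e) {
            throw new RuntimeException("Failed to look up key " + Arrays.toString(keys), e);
        }
    }

    public List<List<Object>> collectedRows() {
        return collected;
    }
}
```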
> > > >>>>>>>> > > > >>>>>>>> Thanks again for the discussion on this FLIP and looking > forward > > > to cooperating with you after we finalize the design and interfaces! > > > >>>>>>>> > > > >>>>>>>> [1] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric > > > >>>>>>>> > > > >>>>>>>> Best regards, > > > >>>>>>>> > > > >>>>>>>> Qingsheng > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов < > > > smirale...@gmail.com> wrote: > > > >>>>>>>>> > > > >>>>>>>>> Hi Jark, Qingsheng and Leonard! > > > >>>>>>>>> > > > >>>>>>>>> Glad to see that we came to a consensus on almost all points! > > > >>>>>>>>> > > > >>>>>>>>> However I'm a little confused whether InputFormat is > deprecated > > > or > > > >>>>>>>>> not. Am I right that it will be so in the future, but > currently > > > it's > > > >>>>>>>>> not? Actually I also think that for the first version it's OK > > to > > > use > > > >>>>>>>>> InputFormat in ALL cache realization, because supporting > rescan > > > >>>>>>>>> ability seems like a very distant prospect. But for this > > > decision we > > > >>>>>>>>> need a consensus among all discussion participants. > > > >>>>>>>>> > > > >>>>>>>>> In general, I don't have something to argue with your > > > statements. All > > > >>>>>>>>> of them correspond my ideas. Looking ahead, it would be nice > to > > > work > > > >>>>>>>>> on this FLIP cooperatively. I've already done a lot of work > on > > > lookup > > > >>>>>>>>> join caching with realization very close to the one we are > > > discussing, > > > >>>>>>>>> and want to share the results of this work. Anyway looking > > > forward for > > > >>>>>>>>> the FLIP update! > > > >>>>>>>>> > > > >>>>>>>>> Best regards, > > > >>>>>>>>> Smirnov Alexander > > > >>>>>>>>> > > > >>>>>>>>> чт, 12 мая 2022 г. 
в 17:38, Jark Wu <imj...@gmail.com>: > > > >>>>>>>>>> > > > >>>>>>>>>> Hi Alex, > > > >>>>>>>>>> > > > >>>>>>>>>> Thanks for summarizing your points. > > > >>>>>>>>>> > > > >>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed > it > > > several times > > > >>>>>>>>>> and we have totally refactored the design. > > > >>>>>>>>>> I'm glad to say we have reached a consensus on many of your > > > points! > > > >>>>>>>>>> Qingsheng is still working on updating the design docs and > > > maybe can be > > > >>>>>>>>>> available in the next few days. > > > >>>>>>>>>> I will share some conclusions from our discussions: > > > >>>>>>>>>> > > > >>>>>>>>>> 1) we have refactored the design towards to "cache in > > > framework" way. > > > >>>>>>>>>> > > > >>>>>>>>>> 2) a "LookupCache" interface for users to customize and a > > > default > > > >>>>>>>>>> implementation with builder for users to easy-use. > > > >>>>>>>>>> This can both make it possible to both have flexibility and > > > conciseness. > > > >>>>>>>>>> > > > >>>>>>>>>> 3) Filter pushdown is important for ALL and LRU lookup > cache, > > > esp reducing > > > >>>>>>>>>> IO. > > > >>>>>>>>>> Filter pushdown should be the final state and the unified > way > > > to both > > > >>>>>>>>>> support pruning ALL cache and LRU cache, > > > >>>>>>>>>> so I think we should make effort in this direction. If we > need > > > to support > > > >>>>>>>>>> filter pushdown for ALL cache anyway, why not use > > > >>>>>>>>>> it for LRU cache as well? Either way, as we decide to > > implement > > > the cache > > > >>>>>>>>>> in the framework, we have the chance to support > > > >>>>>>>>>> filter on cache anytime. This is an optimization and it > > doesn't > > > affect the > > > >>>>>>>>>> public API. I think we can create a JIRA issue to > > > >>>>>>>>>> discuss it when the FLIP is accepted. > > > >>>>>>>>>> > > > >>>>>>>>>> 4) The idea to support ALL cache is similar to your > proposal. 
> > > >>>>>>>>>> In the first version, we will only support InputFormat, > > > SourceFunction for > > > >>>>>>>>>> cache all (invoke InputFormat in join operator). > > > >>>>>>>>>> For FLIP-27 source, we need to join a true source operator > > > instead of > > > >>>>>>>>>> calling it embedded in the join operator. > > > >>>>>>>>>> However, this needs another FLIP to support the re-scan > > ability > > > for FLIP-27 > > > >>>>>>>>>> Source, and this can be a large work. > > > >>>>>>>>>> In order to not block this issue, we can put the effort of > > > FLIP-27 source > > > >>>>>>>>>> integration into future work and integrate > > > >>>>>>>>>> InputFormat&SourceFunction for now. > > > >>>>>>>>>> > > > >>>>>>>>>> I think it's fine to use InputFormat&SourceFunction, as they > > > are not > > > >>>>>>>>>> deprecated, otherwise, we have to introduce another function > > > >>>>>>>>>> similar to them which is meaningless. We need to plan > FLIP-27 > > > source > > > >>>>>>>>>> integration ASAP before InputFormat & SourceFunction are > > > deprecated. > > > >>>>>>>>>> > > > >>>>>>>>>> Best, > > > >>>>>>>>>> Jark > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов < > > > smirale...@gmail.com> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>> > > > >>>>>>>>>>> Hi Martijn! > > > >>>>>>>>>>> > > > >>>>>>>>>>> Got it. Therefore, the realization with InputFormat is not > > > considered. > > > >>>>>>>>>>> Thanks for clearing that up! > > > >>>>>>>>>>> > > > >>>>>>>>>>> Best regards, > > > >>>>>>>>>>> Smirnov Alexander > > > >>>>>>>>>>> > > > >>>>>>>>>>> чт, 12 мая 2022 г. 
в 14:23, Martijn Visser < > > > mart...@ververica.com>: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Hi, > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> With regards to: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> But if there are plans to refactor all connectors to > > FLIP-27 > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old > > > interfaces will be > > > >>>>>>>>>>>> deprecated and connectors will either be refactored to use > > > the new ones > > > >>>>>>>>>>> or > > > >>>>>>>>>>>> dropped. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> The caching should work for connectors that are using > > FLIP-27 > > > interfaces, > > > >>>>>>>>>>>> we should not introduce new features for old interfaces. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Best regards, > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Martijn > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов < > > > smirale...@gmail.com> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> Hi Jark! > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Sorry for the late response. I would like to make some > > > comments and > > > >>>>>>>>>>>>> clarify my points. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 1) I agree with your first statement. I think we can > > achieve > > > both > > > >>>>>>>>>>>>> advantages this way: put the Cache interface in > > > flink-table-common, > > > >>>>>>>>>>>>> but have implementations of it in flink-table-runtime. > > > Therefore if a > > > >>>>>>>>>>>>> connector developer wants to use existing cache > strategies > > > and their > > > >>>>>>>>>>>>> implementations, he can just pass lookupConfig to the > > > planner, but if > > > >>>>>>>>>>>>> he wants to have its own cache implementation in his > > > TableFunction, it > > > >>>>>>>>>>>>> will be possible for him to use the existing interface > for > > > this > > > >>>>>>>>>>>>> purpose (we can explicitly point this out in the > > > documentation). 
In > > > >>>>>>>>>>>>> this way all configs and metrics will be unified. WDYT? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will > > > have 90% of > > > >>>>>>>>>>>>> lookup requests that can never be cached > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 2) Let me clarify the logic filters optimization in case > of > > > LRU cache. > > > >>>>>>>>>>>>> It looks like Cache<RowData, Collection<RowData>>. Here > we > > > always > > > >>>>>>>>>>>>> store the response of the dimension table in cache, even > > > after > > > >>>>>>>>>>>>> applying calc function. I.e. if there are no rows after > > > applying > > > >>>>>>>>>>>>> filters to the result of the 'eval' method of > > TableFunction, > > > we store > > > >>>>>>>>>>>>> the empty list by lookup keys. Therefore the cache line > > will > > > be > > > >>>>>>>>>>>>> filled, but will require much less memory (in bytes). > I.e. > > > we don't > > > >>>>>>>>>>>>> completely filter keys, by which result was pruned, but > > > significantly > > > >>>>>>>>>>>>> reduce required memory to store this result. If the user > > > knows about > > > >>>>>>>>>>>>> this behavior, he can increase the 'max-rows' option > before > > > the start > > > >>>>>>>>>>>>> of the job. But actually I came up with the idea that we > > can > > > do this > > > >>>>>>>>>>>>> automatically by using the 'maximumWeight' and 'weigher' > > > methods of > > > >>>>>>>>>>>>> GuavaCache [1]. Weight can be the size of the collection > of > > > rows > > > >>>>>>>>>>>>> (value of cache). Therefore cache can automatically fit > > much > > > more > > > >>>>>>>>>>>>> records than before. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and > > > projects > > > >>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and > > > SupportsProjectionPushDown. 
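[Editor's note: the maximumWeight/weigher idea above refers to Guava's CacheBuilder; the following is a dependency-free sketch of the same idea, where each cached entry weighs as many units as it has rows. An empty lookup result still weighs one unit, which is why caching filtered-out (empty) results stays cheap, as argued above.]

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WeightedLookupCacheSketch {
    private final long maxWeight;
    private long weight = 0;
    // Access-order map, so eviction removes the least recently used entry first.
    private final LinkedHashMap<String, List<String>> cache =
            new LinkedHashMap<>(16, 0.75f, true);

    WeightedLookupCacheSketch(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    public synchronized void put(String key, List<String> rows) {
        List<String> old = cache.put(key, rows);
        if (old != null) {
            weight -= entryWeight(old);
        }
        weight += entryWeight(rows);
        // Evict least recently used entries until we are back under budget.
        Iterator<Map.Entry<String, List<String>>> it = cache.entrySet().iterator();
        while (weight > maxWeight && it.hasNext()) {
            weight -= entryWeight(it.next().getValue());
            it.remove();
        }
    }

    public synchronized List<String> getIfPresent(String key) {
        return cache.get(key);
    }

    public synchronized int entryCount() {
        return cache.size();
    }

    // An empty result still occupies one unit, so "no match" is cached too.
    private static long entryWeight(List<String> rows) {
        return Math.max(1, rows.size());
    }
}
```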
> > > >>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, > don't > > > mean it's > > > >>>>>>>>>>> hard > > > >>>>>>>>>>>>> to implement. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> It's debatable how difficult it will be to implement > filter > > > pushdown. > > > >>>>>>>>>>>>> But I think the fact that currently there is no database > > > connector > > > >>>>>>>>>>>>> with filter pushdown at least means that this feature > won't > > > be > > > >>>>>>>>>>>>> supported soon in connectors. Moreover, if we talk about > > > other > > > >>>>>>>>>>>>> connectors (not in Flink repo), their databases might not > > > support all > > > >>>>>>>>>>>>> Flink filters (or not support filters at all). I think > > users > > > are > > > >>>>>>>>>>>>> interested in supporting cache filters optimization > > > independently of > > > >>>>>>>>>>>>> supporting other features and solving more complex > problems > > > (or > > > >>>>>>>>>>>>> unsolvable at all). > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 3) I agree with your third statement. Actually in our > > > internal version > > > >>>>>>>>>>>>> I also tried to unify the logic of scanning and reloading > > > data from > > > >>>>>>>>>>>>> connectors. But unfortunately, I didn't find a way to > unify > > > the logic > > > >>>>>>>>>>>>> of all ScanRuntimeProviders (InputFormat, SourceFunction, > > > Source,...) > > > >>>>>>>>>>>>> and reuse it in reloading ALL cache. As a result I > settled > > > on using > > > >>>>>>>>>>>>> InputFormat, because it was used for scanning in all > lookup > > > >>>>>>>>>>>>> connectors. (I didn't know that there are plans to > > deprecate > > > >>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source). 
IMO usage of > > > FLIP-27 source > > > >>>>>>>>>>>>> in ALL caching is not good idea, because this source was > > > designed to > > > >>>>>>>>>>>>> work in distributed environment (SplitEnumerator on > > > JobManager and > > > >>>>>>>>>>>>> SourceReaders on TaskManagers), not in one operator > (lookup > > > join > > > >>>>>>>>>>>>> operator in our case). There is even no direct way to > pass > > > splits from > > > >>>>>>>>>>>>> SplitEnumerator to SourceReader (this logic works through > > > >>>>>>>>>>>>> SplitEnumeratorContext, which requires > > > >>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send > AddSplitEvents). > > > Usage of > > > >>>>>>>>>>>>> InputFormat for ALL cache seems much more clearer and > > > easier. But if > > > >>>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I > > > have the > > > >>>>>>>>>>>>> following ideas: maybe we can refuse from lookup join ALL > > > cache in > > > >>>>>>>>>>>>> favor of simple join with multiple scanning of batch > > source? > > > The point > > > >>>>>>>>>>>>> is that the only difference between lookup join ALL cache > > > and simple > > > >>>>>>>>>>>>> join with batch source is that in the first case scanning > > is > > > performed > > > >>>>>>>>>>>>> multiple times, in between which state (cache) is cleared > > > (correct me > > > >>>>>>>>>>>>> if I'm wrong). So what if we extend the functionality of > > > simple join > > > >>>>>>>>>>>>> to support state reloading + extend the functionality of > > > scanning > > > >>>>>>>>>>>>> batch source multiple times (this one should be easy with > > > new FLIP-27 > > > >>>>>>>>>>>>> source, that unifies streaming/batch reading - we will > need > > > to change > > > >>>>>>>>>>>>> only SplitEnumerator, which will pass splits again after > > > some TTL). > > > >>>>>>>>>>>>> WDYT? I must say that this looks like a long-term goal > and > > > will make > > > >>>>>>>>>>>>> the scope of this FLIP even larger than you said. 
Maybe > we > > > can limit > > > >>>>>>>>>>>>> ourselves to a simpler solution now (InputFormats). > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> So to sum up, my points is like this: > > > >>>>>>>>>>>>> 1) There is a way to make both concise and flexible > > > interfaces for > > > >>>>>>>>>>>>> caching in lookup join. > > > >>>>>>>>>>>>> 2) Cache filters optimization is important both in LRU > and > > > ALL caches. > > > >>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported > in > > > Flink > > > >>>>>>>>>>>>> connectors, some of the connectors might not have the > > > opportunity to > > > >>>>>>>>>>>>> support filter pushdown + as I know, currently filter > > > pushdown works > > > >>>>>>>>>>>>> only for scanning (not lookup). So cache filters + > > > projections > > > >>>>>>>>>>>>> optimization should be independent from other features. > > > >>>>>>>>>>>>> 4) ALL cache realization is a complex topic that involves > > > multiple > > > >>>>>>>>>>>>> aspects of how Flink is developing. Refusing from > > > InputFormat in favor > > > >>>>>>>>>>>>> of FLIP-27 Source will make ALL cache realization really > > > complex and > > > >>>>>>>>>>>>> not clear, so maybe instead of that we can extend the > > > functionality of > > > >>>>>>>>>>>>> simple join or not refuse from InputFormat in case of > > lookup > > > join ALL > > > >>>>>>>>>>>>> cache? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Best regards, > > > >>>>>>>>>>>>> Smirnov Alexander > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> [1] > > > >>>>>>>>>>>>> > > > >>>>>>>>>>> > > > > > > https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher) > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> чт, 5 мая 2022 г. в 20:34, Jark Wu <imj...@gmail.com>: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> It's great to see the active discussion! 
I want to share > > my > > > ideas: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 1) implement the cache in framework vs. connectors base > > > >>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should > > > work (e.g., > > > >>>>>>>>>>> cache > > > >>>>>>>>>>>>>> pruning, compatibility). > > > >>>>>>>>>>>>>> The framework way can provide more concise interfaces. > > > >>>>>>>>>>>>>> The connector base way can define more flexible cache > > > >>>>>>>>>>>>>> strategies/implementations. > > > >>>>>>>>>>>>>> We are still investigating a way to see if we can have > > both > > > >>>>>>>>>>> advantages. > > > >>>>>>>>>>>>>> We should reach a consensus that the way should be a > final > > > state, > > > >>>>>>>>>>> and we > > > >>>>>>>>>>>>>> are on the path to it. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 2) filters and projections pushdown: > > > >>>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache > can > > > benefit a > > > >>>>>>>>>>> lot > > > >>>>>>>>>>>>> for > > > >>>>>>>>>>>>>> ALL cache. > > > >>>>>>>>>>>>>> However, this is not true for LRU cache. Connectors use > > > cache to > > > >>>>>>>>>>> reduce > > > >>>>>>>>>>>>> IO > > > >>>>>>>>>>>>>> requests to databases for better throughput. > > > >>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will > > > have 90% of > > > >>>>>>>>>>>>> lookup > > > >>>>>>>>>>>>>> requests that can never be cached > > > >>>>>>>>>>>>>> and hit directly to the databases. That means the cache > is > > > >>>>>>>>>>> meaningless in > > > >>>>>>>>>>>>>> this case. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do filters > > > and projects > > > >>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and > > > >>>>>>>>>>> SupportsProjectionPushDown. > > > >>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, > don't > > > mean it's > > > >>>>>>>>>>> hard > > > >>>>>>>>>>>>> to > > > >>>>>>>>>>>>>> implement. 
> > > >>>>>>>>>>>>>> They should implement the pushdown interfaces to reduce > IO > > > and the > > > >>>>>>>>>>> cache > > > >>>>>>>>>>>>>> size. > > > >>>>>>>>>>>>>> That should be a final state that the scan source and > > > lookup source > > > >>>>>>>>>>> share > > > >>>>>>>>>>>>>> the exact pushdown implementation. > > > >>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown logic > in > > > caches, > > > >>>>>>>>>>> which > > > >>>>>>>>>>>>>> will complex the lookup join design. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 3) ALL cache abstraction > > > >>>>>>>>>>>>>> All cache might be the most challenging part of this > FLIP. > > > We have > > > >>>>>>>>>>> never > > > >>>>>>>>>>>>>> provided a reload-lookup public interface. > > > >>>>>>>>>>>>>> Currently, we put the reload logic in the "eval" method > of > > > >>>>>>>>>>> TableFunction. > > > >>>>>>>>>>>>>> That's hard for some sources (e.g., Hive). > > > >>>>>>>>>>>>>> Ideally, connector implementation should share the logic > > of > > > reload > > > >>>>>>>>>>> and > > > >>>>>>>>>>>>>> scan, i.e. ScanTableSource with > > > InputFormat/SourceFunction/FLIP-27 > > > >>>>>>>>>>>>> Source. > > > >>>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated, and > > the > > > FLIP-27 > > > >>>>>>>>>>>>> source > > > >>>>>>>>>>>>>> is deeply coupled with SourceOperator. > > > >>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, > > this > > > may make > > > >>>>>>>>>>> the > > > >>>>>>>>>>>>>> scope of this FLIP much larger. > > > >>>>>>>>>>>>>> We are still investigating how to abstract the ALL cache > > > logic and > > > >>>>>>>>>>> reuse > > > >>>>>>>>>>>>>> the existing source interfaces. 
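[Editor's note: one way to picture the ALL-cache behavior discussed in this thread (scan everything, serve lookups from the snapshot, rescan after a TTL and replace the state) without tying it to any particular source interface. This is only an illustrative sketch, not the FLIP's actual RescanRuntimeProvider; the full scan stands in for whatever InputFormat/SourceFunction would produce.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class AllCacheSketch {
    // Immutable snapshot swapped atomically on every reload, so lookups
    // never observe a half-loaded table.
    private final AtomicReference<Map<String, List<String>>> snapshot =
            new AtomicReference<>(new HashMap<>());
    private final Supplier<Map<String, List<String>>> scanAll;

    AllCacheSketch(Supplier<Map<String, List<String>>> scanAll) {
        this.scanAll = scanAll;
    }

    // Re-runs the full scan and replaces the previous state in one step;
    // a ScheduledExecutorService would drive this at the rescan interval.
    public void reload() {
        snapshot.set(scanAll.get());
    }

    public List<String> lookup(String key) {
        return snapshot.get().getOrDefault(key, List.of());
    }
}
```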
Best,
Jark

On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:

It's a much more complicated activity and lies outside the scope of this improvement, because such pushdowns would have to be done for all ScanTableSource implementations (not only the lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

One question regarding "And Alexander correctly mentioned that filter pushdown still is not implemented for jdbc/hive/hbase." -> Would an alternative solution be to actually implement these filter pushdowns? I can imagine that there are many more benefits to doing that, outside of lookup caching and metrics.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

Hi everyone!

Thanks for driving such a valuable improvement!
I do think that a single cache implementation would be a nice opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how it is implemented.

Putting myself in the user's shoes, I can say that:
1) I would prefer to have the opportunity to cut down the cache size by simply filtering out unnecessary data, and the handiest way to do that is to apply the filter inside the LookupRunners. It would be a bit harder to pass it through the LookupJoin node to the TableFunction. And Alexander correctly mentioned that filter pushdown is still not implemented for jdbc/hive/hbase.
2) The ability to set different caching parameters for different tables is quite important, so I would prefer to set them through DDL rather than have the same TTL, strategy and other options for all lookup tables.
3) Providing the cache in the framework really deprives us of extensibility (users won't be able to implement their own cache).
But most probably that can be solved by adding more cache strategies and a wider set of configuration options.

All these points are much closer to the schema proposed by Alexander. Qingsheng Ren, please correct me if I'm wrong: could all these facilities be implemented just as easily in your architecture?

Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com

On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org> wrote:

Hi everyone,

I don't have much to chip in, but I just wanted to express that I really appreciate the in-depth discussion on this topic, and I hope that others will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng, Leonard and Jark,

Thanks for your detailed feedback! However, I have questions about some of your statements (maybe I didn't get something?).
> Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time"

I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are not fully preserved with caching, but as you said, users accept this consciously to achieve better performance (no one proposed enabling caching by default, etc.). Or by "users" do you mean other developers of connectors? In that case, developers explicitly specify whether their connector supports caching or not (in the list of supported options); no one forces them to do so if they don't want to. So what exactly is the difference, from this point of view, between implementing caching in the flink-table-runtime module and in flink-table-common? How does either choice affect breaking (or not breaking) the semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be cautious

If we talk about the main semantic difference between DDL options and config options ("table.exec.xxx"), isn't it about the scope of the options and their importance for the user's business logic, rather than the specific location of the corresponding logic in the framework? I mean that in my design, for example, putting an option with the lookup cache strategy into the configurations would be the wrong decision, because it directly affects the user's business logic (not just performance optimization) and touches just several functions of ONE table (there can be multiple tables with different caches). Does it really matter to the user (or anyone else) where the logic affected by the applied option is located?
Also, I can point to the DDL option 'sink.parallelism', which in some way "controls the behavior of the framework", and I don't see any problem there.

> introduce a new interface for this all-caching scenario and the design would become more complex

This is a subject for a separate discussion, but in our internal version we actually solved this problem quite easily: we reused the InputFormat class (so there is no need for a new API). The point is that currently all lookup connectors use InputFormat for scanning the data in batch mode: HBase, JDBC and even Hive, which uses the class PartitionReader that is actually just a wrapper around InputFormat. The advantage of this solution is the ability to reload the cache data in parallel (the number of threads depends on the number of InputSplits, but has an upper limit). As a result, the cache reload time is significantly reduced (as is the time the input stream is blocked).
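The parallel reload described above, one task per split with a bounded thread pool, can be sketched roughly as follows (the split and row types here are plain placeholders, not Flink's actual InputFormat/InputSplit classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCacheReloadDemo {
    /**
     * Loads every "split" into the cache using up to maxThreads workers,
     * blocking only until the slowest split has finished loading.
     */
    static Map<Integer, String> reload(List<List<Integer>> splits, int maxThreads) throws Exception {
        Map<Integer, String> cache = new ConcurrentHashMap<>();
        ExecutorService pool =
            Executors.newFixedThreadPool(Math.max(1, Math.min(splits.size(), maxThreads)));
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (List<Integer> split : splits) {
                pending.add(pool.submit(() -> {
                    for (int key : split) {
                        cache.put(key, "row-" + key); // stands in for reading one split
                    }
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // the input stream stays blocked only until all splits finish
            }
        } finally {
            pool.shutdown();
        }
        return cache;
    }

    public static void main(String[] args) throws Exception {
        Map<Integer, String> cache =
            reload(List.of(List.of(1, 2), List.of(3, 4), List.of(5)), 4);
        System.out.println("reloaded " + cache.size() + " rows");
        if (cache.size() != 5) throw new AssertionError();
    }
}
```

The upper bound on the pool size plays the role of the thread limit Alexander mentions; the blocking `get()` barrier is what keeps the reload consistent before lookups resume.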
I know that we usually try to avoid concurrency in Flink code, but maybe this can be an exception. BTW, I don't claim it's an ideal solution; maybe there are better ones.

> Providing the cache in the framework might introduce compatibility issues

That is possible only if the developer of a connector doesn't properly refactor his code and uses the new cache options incorrectly (i.e., explicitly wires the same options into two different places in the code). For correct behavior, all he needs to do is redirect the existing options to the framework's LookupConfig (plus maybe add aliases if the naming differed), and everything will be transparent for users. If the developer doesn't refactor at all, nothing changes for the connector, thanks to backward compatibility.
Also, if a developer wants to use his own cache logic, he can simply refrain from passing some of the configs to the framework and instead provide his own implementation with the already existing configs and metrics (but I actually think that's a rare case).

> filters and projections should be pushed all the way down to the table function, like what we do in the scan source

That is a great goal. But the truth is that the ONLY connector that supports filter pushdown is FileSystemTableSource (no database connector currently supports it). Also, for some databases it's simply impossible to push down filters as complex as the ones we have in Flink.

> only applying these optimizations to the cache seems not quite useful

Filters can cut off an arbitrarily large amount of data from the dimension table. As a simple example, suppose the dimension table 'users' has a column 'age' with values from 20 to 40, and the input stream 'clicks' is roughly uniformly distributed by the age of the users.
If we have the filter 'age > 30', there will be half as much data in the cache. This means the user can increase 'lookup.cache.max-rows' by almost 2x, which gives a huge performance boost. Moreover, this optimization really starts to shine with the 'ALL' cache, where tables that can't fit in memory without filters and projections can fit with them. This opens up additional possibilities for users, and it doesn't sound like 'not quite useful' to me.

It would be great to hear other voices on this topic! We have quite a few controversial points, and I think with the help of others it will be easier for us to come to a consensus.

Best regards,
Smirnov Alexander

On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander and Arvid,

Thanks for the discussion and sorry for my late response!
We had an internal discussion together with Jark and Leonard, and I'd like to summarize our ideas. Instead of implementing the cache logic in the table runtime layer or wrapping it around the user-provided table function, we prefer to introduce some new APIs extending TableFunction, with these concerns:

1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time", because the join can no longer truly reflect the content of the lookup table at the moment of querying. If users choose to enable caching on the lookup table, they implicitly indicate that this breakage is acceptable in exchange for performance. So we prefer not to provide caching at the table runtime level.

2.
If we implement the cache in the framework (whether in a runner or in a wrapper around TableFunction), we have to confront a situation that allows table options in DDL to control the behavior of the framework, which has never happened previously and should be treated cautiously. Under the current design, the behavior of the framework should only be specified by configurations ("table.exec.xxx"), and it's hard to apply these general configs to a specific table.

3. We have use cases where the lookup source loads all records into memory and refreshes them periodically to achieve high lookup performance (like the Hive connector in the community, and also widely used by our internal connectors). Wrapping the cache around the user's TableFunction works fine for LRU caches, but I think we would have to introduce a new interface for this all-caching scenario, and the design would become more complex.

4.
Providing the cache in the framework might introduce compatibility issues for existing lookup sources: if the user configures the table incorrectly, there might be two caches with totally different strategies (one in the framework and another implemented by the lookup source).

As for the optimization mentioned by Alexander, I think filters and projections should be pushed all the way down to the table function, like what we do in the scan source, instead of stopping at the runner with the cache. The goal of using a cache is to reduce network I/O and the pressure on the external system, and applying these optimizations only to the cache doesn't seem very useful.

I made some updates to the FLIP [1] to reflect our ideas. We prefer to keep the cache implementation as a part of TableFunction, and we could provide some helper classes (CachingTableFunction, AllCachingTableFunction, CachingAsyncTableFunction) to developers and standardize the metrics of the cache.
Also, I made a POC [2] for your reference.

Looking forward to your ideas!

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
[2] https://github.com/PatrickRen/flink/tree/FLIP-221

Best regards,

Qingsheng

On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:

Thanks for the response, Arvid!

I have a few comments on your message.

> but could also live with an easier solution as the first step:

I think these two approaches are mutually exclusive (the one originally proposed by Qingsheng and mine), because conceptually they follow the same goal but differ in implementation details. If we go one way, moving to the other way in the future will mean deleting existing code and once again changing the API for connectors.
So I think we should reach a consensus with the community on that and then work together on this FLIP, i.e., divide the work into tasks for the different parts of the FLIP (for example, LRU cache unification / introducing the proposed set of metrics / further work…). WDYT, Qingsheng?

> as the source will only receive the requests after filter

Actually, if filters are applied to fields of the lookup table, we must issue the requests first, and only after that can we filter the responses, because lookup connectors don't have filter pushdown. So if filtering is done before caching, there will be far fewer rows in the cache.

> @Alexander unfortunately, your architecture is not shared. I don't know the solution to share images to be honest.
Sorry for that, I'm a bit new to such kinds of conversations :)
I have no write access to the confluence, so I made a Jira issue where I described the proposed changes in more detail: https://issues.apache.org/jira/browse/FLINK-27411.

Happy to get more feedback!

Best,
Smirnov Alexander

On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:

Hi Qingsheng,

Thanks for driving this; the inconsistency was not satisfying for me.

I second Alexander's idea, but could also live with an easier solution as the first step: instead of making caching an implementation detail of TableFunction X, rather devise a caching layer around X. So the proposal would be a CachingTableFunction that delegates to X in case of misses and otherwise manages the cache.
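The caching layer described here can be sketched as a small delegating wrapper (all names are hypothetical, not the actual CachingTableFunction API; a LinkedHashMap in access order stands in for a real LRU cache):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class CachingLookupDemo<K, V> {
    private final Function<K, V> delegate; // the wrapped lookup function "X"
    private final Map<K, V> cache;
    int delegateCalls = 0;                 // how often a miss reached "X"

    CachingLookupDemo(Function<K, V> delegate, int maxRows) {
        this.delegate = delegate;
        // access-order LinkedHashMap evicting the least recently used entry
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Serves hits from the cache and delegates to X only on misses. */
    V lookup(K key) {
        return cache.computeIfAbsent(key, k -> {
            delegateCalls++;
            return delegate.apply(k);
        });
    }

    public static void main(String[] args) {
        CachingLookupDemo<Integer, String> lookup =
            new CachingLookupDemo<>(id -> "user-" + id, 2);
        lookup.lookup(1); // miss
        lookup.lookup(1); // hit
        lookup.lookup(2); // miss
        lookup.lookup(3); // miss, evicts key 1 (least recently used)
        lookup.lookup(1); // miss again after eviction
        System.out.println("delegate calls: " + lookup.delegateCalls);
        if (lookup.delegateCalls != 4) throw new AssertionError();
    }
}
```

Because the wrapper only needs a `Function`-like lookup to delegate to, the underlying "X" never has to know a cache exists, which is the appeal of Arvid's layering idea.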
Lifting it into the operator model as proposed would be even better, but is probably unnecessary in the first step for a lookup source (as the source will only receive the requests after the filter; applying the projection may be more interesting to save memory).

Another advantage is that all the changes of this FLIP would be limited to options, with no need for new public interfaces. Everything else remains an implementation detail of the Table runtime. That means we can easily incorporate the optimization potential that Alexander pointed out later.

@Alexander unfortunately, your architecture is not shared. I don't know the solution to share images, to be honest.

On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:

Hi Qingsheng!
My name is Alexander; I'm not a committer yet, but I'd really like to become one, and this FLIP really interested me. I have actually worked on a similar feature in my company's Flink fork, and we would like to share our thoughts on this and make the code open source.

I think there is a better alternative than introducing an abstract class for TableFunction (CachingTableFunction). As you know, TableFunction lives in the flink-table-common module, which provides only an API for working with tables, so it's very convenient to import in connectors. In turn, CachingTableFunction contains logic for runtime execution, so this class and everything connected with it should be located in another module, probably flink-table-runtime. But that would require connectors to depend on another module containing a lot of runtime logic, which doesn't sound good.
I suggest adding a new method 'getLookupConfig' to LookupTableSource or LookupRuntimeProvider to allow connectors to pass only configurations to the planner, so that they won't depend on the runtime implementation. Based on these configs, the planner will construct a lookup join operator with the corresponding runtime logic (ProcessFunctions in the module flink-table-runtime). The architecture looks like the pinned image (the LookupConfig class there is actually your CacheConfig).

The classes in flink-table-planner that will be responsible for this are CommonPhysicalLookupJoin and its inheritors. The current classes for lookup join in flink-table-runtime are LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc and AsyncLookupJoinRunnerWithCalc.

I suggest adding classes LookupJoinCachingRunner, LookupJoinCachingRunnerWithCalc, etc.

And here comes another, more powerful advantage of such a solution.
If we have the caching logic at a lower level, we can apply some optimizations to it. LookupJoinRunnerWithCalc was named like this because it uses the 'calc' function, which actually mostly consists of filters and projections.

For example, when joining table A with lookup table B under the condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000', the 'calc' function will contain the filters A.age = B.age + 10 and B.salary > 1000.

If we apply this function before storing records in the cache, the size of the cache will be significantly reduced: filters avoid storing useless records in the cache, and projections reduce the records' size. So the initial max number of records in the cache can be increased by the user.

What do you think about it?
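The effect described above can be illustrated with a toy sketch. Everything here is hypothetical (real Flink runners operate on RowData and generated code, not POJOs); it only shows how applying filter and projection before the cache insert shrinks what the cache must hold:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

public class CalcBeforeCache {

    // Toy lookup row from table B: (id, age, salary).
    record Row(int id, int age, long salary) {}

    // Cache keyed by the join key; stores only the projected payload.
    static final Map<Integer, List<Long>> cache = new LinkedHashMap<>();

    // "calc" = filter + projection, applied BEFORE the rows enter the cache.
    static List<Long> lookupWithCalc(int key, List<Row> fetched,
                                     Predicate<Row> filter,
                                     Function<Row, Long> project) {
        List<Long> projected = new ArrayList<>();
        for (Row r : fetched) {
            if (filter.test(r)) {                // filter: drop useless records
                projected.add(project.apply(r)); // projection: keep needed fields
            }
        }
        cache.put(key, projected);               // cache holds the reduced result
        return projected;
    }

    public static void main(String[] args) {
        // Two rows fetched for key 1; only salaries > 1000 survive the filter,
        // and only the salary column is retained by the projection.
        List<Row> fetched = List.of(new Row(1, 30, 500), new Row(1, 31, 2000));
        List<Long> result = lookupWithCalc(
                1, fetched, r -> r.salary() > 1000, Row::salary);
        System.out.println(result); // [2000]
    }
}
```

Without the calc-first step, both full rows would sit in the cache; with it, one projected value does, which is why the user can afford a larger configured cache size.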
On 2022/04/19 02:47:11 Qingsheng Ren wrote:

Hi devs,

Yuan and I would like to start a discussion about FLIP-221 [1], which introduces an abstraction of the lookup table cache and its standard metrics.

Currently each lookup table source has to implement its own cache to store lookup results, and there isn't a standard set of metrics for users and developers to tune their jobs with lookup joins, which is a quite common use case in Flink Table / SQL.

Therefore we propose some new APIs including the cache, metrics, wrapper classes of TableFunction, and new table options. Please take a look at the FLIP page [1] for more details. Any suggestions and comments would be appreciated!
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric

Best regards,

Qingsheng

--
Best Regards,

Qingsheng Ren

Real-time Computing Team
Alibaba Cloud

Email: renqs...@gmail.com

--
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com