Hi Qingsheng, Alexander,


Thanks for your reply.
> Can you give an example of such upper-level Cache usage? It's not clear to
> me currently. I think it's unnecessary to have such a high-level abstraction
> if nowhere in the code do we operate with objects as instances of Cache.
> But maybe there are other opinions on this.

I haven't found any other usage yet. Maybe it could be used in the DataStream
API. If you all think it's unnecessary, we can ignore it.
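
For reference, this is roughly the shape I had in mind. It is only a minimal
sketch, not a proposal for the final API: Cache<K, V>, InMemoryLookupCache and
getOrLoad are hypothetical names, the method set is an assumption, and plain
strings stand in for RowData so that the snippet is self-contained.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class GenericCacheSketch {

    /** Hypothetical generic cache abstraction; not part of FLIP-221. */
    interface Cache<K, V> {
        V getIfPresent(K key);
        void put(K key, V value);
        void invalidate(K key);
        long size();
    }

    /** Lookup-specific view: one key maps to the collection of rows it joins with. */
    interface LookupCache<K, R> extends Cache<K, Collection<R>> {}

    /** Trivial unbounded implementation, for illustration only. */
    static class InMemoryLookupCache<K, R> implements LookupCache<K, R> {
        private final Map<K, Collection<R>> store = new HashMap<>();

        @Override public Collection<R> getIfPresent(K key) { return store.get(key); }
        @Override public void put(K key, Collection<R> value) { store.put(key, value); }
        @Override public void invalidate(K key) { store.remove(key); }
        @Override public long size() { return store.size(); }
    }

    /** Code outside lookup join would only need to depend on Cache. */
    static <K, V> V getOrLoad(Cache<K, V> cache, K key, Function<K, V> loader) {
        V cached = cache.getIfPresent(key);
        if (cached != null) {
            return cached;
        }
        V loaded = loader.apply(key);
        cache.put(key, loaded);
        return loaded;
    }

    public static void main(String[] args) {
        InMemoryLookupCache<String, String> cache = new InMemoryLookupCache<>();
        Collection<String> rows = getOrLoad(cache, "user-1", k -> List.of("row-a", "row-b"));
        System.out.println(rows + ", cached entries = " + cache.size());
    }
}
```

The only point of the extra layer is that callers like getOrLoad never see the
lookup-specific type. If no such caller exists, the layer adds nothing.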




> I think there won't be many problems with supporting metrics in ALL cache. 
> Moreover, some of proposed metrics are most useful especially in ALL case, 
> for example, 'latestLoadTimeGauge' or 'numCachedRecords', so necessary 
> metrics definitely should be supported in this cache strategy.
Sorry for my mistake. There is no problem with it.


Best regards,
Yuan


At 2022-05-17 17:15:20, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>Hi Yuan,
>
>Thanks for the review! Basically I agree with Alexander's opinion. We'd like to
>limit the scope to the lookup scenario, so we didn't extend the cache to a
>generic one. As for the metrics, I think the existing metric definitions are
>also applicable to the all-cache case.
>
>Cheers, 
>
>Qingsheng
>
>
>> On May 15, 2022, at 21:17, zst...@163.com wrote:
>> 
>> Hi Qingsheng and devs,
>> 
>> Thanks for your heated discussion and the redesign to optimize this feature. I
>> just have two comments:
>> 1. How about abstracting the LookupCache to a higher level with a common Cache?
>> It would be convenient for devs to use in other places.
>> 
>> 2. Does it have any metrics, such as NumCachedRecords for the AllCache?
>> Best regards,
>> Yuan
>> 
>> At 2022-05-13 20:27:44, "Qingsheng Ren" <renqs...@gmail.com> wrote:
>> >Hi Alexander and devs,
>> >
>> >Thank you very much for the in-depth discussion! As Jark mentioned, we were
>> >inspired by Alexander's idea and refactored our design. FLIP-221
>> >[1] has been updated to reflect our design now and we are happy to hear
>> >more suggestions from you!
>> >
>> >Compared to the previous design:
>> >1. The lookup cache serves at table runtime level and is integrated as a
>> >component of LookupJoinRunner as discussed previously.
>> >2. Interfaces are renamed and re-designed to reflect the new design.
>> >3. We handle the all-caching case separately and introduce a new
>> >RescanRuntimeProvider to reuse the ability of scanning. We are planning to
>> >support SourceFunction / InputFormat for now, considering the complexity of
>> >the FLIP-27 Source API.
>> >4. A new interface LookupFunction is introduced to make the semantics of
>> >lookup more straightforward for developers.
>> >
>> >In reply to Alexander:
>> >> However I'm a little confused whether InputFormat is deprecated or not.
>> >> Am I right that it will be so in the future, but currently it's not?
>> >Yes you are right. InputFormat is not deprecated for now. I think it will
>> >be deprecated in the future but we don't have a clear plan for that.
>> >
>> >Thanks again for the discussion on this FLIP and looking forward to
>> >cooperating with you after we finalize the design and interfaces!
>> >
>> >[1]
>> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >
>> >Best regards,
>> >
>> >Qingsheng
>> >
>> >
>> >On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com>
>> >wrote:
>> >
>> >> Hi Jark, Qingsheng and Leonard!
>> >>
>> >> Glad to see that we came to a consensus on almost all points!
>> >>
>> >> However I'm a little confused whether InputFormat is deprecated or
>> >> not. Am I right that it will be so in the future, but currently it's
>> >> not? Actually I also think that for the first version it's OK to use
>> >> InputFormat in the ALL cache implementation, because supporting rescan
>> >> ability seems like a very distant prospect. But for this decision we
>> >> need a consensus among all discussion participants.
>> >>
>> >> In general, I have nothing to argue with in your statements. All
>> >> of them correspond to my ideas. Looking ahead, it would be nice to work
>> >> on this FLIP cooperatively. I've already done a lot of work on lookup
>> >> join caching with an implementation very close to the one we are
>> >> discussing, and want to share the results of this work. Anyway, looking
>> >> forward to the FLIP update!
>> >>
>> >> Best regards,
>> >> Smirnov Alexander
>> >>
>> >> On Thu, 12 May 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
>> >> >
>> >> > Hi Alex,
>> >> >
>> >> > Thanks for summarizing your points.
>> >> >
>> >> > In the past week, Qingsheng, Leonard, and I have discussed it several
>> >> > times and we have completely refactored the design.
>> >> > I'm glad to say we have reached a consensus on many of your points!
>> >> > Qingsheng is still working on updating the design docs, which may be
>> >> > available in the next few days.
>> >> > I will share some conclusions from our discussions:
>> >> >
>> >> > 1) We have refactored the design towards the "cache in framework" way.
>> >> >
>> >> > 2) A "LookupCache" interface for users to customize, and a default
>> >> > implementation with a builder for ease of use.
>> >> > This makes it possible to have both flexibility and conciseness.
>> >> >
>> >> > 3) Filter pushdown is important for both ALL and LRU lookup caches,
>> >> > especially for reducing IO.
>> >> > Filter pushdown should be the final state and the unified way to
>> >> > support pruning both ALL cache and LRU cache,
>> >> > so I think we should make an effort in this direction. If we need to
>> >> > support filter pushdown for ALL cache anyway, why not use
>> >> > it for LRU cache as well? Either way, as we have decided to implement
>> >> > the cache in the framework, we have the chance to support
>> >> > filtering on the cache at any time. This is an optimization and it
>> >> > doesn't affect the public API. I think we can create a JIRA issue to
>> >> > discuss it when the FLIP is accepted.
>> >> >
>> >> > 4) The idea to support ALL cache is similar to your proposal.
>> >> > In the first version, we will only support InputFormat and
>> >> > SourceFunction for cache all (invoking InputFormat in the join operator).
>> >> > For FLIP-27 source, we need to join a true source operator instead of
>> >> > calling it embedded in the join operator.
>> >> > However, this needs another FLIP to support the re-scan ability for
>> >> > FLIP-27 Source, and this can be a large amount of work.
>> >> > In order not to block this issue, we can put the effort of FLIP-27
>> >> > source integration into future work and integrate
>> >> > InputFormat & SourceFunction for now.
>> >> >
>> >> > I think it's fine to use InputFormat & SourceFunction, as they are not
>> >> > deprecated; otherwise, we would have to introduce another function
>> >> > similar to them, which is meaningless. We need to plan FLIP-27 source
>> >> > integration ASAP before InputFormat & SourceFunction are deprecated.
>> >> >
>> >> > Best,
>> >> > Jark
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi Martijn!
>> >> > >
>> >> > > Got it. So the implementation with InputFormat is not being considered.
>> >> > > Thanks for clearing that up!
>> >> > >
>> >> > > Best regards,
>> >> > > Smirnov Alexander
>> >> > >
>> >> > > On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
>> >> > > >
>> >> > > > Hi,
>> >> > > >
>> >> > > > With regards to:
>> >> > > >
>> >> > > > > But if there are plans to refactor all connectors to FLIP-27
>> >> > > >
>> >> > > > Yes, FLIP-27 is the target for all connectors. The old interfaces
>> >> > > > will be deprecated and connectors will either be refactored to use
>> >> > > > the new ones or dropped.
>> >> > > >
>> >> > > > The caching should work for connectors that are using FLIP-27
>> >> > > > interfaces; we should not introduce new features for old interfaces.
>> >> > > >
>> >> > > > Best regards,
>> >> > > >
>> >> > > > Martijn
>> >> > > >
>> >> > > > On Thu, 12 May 2022 at 06:19, Александр Смирнов
>> >> > > > <smirale...@gmail.com> wrote:
>> >> > > >
>> >> > > > > Hi Jark!
>> >> > > > >
>> >> > > > > Sorry for the late response. I would like to make some comments
>> >> > > > > and clarify my points.
>> >> > > > >
>> >> > > > > 1) I agree with your first statement. I think we can achieve both
>> >> > > > > advantages this way: put the Cache interface in
>> >> > > > > flink-table-common, but have implementations of it in
>> >> > > > > flink-table-runtime. Therefore, if a connector developer wants to
>> >> > > > > use existing cache strategies and their implementations, he can
>> >> > > > > just pass lookupConfig to the planner, but if he wants to have his
>> >> > > > > own cache implementation in his TableFunction, it will be possible
>> >> > > > > for him to use the existing interface for this purpose (we can
>> >> > > > > explicitly point this out in the documentation). In this way all
>> >> > > > > configs and metrics will be unified. WDYT?
>> >> > > > >
>> >> > > > > > If a filter can prune 90% of data in the cache, we will have 90%
>> >> > > > > > of lookup requests that can never be cached
>> >> > > > >
>> >> > > > > 2) Let me clarify the logic of the filter optimization in the case
>> >> > > > > of LRU cache. It looks like Cache<RowData, Collection<RowData>>.
>> >> > > > > Here we always store the response of the dimension table in the
>> >> > > > > cache, even after applying the calc function. I.e. if there are no
>> >> > > > > rows after applying filters to the result of the 'eval' method of
>> >> > > > > TableFunction, we store an empty list under the lookup keys.
>> >> > > > > Therefore the cache line will be filled, but will require much less
>> >> > > > > memory (in bytes). I.e. we don't completely drop the keys whose
>> >> > > > > result was pruned, but we significantly reduce the memory required
>> >> > > > > to store this result. If the user knows about this behavior, he can
>> >> > > > > increase the 'max-rows' option before the start of the job. But
>> >> > > > > actually I came up with the idea that we can do this automatically
>> >> > > > > by using the 'maximumWeight' and 'weigher' methods of GuavaCache
>> >> > > > > [1]. The weight can be the size of the collection of rows (the
>> >> > > > > value in the cache). Therefore the cache can automatically fit many
>> >> > > > > more records than before.
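
The weigher idea in the paragraph above can be sketched without Guava. This is
only an illustration under stated assumptions, not the FLIP's implementation:
WeightedLruCache is a hypothetical class that imitates the effect of
CacheBuilder's maximumWeight/weigher with an access-ordered LinkedHashMap, and
the rule "an empty result still weighs 1" is an assumption added here so that
filtered-out keys are not free.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical weight-bounded LRU cache; a stand-in for Guava's maximumWeight/weigher. */
public class WeightedLruCache<K, R> {
    private final long maxWeight;
    private long currentWeight = 0;
    // accessOrder = true: iteration runs from least to most recently used entry.
    private final LinkedHashMap<K, Collection<R>> entries =
            new LinkedHashMap<>(16, 0.75f, true);

    public WeightedLruCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    /** Weight = number of cached rows; empty results count as 1 (assumption). */
    static long weigh(Collection<?> rows) {
        return Math.max(1, rows.size());
    }

    public Collection<R> getIfPresent(K key) {
        return entries.get(key);
    }

    public void put(K key, Collection<R> rows) {
        Collection<R> old = entries.put(key, rows);
        if (old != null) {
            currentWeight -= weigh(old);
        }
        currentWeight += weigh(rows);
        // Evict least recently used entries until the total weight fits again.
        Iterator<Map.Entry<K, Collection<R>>> it = entries.entrySet().iterator();
        while (currentWeight > maxWeight && it.hasNext()) {
            currentWeight -= weigh(it.next().getValue());
            it.remove();
        }
    }

    public int size() { return entries.size(); }

    public long weight() { return currentWeight; }

    public static void main(String[] args) {
        WeightedLruCache<Integer, String> cache = new WeightedLruCache<>(6);
        cache.put(1, List.of("a", "b", "c")); // weight 3
        cache.put(2, List.of());              // all rows filtered out: weight 1
        cache.put(3, List.of());              // weight 1
        cache.put(4, List.of("d"));           // weight 1, total 6
        cache.put(5, List.of("e", "f"));      // total 8, key 1 is evicted
        System.out.println(cache.size() + " entries, weight " + cache.weight());
        // prints "4 entries, weight 5"
    }
}
```

With a plain row-count limit, keys 2 and 3 would each take a full slot. With a
weight limit, several pruned keys fit in the space of one heavy key.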
>> >> > > > >
>> >> > > > > > Flink SQL has provided a standard way to do filter and
>> >> > > > > > projection pushdown, i.e., SupportsFilterPushDown and
>> >> > > > > > SupportsProjectionPushDown.
>> >> > > > > > The fact that Jdbc/hive/HBase haven't implemented the interfaces
>> >> > > > > > doesn't mean it's hard to implement.
>> >> > > > >
>> >> > > > > It's debatable how difficult it will be to implement filter
>> >> > > > > pushdown. But I think the fact that currently there is no database
>> >> > > > > connector with filter pushdown at least means that this feature
>> >> > > > > won't be supported soon in connectors. Moreover, if we talk about
>> >> > > > > other connectors (not in the Flink repo), their databases might not
>> >> > > > > support all Flink filters (or not support filters at all). I think
>> >> > > > > users are interested in having the cache filter optimization
>> >> > > > > independently of other features and of solving more complex (or
>> >> > > > > even unsolvable) problems.
>> >> > > > >
>> >> > > > > 3) I agree with your third statement. Actually in our internal
>> >> > > > > version I also tried to unify the logic of scanning and reloading
>> >> > > > > data from connectors. But unfortunately, I didn't find a way to
>> >> > > > > unify the logic of all ScanRuntimeProviders (InputFormat,
>> >> > > > > SourceFunction, Source,...) and reuse it in reloading ALL cache. As
>> >> > > > > a result I settled on using InputFormat, because it was used for
>> >> > > > > scanning in all lookup connectors. (I didn't know that there are
>> >> > > > > plans to deprecate InputFormat in favor of FLIP-27 Source). IMO
>> >> > > > > usage of FLIP-27 source in ALL caching is not a good idea, because
>> >> > > > > this source was designed to work in a distributed environment
>> >> > > > > (SplitEnumerator on JobManager and SourceReaders on TaskManagers),
>> >> > > > > not in one operator (the lookup join operator in our case). There
>> >> > > > > is not even a direct way to pass splits from SplitEnumerator to
>> >> > > > > SourceReader (this logic works through SplitEnumeratorContext,
>> >> > > > > which requires OperatorCoordinator.SubtaskGateway to send
>> >> > > > > AddSplitEvents). Usage of InputFormat for ALL cache seems much
>> >> > > > > clearer and easier. But if there are plans to refactor all
>> >> > > > > connectors to FLIP-27, I have the following idea: maybe we can
>> >> > > > > drop lookup join ALL cache in favor of a simple join with multiple
>> >> > > > > scans of a batch source? The point is that the only difference
>> >> > > > > between lookup join ALL cache and a simple join with a batch source
>> >> > > > > is that in the first case scanning is performed multiple times, in
>> >> > > > > between which the state (cache) is cleared (correct me if I'm
>> >> > > > > wrong). So what if we extend the functionality of simple join to
>> >> > > > > support state reloading + extend the functionality of scanning a
>> >> > > > > batch source multiple times (this one should be easy with the new
>> >> > > > > FLIP-27 source, which unifies streaming/batch reading - we will
>> >> > > > > need to change only SplitEnumerator, which will pass splits again
>> >> > > > > after some TTL)? WDYT? I must say that this looks like a long-term
>> >> > > > > goal and will make the scope of this FLIP even larger than you
>> >> > > > > said. Maybe we can limit ourselves to a simpler solution now
>> >> > > > > (InputFormats).
>> >> > > > >
>> >> > > > > So to sum up, my points are as follows:
>> >> > > > > 1) There is a way to make both concise and flexible interfaces for
>> >> > > > > caching in lookup join.
>> >> > > > > 2) Cache filters optimization is important both in LRU and ALL
>> >> > > > > caches.
>> >> > > > > 3) It is unclear when filter pushdown will be supported in Flink
>> >> > > > > connectors, some of the connectors might not have the opportunity
>> >> > > > > to support filter pushdown + as far as I know, currently filter
>> >> > > > > pushdown works only for scanning (not lookup). So cache filters +
>> >> > > > > projections optimization should be independent from other features.
>> >> > > > > 4) ALL cache implementation is a complex topic that involves
>> >> > > > > multiple aspects of how Flink is developing. Dropping InputFormat
>> >> > > > > in favor of FLIP-27 Source will make the ALL cache implementation
>> >> > > > > really complex and unclear, so maybe instead of that we can extend
>> >> > > > > the functionality of simple join, or keep InputFormat in the case
>> >> > > > > of lookup join ALL cache?
>> >> > > > >
>> >> > > > > Best regards,
>> >> > > > > Smirnov Alexander
>> >> > > > >
>> >> > > > > [1]
>> >> > > > > https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
>> >> > > > >
>> >> > > > > On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
>> >> > > > > >
>> >> > > > > > It's great to see the active discussion! I want to share my
>> >> > > > > > ideas:
>> >> > > > > >
>> >> > > > > > 1) implement the cache in framework vs. connectors base
>> >> > > > > > I don't have a strong opinion on this. Both ways should work
>> >> > > > > > (e.g., cache pruning, compatibility).
>> >> > > > > > The framework way can provide more concise interfaces.
>> >> > > > > > The connector base way can define more flexible cache
>> >> > > > > > strategies/implementations.
>> >> > > > > > We are still investigating a way to see if we can have both
>> >> > > > > > advantages.
>> >> > > > > > We should reach a consensus that the way should be a final
>> >> > > > > > state, and we are on the path to it.
>> >> > > > > >
>> >> > > > > > 2) filters and projections pushdown:
>> >> > > > > > I agree with Alex that the filter pushdown into cache can
>> >> > > > > > benefit ALL cache a lot.
>> >> > > > > > However, this is not true for LRU cache. Connectors use cache to
>> >> > > > > > reduce IO requests to databases for better throughput.
>> >> > > > > > If a filter can prune 90% of data in the cache, we will have 90%
>> >> > > > > > of lookup requests that can never be cached
>> >> > > > > > and go directly to the databases. That means the cache is
>> >> > > > > > meaningless in this case.
>> >> > > > > >
>> >> > > > > > IMO, Flink SQL has provided a standard way to do filter and
>> >> > > > > > projection pushdown, i.e., SupportsFilterPushDown and
>> >> > > > > > SupportsProjectionPushDown.
>> >> > > > > > The fact that Jdbc/hive/HBase haven't implemented the interfaces
>> >> > > > > > doesn't mean it's hard to implement.
>> >> > > > > > They should implement the pushdown interfaces to reduce IO and
>> >> > > > > > the cache size.
>> >> > > > > > The final state should be that the scan source and lookup
>> >> > > > > > source share the exact pushdown implementation.
>> >> > > > > > I don't see why we need to duplicate the pushdown logic in
>> >> > > > > > caches, which will complicate the lookup join design.
>> >> > > > > >
>> >> > > > > > 3) ALL cache abstraction
>> >> > > > > > All cache might be the most challenging part of this FLIP. We
>> >> > > > > > have never provided a reload-lookup public interface.
>> >> > > > > > Currently, we put the reload logic in the "eval" method of
>> >> > > > > > TableFunction.
>> >> > > > > > That's hard for some sources (e.g., Hive).
>> >> > > > > > Ideally, connector implementation should share the logic of
>> >> > > > > > reload and scan, i.e. ScanTableSource with
>> >> > > > > > InputFormat/SourceFunction/FLIP-27 Source.
>> >> > > > > > However, InputFormat/SourceFunction are deprecated, and the
>> >> > > > > > FLIP-27 source is deeply coupled with SourceOperator.
>> >> > > > > > If we want to invoke the FLIP-27 source in LookupJoin, this may
>> >> > > > > > make the scope of this FLIP much larger.
>> >> > > > > > We are still investigating how to abstract the ALL cache logic
>> >> > > > > > and reuse the existing source interfaces.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > Best,
>> >> > > > > > Jark
>> >> > > > > >
>> >> > > > > >
>> >> > > > > >
>> >> > > > > > On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com>
>> >> > > > > > wrote:
>> >> > > > > >
>> >> > > > > > > It's a much more complicated activity and lies outside the
>> >> > > > > > > scope of this improvement, because such pushdowns should be
>> >> > > > > > > done for all ScanTableSource implementations (not only for
>> >> > > > > > > Lookup ones).
>> >> > > > > > >
>> >> > > > > > > On Thu, 5 May 2022 at 19:02, Martijn Visser
>> >> > > > > > > <martijnvis...@apache.org> wrote:
>> >> > > > > > >
>> >> > > > > > >> Hi everyone,
>> >> > > > > > >>
>> >> > > > > > >> One question regarding "And Alexander correctly mentioned
>> >> > > > > > >> that filter pushdown still is not implemented for
>> >> > > > > > >> jdbc/hive/hbase." -> Would an alternative solution be to
>> >> > > > > > >> actually implement these filter pushdowns? I can imagine
>> >> > > > > > >> that there are many more benefits to doing that, outside of
>> >> > > > > > >> lookup caching and metrics.
>> >> > > > > > >>
>> >> > > > > > >> Best regards,
>> >> > > > > > >>
>> >> > > > > > >> Martijn Visser
>> >> > > > > > >> https://twitter.com/MartijnVisser82
>> >> > > > > > >> https://github.com/MartijnVisser
>> >> > > > > > >>
>> >> > > > > > >>
>> >> > > > > > >> On Thu, 5 May 2022 at 13:58, Roman Boyko
>> >> > > > > > >> <ro.v.bo...@gmail.com> wrote:
>> >> > > > > > >>
>> >> > > > > > >> > Hi everyone!
>> >> > > > > > >> >
>> >> > > > > > >> > Thanks for driving such a valuable improvement!
>> >> > > > > > >> >
>> >> > > > > > >> > I do think that a single cache implementation would be a nice
>> >> > > > > > >> > opportunity for users. And it will break the "FOR
>> >> > > > > > >> > SYSTEM_TIME AS OF proc_time" semantics anyway, no matter how
>> >> > > > > > >> > it is implemented.
>> >> > > > > > >> >
>> >> > > > > > >> > Putting myself in the user's shoes, I can say that:
>> >> > > > > > >> > 1) I would prefer to have the opportunity to cut the cache
>> >> > > > > > >> > size by simply filtering out unnecessary data. And the
>> >> > > > > > >> > handiest way to do it is to apply it inside LookupRunners. It
>> >> > > > > > >> > would be a bit harder to pass it through the LookupJoin node
>> >> > > > > > >> > to TableFunction. And Alexander correctly mentioned that
>> >> > > > > > >> > filter pushdown still is not implemented for
>> >> > > > > > >> > jdbc/hive/hbase.
>> >> > > > > > >> > 2) The ability to set different caching parameters for
>> >> > > > > > >> > different tables is quite important. So I would prefer to set
>> >> > > > > > >> > it through DDL rather than have the same ttl, strategy and
>> >> > > > > > >> > other options for all lookup tables.
>> >> > > > > > >> > 3) Providing the cache in the framework really deprives us
>> >> > > > > > >> > of extensibility (users won't be able to implement their own
>> >> > > > > > >> > cache). But most probably it might be solved by creating
>> >> > > > > > >> > more cache strategies and a wider set of configurations.
>> >> > > > > > >> >
>> >> > > > > > >> > All these points are much closer to the schema proposed by
>> >> > > > > > >> > Alexander. Qingsheng Ren, please correct me if I'm wrong and
>> >> > > > > > >> > all these facilities can be simply implemented in your
>> >> > > > > > >> > architecture?
>> >> > > > > > >> >
>> >> > > > > > >> > Best regards,
>> >> > > > > > >> > Roman Boyko
>> >> > > > > > >> > e.: ro.v.bo...@gmail.com
>> >> > > > > > >> >
>> >> > > > > > >> > On Wed, 4 May 2022 at 21:01, Martijn Visser
>> >> > > > > > >> > <martijnvis...@apache.org> wrote:
>> >> > > > > > >> >
>> >> > > > > > >> > > Hi everyone,
>> >> > > > > > >> > >
>> >> > > > > > >> > > I don't have much to chip in, but just wanted to express
>> >> > > > > > >> > > that I really appreciate the in-depth discussion on this
>> >> > > > > > >> > > topic and I hope that others will join the conversation.
>> >> > > > > > >> > >
>> >> > > > > > >> > > Best regards,
>> >> > > > > > >> > >
>> >> > > > > > >> > > Martijn
>> >> > > > > > >> > >
>> >> > > > > > >> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов
>> >> > > > > > >> > > <smirale...@gmail.com> wrote:
>> >> > > > > > >> > >
>> >> > > > > > >> > > > Hi Qingsheng, Leonard and Jark,
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > Thanks for your detailed feedback! However, I have
>> >> > > > > > >> > > > questions about some of your statements (maybe I didn't
>> >> > > > > > >> > > > get something?).
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > > Caching actually breaks the semantic of "FOR
>> >> > > > > > >> > > > > SYSTEM_TIME AS OF proc_time"
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF
>> >> > > > > > >> > > > proc_time" are not fully implemented with caching, but as
>> >> > > > > > >> > > > you said, users accept this consciously to achieve better
>> >> > > > > > >> > > > performance (no one proposed to enable caching by
>> >> > > > > > >> > > > default, etc.). Or by users do you mean other developers
>> >> > > > > > >> > > > of connectors? In this case developers explicitly specify
>> >> > > > > > >> > > > whether their connector supports caching or not (in the
>> >> > > > > > >> > > > list of supported options), no one makes them do that if
>> >> > > > > > >> > > > they don't want to. So what exactly is the difference
>> >> > > > > > >> > > > between implementing caching in modules
>> >> > > > > > >> > > > flink-table-runtime and in flink-table-common from the
>> >> > > > > > >> > > > considered point of view? How does it affect breaking or
>> >> > > > > > >> > > > not breaking the semantics of "FOR SYSTEM_TIME AS OF
>> >> > > > > > >> > > > proc_time"?
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > > confront a situation that allows table options in DDL
>> >> > > > > > >> > > > > to control the behavior of the framework, which has
>> >> > > > > > >> > > > > never happened previously and should be cautious
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > If we talk about the main differences in semantics
>> >> > > > > > >> > > > between DDL options and config options
>> >> > > > > > >> > > > ("table.exec.xxx"), isn't it about limiting the scope of
>> >> > > > > > >> > > > the options + importance for the user's business logic
>> >> > > > > > >> > > > rather than the specific location of the corresponding
>> >> > > > > > >> > > > logic in the framework? I mean that in my design, for
>> >> > > > > > >> > > > example, putting an option with the lookup cache strategy
>> >> > > > > > >> > > > in configurations would be the wrong decision, because it
>> >> > > > > > >> > > > directly affects the user's business logic (not just
>> >> > > > > > >> > > > performance optimization) + touches just several
>> >> > > > > > >> > > > functions of ONE table (there can be multiple tables with
>> >> > > > > > >> > > > different caches). Does it really matter for the user (or
>> >> > > > > > >> > > > someone else) where the logic affected by the applied
>> >> > > > > > >> > > > option is located?
>> >> > > > > > >> > > > Also I can remember the DDL option 'sink.parallelism',
>> >> > > > > > >> > > > which in some way "controls the behavior of the
>> >> > > > > > >> > > > framework" and I don't see any problem here.
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > > introduce a new interface for this all-caching
>> >> > > > > > >> > > > > scenario and the design would become more complex
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > This is a subject for a separate discussion, but
>> >> > > > > > >> > > > actually in our internal version we solved this problem
>> >> > > > > > >> > > > quite easily - we reused the InputFormat class (so there
>> >> > > > > > >> > > > is no need for a new API). The point is that currently
>> >> > > > > > >> > > > all lookup connectors use InputFormat for scanning the
>> >> > > > > > >> > > > data in batch mode: HBase, JDBC and even Hive - it uses
>> >> > > > > > >> > > > the class PartitionReader, which is actually just a
>> >> > > > > > >> > > > wrapper around InputFormat.
>> >> > > > > > >> > > > The advantage of this solution is the ability to reload
>> >> > > > > > >> > > > cache data in parallel (the number of threads depends on
>> >> > > > > > >> > > > the number of InputSplits, but has an upper limit). As a
>> >> > > > > > >> > > > result cache reload time is significantly reduced (as
>> >> > > > > > >> > > > well as the time the input stream is blocked). I know
>> >> > > > > > >> > > > that usually we try to avoid usage of concurrency in
>> >> > > > > > >> > > > Flink code, but maybe this one can be an exception. BTW
>> >> > > > > > >> > > > I don't say that it's an ideal solution, maybe there are
>> >> > > > > > >> > > > better ones.
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > > Providing the cache in the framework might introduce
>> >> > > > > > >> > > > > compatibility issues
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > It's possible only in cases when the developer of the
>> >> > > > > > >> > > > connector doesn't properly refactor his code and uses the
>> >> > > > > > >> > > > new cache options incorrectly (i.e. explicitly provides
>> >> > > > > > >> > > > the same options in 2 different code places). For correct
>> >> > > > > > >> > > > behavior all he will need to do is to redirect existing
>> >> > > > > > >> > > > options to the framework's LookupConfig (+ maybe add an
>> >> > > > > > >> > > > alias for options, if there was different naming);
>> >> > > > > > >> > > > everything will be transparent for users. If the
>> >> > > > > > >> > > > developer doesn't do any refactoring at all, nothing will
>> >> > > > > > >> > > > change for the connector because of backward
>> >> > > > > > >> > > > compatibility. Also if a developer wants to use his own
>> >> > > > > > >> > > > cache logic, he can just refuse to pass some of the
>> >> > > > > > >> > > > configs into the framework, and instead make his own
>> >> > > > > > >> > > > implementation with the already existing configs and
>> >> > > > > > >> > > > metrics (but actually I think that it's a rare case).
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > > filters and projections should be pushed all the way
>> >> > > > > > >> > > > > down to the table function, like what we do in the
>> >> > > > > > >> > > > > scan source
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > It's a great goal. But the truth is that the ONLY
>> >> > > > > > >> > > > connector that supports filter pushdown is
>> >> > > > > > >> > > > FileSystemTableSource (no database connector supports it
>> >> > > > > > >> > > > currently). Also for some databases it's simply
>> >> > > > > > >> > > > impossible to push down filters as complex as the ones we
>> >> > > > > > >> > > > have in Flink.
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > > only applying these optimizations to the cache seems
>> >> > > > > > >> > > > > not quite useful
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > Filters can cut off an arbitrarily large amount of data
>> >> > > from the
>> >> > > > > > >> > > > dimension table. For a simple example, suppose in
>> >> dimension
>> >> > > > > table
>> >> > > > > > >> > > > 'users'
>> >> > > > > > >> > > > we have column 'age' with values from 20 to 40, and
>> >> input
>> >> > > stream
>> >> > > > > > >> > > > 'clicks' that is ~uniformly distributed by age of
>> >> users. If
>> >> > > we
>> >> > > > > have
>> >> > > > > > >> > > > filter 'age > 30',
>> >> > > > > > >> > > > there will be half as much data in the cache. This means the
>> >> user
>> >> > > can
>> >> > > > > > >> > > > increase 'lookup.cache.max-rows' by almost 2 times. It
>> >> will
>> >> > > > > gain a
>> >> > > > > > >> > > > huge
>> >> > > > > > >> > > > performance boost. Moreover, this optimization starts 
>> >> > > > > > >> > > > to
>> >> > > really
>> >> > > > > > >> shine
>> >> > > > > > >> > > > in 'ALL' cache, where tables without filters and
>> >> projections
>> >> > > > > can't
>> >> > > > > > >> fit
>> >> > > > > > >> > > > in memory, but with them - can. This opens up 
>> >> > > > > > >> > > > additional
>> >> > > > > > >> possibilities
>> >> > > > > > >> > > > for users. And this doesn't sound like 'not quite 
>> >> > > > > > >> > > > useful'.
>> >> > > > > > >> > > >
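A rough sketch of the estimate above, assuming a made-up 'users' table with ages spread evenly over 20..40 and a plain dict standing in for the lookup cache. The names `cached_rows` and `users` are hypothetical and are not part of Flink or the FLIP; the point is only to show how a filter applied before caching shrinks the number of stored rows.

```python
def cached_rows(users, predicate=None):
    """Count how many dimension rows end up cached after each user is looked up once."""
    cache = {}
    for user_id, age in users:
        row = {"id": user_id, "age": age}  # stands in for the row fetched from the dimension table
        # Only rows passing the filter are stored; without a filter everything is cached.
        if predicate is None or predicate(row):
            cache[user_id] = row
    return len(cache)


# 2100 hypothetical users, ages cycling evenly through 20..40 (21 distinct values).
users = [(i, 20 + i % 21) for i in range(2100)]

unfiltered = cached_rows(users)
filtered = cached_rows(users, lambda row: row["age"] > 30)

print(unfiltered, filtered)
```

With this data the filter 'age > 30' keeps 10 of the 21 age values, so slightly less than half of the rows are cached, which is where the "almost 2 times" headroom for 'lookup.cache.max-rows' comes from.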
>> >> > > > > > >> > > > It would be great to hear other voices regarding this
>> >> topic!
>> >> > > > > Because
>> >> > > > > > >> > > > we have quite a lot of controversial points, and I 
>> >> > > > > > >> > > > think
>> >> > > with
>> >> > > > > the
>> >> > > > > > >> help
>> >> > > > > > >> > > > of others it will be easier for us to come to a
>> >> consensus.
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > Best regards,
>> >> > > > > > >> > > > Smirnov Alexander
>> >> > > > > > >> > > >
>> >> > > > > > >> > > >
>> >> > > > > > >> > > > Fri, Apr 29, 2022 at 22:33, Qingsheng Ren <
>> >> > > renqs...@gmail.com
>> >> > > > > >:
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Hi Alexander and Arvid,
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Thanks for the discussion and sorry for my late
>> >> response!
>> >> > > We
>> >> > > > > had
>> >> > > > > > >> an
>> >> > > > > > >> > > > internal discussion together with Jark and Leonard and
>> >> I’d
>> >> > > like
>> >> > > > > to
>> >> > > > > > >> > > > summarize our ideas. Instead of implementing the cache
>> >> > > logic in
>> >> > > > > the
>> >> > > > > > >> > table
>> >> > > > > > >> > > > runtime layer or wrapping around the user-provided 
>> >> > > > > > >> > > > table
>> >> > > > > function,
>> >> > > > > > >> we
>> >> > > > > > >> > > > prefer to introduce some new APIs extending
>> >> TableFunction
>> >> > > with
>> >> > > > > these
>> >> > > > > > >> > > > concerns:
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > 1. Caching actually breaks the semantics of "FOR
>> >> > > SYSTEM_TIME
>> >> > > > > AS OF
>> >> > > > > > >> > > > proc_time”, because it couldn’t truly reflect the
>> >> content
>> >> > > of the
>> >> > > > > > >> lookup
>> >> > > > > > >> > > > table at the moment of querying. If users choose to
>> >> enable
>> >> > > > > caching
>> >> > > > > > >> on
>> >> > > > > > >> > the
>> >> > > > > > >> > > > lookup table, they implicitly indicate that this
>> >> breakage is
>> >> > > > > > >> acceptable
>> >> > > > > > >> > > in
>> >> > > > > > >> > > > exchange for the performance. So we prefer not to
>> >> provide
>> >> > > > > caching on
>> >> > > > > > >> > the
>> >> > > > > > >> > > > table runtime level.
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > 2. If we make the cache implementation in the
>> >> framework
>> >> > > > > (whether
>> >> > > > > > >> in a
>> >> > > > > > >> > > > runner or a wrapper around TableFunction), we have to
>> >> > > confront a
>> >> > > > > > >> > > situation
>> >> > > > > > >> > > > that allows table options in DDL to control the
>> >> behavior of
>> >> > > the
>> >> > > > > > >> > > framework,
>> >> > > > > > >> > > > which has never happened previously, so we should be
>> >> cautious.
>> >> > > > > Under
>> >> > > > > > >> the
>> >> > > > > > >> > > > current design the behavior of the framework should
>> >> only be
>> >> > > > > > >> specified
>> >> > > > > > >> > by
>> >> > > > > > >> > > > configurations (“table.exec.xxx”), and it’s hard to
>> >> apply
>> >> > > these
>> >> > > > > > >> general
>> >> > > > > > >> > > > configs to a specific table.
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > 3. We have use cases where the lookup source loads and
>> >> refreshes
>> >> > > all
>> >> > > > > > >> records
>> >> > > > > > >> > > > periodically into the memory to achieve high lookup
>> >> > > performance
>> >> > > > > > >> (like
>> >> > > > > > >> > > Hive
>> >> > > > > > >> > > > connector in the community, and also widely used by our
>> >> > > internal
>> >> > > > > > >> > > > connectors). Wrapping the cache around the user’s
>> >> > > TableFunction
>> >> > > > > > >> works
>> >> > > > > > >> > > fine
>> >> > > > > > >> > > > for LRU caches, but I think we have to introduce a new
>> >> > > > > interface for
>> >> > > > > > >> > this
>> >> > > > > > >> > > > all-caching scenario and the design would become more
>> >> > > complex.
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > 4. Providing the cache in the framework might
>> >> introduce
>> >> > > > > > >> compatibility
>> >> > > > > > >> > > > issues to existing lookup sources like there might
>> >> exist two
>> >> > > > > caches
>> >> > > > > > >> > with
>> >> > > > > > >> > > > totally different strategies if the user incorrectly
>> >> > > configures
>> >> > > > > the
>> >> > > > > > >> > table
>> >> > > > > > >> > > > (one in the framework and another implemented by the
>> >> lookup
>> >> > > > > source).
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > As for the optimization mentioned by Alexander, I
>> >> think
>> >> > > > > filters
>> >> > > > > > >> and
>> >> > > > > > >> > > > projections should be pushed all the way down to the
>> >> table
>> >> > > > > function,
>> >> > > > > > >> > like
>> >> > > > > > >> > > > what we do in the scan source, instead of the runner
>> >> with
>> >> > > the
>> >> > > > > cache.
>> >> > > > > > >> > The
>> >> > > > > > >> > > > goal of using cache is to reduce the network I/O and
>> >> > > pressure
>> >> > > > > on the
>> >> > > > > > >> > > > external system, and only applying these optimizations
>> >> to
>> >> > > the
>> >> > > > > cache
>> >> > > > > > >> > seems
>> >> > > > > > >> > > > not quite useful.
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > I made some updates to the FLIP[1] to reflect our
>> >> ideas.
>> >> > > We
>> >> > > > > > >> prefer to
>> >> > > > > > >> > > > keep the cache implementation as a part of
>> >> TableFunction,
>> >> > > and we
>> >> > > > > > >> could
>> >> > > > > > >> > > > provide some helper classes (CachingTableFunction,
>> >> > > > > > >> > > AllCachingTableFunction,
>> >> > > > > > >> > > > CachingAsyncTableFunction) to developers and regulate
>> >> > > metrics
>> >> > > > > of the
>> >> > > > > > >> > > cache.
>> >> > > > > > >> > > > Also, I made a POC[2] for your reference.
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Looking forward to your ideas!
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > [1]
>> >> > > > > > >> > > >
>> >> > > > > > >> > >
>> >> > > > > > >> >
>> >> > > > > > >>
>> >> > > > >
>> >> > >
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >> > > > > > >> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Best regards,
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Qingsheng
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <
>> >> > > > > > >> > > smirale...@gmail.com>
>> >> > > > > > >> > > > wrote:
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> Thanks for the response, Arvid!
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> I have a few comments on your message.
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> > but could also live with an easier solution as the
>> >> > > first
>> >> > > > > step:
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> I think that these 2 ways are mutually exclusive
>> >> > > (originally
>> >> > > > > > >> > proposed
>> >> > > > > > >> > > > >> by Qingsheng and mine), because conceptually they
>> >> follow
>> >> > > the
>> >> > > > > same
>> >> > > > > > >> > > > >> goal, but implementation details are different. If 
>> >> > > > > > >> > > > >> we
>> >> > > will
>> >> > > > > go one
>> >> > > > > > >> > way,
>> >> > > > > > >> > > > >> moving to another way in the future will mean
>> >> deleting
>> >> > > > > existing
>> >> > > > > > >> code
>> >> > > > > > >> > > > >> and once again changing the API for connectors. So I
>> >> > > think we
>> >> > > > > > >> should
>> >> > > > > > >> > > > >> reach a consensus with the community about that and
>> >> then
>> >> > > work
>> >> > > > > > >> > together
>> >> > > > > > >> > > > >> on this FLIP, i.e. divide the work on tasks for
>> >> different
>> >> > > > > parts
>> >> > > > > > >> of
>> >> > > > > > >> > the
>> >> > > > > > >> > > > >> flip (for example, LRU cache unification /
>> >> introducing
>> >> > > > > proposed
>> >> > > > > > >> set
>> >> > > > > > >> > of
>> >> > > > > > >> > > > >> metrics / further work…). WDYT, Qingsheng?
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> > as the source will only receive the requests after
>> >> > > filter
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> Actually if filters are applied to fields of the
>> >> lookup
>> >> > > > > table, we
>> >> > > > > > >> > > > >> firstly must do requests, and only after that we can
>> >> > > filter
>> >> > > > > > >> > responses,
>> >> > > > > > >> > > > >> because lookup connectors don't have filter
>> >> pushdown. So
>> >> > > if
>> >> > > > > > >> > filtering
>> >> > > > > > >> > > > >> is done before caching, there will be far fewer rows
>> >> in
>> >> > > > > cache.
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> > @Alexander unfortunately, your architecture is not
>> >> > > shared.
>> >> > > > > I
>> >> > > > > > >> don't
>> >> > > > > > >> > > > know the
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> > solution to share images to be honest.
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> Sorry for that, I’m a bit new to such kinds of
>> >> > > conversations
>> >> > > > > :)
>> >> > > > > > >> > > > >> I have no write access to the confluence, so I made 
>> >> > > > > > >> > > > >> a
>> >> > > Jira
>> >> > > > > issue,
>> >> > > > > > >> > > > >> where I described the proposed changes in more detail
>> >> -
>> >> > > > > > >> > > > >> https://issues.apache.org/jira/browse/FLINK-27411.
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> Will be happy to get more feedback!
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> Best,
>> >> > > > > > >> > > > >> Smirnov Alexander
>> >> > > > > > >> > > > >>
>> >> > > > > > >> > > > >> Mon, Apr 25, 2022 at 19:49, Arvid Heise <
>> >> > > ar...@apache.org>:
>> >> > > > > > >> > > > >> >
>> >> > > > > > >> > > > >> > Hi Qingsheng,
>> >> > > > > > >> > > > >> >
>> >> > > > > > >> > > > >> > Thanks for driving this; the inconsistency was not
>> >> > > > > satisfying
>> >> > > > > > >> for
>> >> > > > > > >> > > me.
>> >> > > > > > >> > > > >> >
>> >> > > > > > >> > > > >> > I second Alexander's idea though but could also
>> >> live
>> >> > > with
>> >> > > > > an
>> >> > > > > > >> > easier
>> >> > > > > > >> > > > >> > solution as the first step: Instead of making
>> >> caching
>> >> > > an
>> >> > > > > > >> > > > implementation
>> >> > > > > > >> > > > >> > detail of TableFunction X, rather devise a caching
>> >> > > layer
>> >> > > > > > >> around X.
>> >> > > > > > >> > > So
>> >> > > > > > >> > > > the
>> >> > > > > > >> > > > >> > proposal would be a CachingTableFunction that
>> >> > > delegates to
>> >> > > > > X in
>> >> > > > > > >> > case
>> >> > > > > > >> > > > of
>> >> > > > > > >> > > > >> > misses and else manages the cache. Lifting it into
>> >> the
>> >> > > > > operator
>> >> > > > > > >> > > model
>> >> > > > > > >> > > > as
>> >> > > > > > >> > > > >> > proposed would be even better but is probably
>> >> > > unnecessary
>> >> > > > > in
>> >> > > > > > >> the
>> >> > > > > > >> > > > first step
>> >> > > > > > >> > > > >> > for a lookup source (as the source will only
>> >> receive
>> >> > > the
>> >> > > > > > >> requests
>> >> > > > > > >> > > > after
>> >> > > > > > >> > > > >> > filter; applying projection may be more
>> >> interesting to
>> >> > > save
>> >> > > > > > >> > memory).
>> >> > > > > > >> > > > >> >
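The caching layer around X described above can be sketched roughly as follows. This is a minimal illustration, not the FLIP-221 API: `CachingLookup`, `delegate`, and `max_rows` are hypothetical names, the delegate is any callable standing in for TableFunction X, and the LRU eviction policy is an assumption.

```python
from collections import OrderedDict


class CachingLookup:
    """Hypothetical wrapper: serves hits from an LRU cache, delegates misses to X."""

    def __init__(self, delegate, max_rows):
        self._delegate = delegate      # stands in for the wrapped TableFunction X
        self._max_rows = max_rows      # maximum number of cached keys
        self._cache = OrderedDict()    # insertion order doubles as recency order
        self.hits = 0
        self.misses = 0

    def lookup(self, key):
        if key in self._cache:
            self.hits += 1
            self._cache.move_to_end(key)  # mark as most recently used
            return self._cache[key]
        self.misses += 1
        rows = self._delegate(key)        # cache miss: ask the underlying source
        self._cache[key] = rows
        if len(self._cache) > self._max_rows:
            self._cache.popitem(last=False)  # evict the least recently used key
        return rows
```

Because the wrapper only needs a callable and a size limit, the connector itself stays unaware of the cache, which is the property the proposal relies on.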
>> >> > > > > > >> > > > >> > Another advantage is that all the changes of this
>> >> FLIP
>> >> > > > > would be
>> >> > > > > > >> > > > limited to
>> >> > > > > > >> > > > >> > options, no need for new public interfaces.
>> >> Everything
>> >> > > else
>> >> > > > > > >> > remains
>> >> > > > > > >> > > an
>> >> > > > > > >> > > > >> > implementation of Table runtime. That means we can
>> >> > > easily
>> >> > > > > > >> > > incorporate
>> >> > > > > > >> > > > the
>> >> > > > > > >> > > > >> > optimization potential that Alexander pointed out
>> >> > > later.
>> >> > > > > > >> > > > >> >
>> >> > > > > > >> > > > >> > @Alexander unfortunately, your architecture is not
>> >> > > shared.
>> >> > > > > I
>> >> > > > > > >> don't
>> >> > > > > > >> > > > know the
>> >> > > > > > >> > > > >> > solution to share images to be honest.
>> >> > > > > > >> > > > >> >
>> >> > > > > > >> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов 
>> >> > > > > > >> > > > >> > <
>> >> > > > > > >> > > > smirale...@gmail.com>
>> >> > > > > > >> > > > >> > wrote:
>> >> > > > > > >> > > > >> >
>> >> > > > > > >> > > > >> > > Hi Qingsheng! My name is Alexander, I'm not a
>> >> > > committer
>> >> > > > > yet,
>> >> > > > > > >> but
>> >> > > > > > >> > > I'd
>> >> > > > > > >> > > > >> > > really like to become one. And this FLIP really
>> >> > > > > interested
>> >> > > > > > >> me.
>> >> > > > > > >> > > > >> > > Actually I have worked on a similar feature in 
>> >> > > > > > >> > > > >> > > my
>> >> > > > > company’s
>> >> > > > > > >> > Flink
>> >> > > > > > >> > > > >> > > fork, and we would like to share our thoughts on
>> >> > > this and
>> >> > > > > > >> make
>> >> > > > > > >> > > code
>> >> > > > > > >> > > > >> > > open source.
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > I think there is a better alternative than
>> >> > > introducing an
>> >> > > > > > >> > abstract
>> >> > > > > > >> > > > >> > > class for TableFunction (CachingTableFunction).
>> >> As
>> >> > > you
>> >> > > > > know,
>> >> > > > > > >> > > > >> > > TableFunction exists in the flink-table-common
>> >> > > module,
>> >> > > > > which
>> >> > > > > > >> > > > provides
>> >> > > > > > >> > > > >> > > only an API for working with tables – it’s very
>> >> > > > > convenient
>> >> > > > > > >> for
>> >> > > > > > >> > > > importing
>> >> > > > > > >> > > > >> > > in connectors. In turn, CachingTableFunction
>> >> contains
>> >> > > > > logic
>> >> > > > > > >> for
>> >> > > > > > >> > > > >> > > runtime execution,  so this class and everything
>> >> > > > > connected
>> >> > > > > > >> with
>> >> > > > > > >> > it
>> >> > > > > > >> > > > >> > > should be located in another module, probably in
>> >> > > > > > >> > > > flink-table-runtime.
>> >> > > > > > >> > > > >> > > But this will require connectors to depend on
>> >> another
>> >> > > > > module,
>> >> > > > > > >> > > which
>> >> > > > > > >> > > > >> > > contains a lot of runtime logic, which doesn’t
>> >> sound
>> >> > > > > good.
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > I suggest adding a new method ‘getLookupConfig’
>> >> to
>> >> > > > > > >> > > LookupTableSource
>> >> > > > > > >> > > > >> > > or LookupRuntimeProvider to allow connectors to
>> >> only
>> >> > > pass
>> >> > > > > > >> > > > >> > > configurations to the planner, therefore they
>> >> won’t
>> >> > > > > depend on
>> >> > > > > > >> > > > runtime
>> >> > > > > > >> > > > >> > > realization. Based on these configs planner will
>> >> > > > > construct a
>> >> > > > > > >> > > lookup
>> >> > > > > > >> > > > >> > > join operator with corresponding runtime logic
>> >> > > > > > >> (ProcessFunctions
>> >> > > > > > >> > > in
>> >> > > > > > >> > > > >> > > module flink-table-runtime). Architecture looks
>> >> like
>> >> > > in
>> >> > > > > the
>> >> > > > > > >> > pinned
>> >> > > > > > >> > > > >> > > image (LookupConfig class there is actually 
>> >> > > > > > >> > > > >> > > your
>> >> > > > > > >> CacheConfig).
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > Classes in flink-table-planner, that will be
>> >> > > responsible
>> >> > > > > for
>> >> > > > > > >> > this
>> >> > > > > > >> > > –
>> >> > > > > > >> > > > >> > > CommonPhysicalLookupJoin and its subclasses.
>> >> > > > > > >> > > > >> > > Current classes for lookup join in
>> >> > > flink-table-runtime
>> >> > > > > -
>> >> > > > > > >> > > > >> > > LookupJoinRunner, AsyncLookupJoinRunner,
>> >> > > > > > >> > LookupJoinRunnerWithCalc,
>> >> > > > > > >> > > > >> > > AsyncLookupJoinRunnerWithCalc.
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > I suggest adding classes 
>> >> > > > > > >> > > > >> > > LookupJoinCachingRunner,
>> >> > > > > > >> > > > >> > > LookupJoinCachingRunnerWithCalc, etc.
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > And here comes another more powerful advantage 
>> >> > > > > > >> > > > >> > > of
>> >> > > such a
>> >> > > > > > >> > solution.
>> >> > > > > > >> > > > If
>> >> > > > > > >> > > > >> > > we have caching logic on a lower level, we can
>> >> apply
>> >> > > some
>> >> > > > > > >> > > > >> > > optimizations to it. LookupJoinRunnerWithCalc 
>> >> > > > > > >> > > > >> > > was
>> >> > > named
>> >> > > > > like
>> >> > > > > > >> > this
>> >> > > > > > >> > > > >> > > because it uses the ‘calc’ function, which
>> >> actually
>> >> > > > > mostly
>> >> > > > > > >> > > consists
>> >> > > > > > >> > > > of
>> >> > > > > > >> > > > >> > > filters and projections.
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > For example, in join table A with lookup table B
>> >> > > > > condition
>> >> > > > > > >> > ‘JOIN …
>> >> > > > > > >> > > > ON
>> >> > > > > > >> > > > >> > > A.id = B.id AND A.age = B.age + 10 WHERE
>> >> B.salary >
>> >> > > 1000’
>> >> > > > > > >> > ‘calc’
>> >> > > > > > >> > > > >> > > function will contain filters A.age = B.age + 10
>> >> and
>> >> > > > > > >> B.salary >
>> >> > > > > > >> > > > 1000.
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > If we apply this function before storing records
>> >> in
>> >> > > > > cache,
>> >> > > > > > >> size
>> >> > > > > > >> > of
>> >> > > > > > >> > > > >> > > cache will be significantly reduced: filters =
>> >> avoid
>> >> > > > > storing
>> >> > > > > > >> > > useless
>> >> > > > > > >> > > > >> > > records in cache, projections = reduce records’
>> >> > > size. So
>> >> > > > > the
>> >> > > > > > >> > > initial
>> >> > > > > > >> > > > >> > > max number of records in cache can be increased
>> >> by
>> >> > > the
>> >> > > > > user.
>> >> > > > > > >> > > > >> > >
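To make the effect of applying 'calc' before caching concrete, here is a small sketch under stated assumptions: the table rows, the `calc` function, and `load_into_cache` are all hypothetical, and only the predicate on the lookup table (B.salary > 1000) plus a projection to the needed columns are modelled.

```python
def calc(row):
    """Hypothetical 'calc': filter on B.salary, then project to the columns the join needs."""
    if row["salary"] <= 1000:
        return None                              # filtered out, never reaches the cache
    return {"id": row["id"], "age": row["age"]}  # projection drops unused columns


def load_into_cache(rows, calc_fn):
    """Store only the calc output, keyed by the join key."""
    cache = {}
    for row in rows:
        out = calc_fn(row)
        if out is not None:
            cache[row["id"]] = out
    return cache


# Made-up contents of lookup table B.
table_b = [
    {"id": 1, "age": 25, "salary": 900, "address": "..."},
    {"id": 2, "age": 35, "salary": 1500, "address": "..."},
    {"id": 3, "age": 40, "salary": 2000, "address": "..."},
]

cache = load_into_cache(table_b, calc)
print(cache)
```

The filter removes one of the three rows and the projection drops two of the four columns from each remaining row, so both the row count and the per-row size shrink.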
>> >> > > > > > >> > > > >> > > What do you think about it?
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
>> >> > > > > > >> > > > >> > > > Hi devs,
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > > Yuan and I would like to start a discussion
>> >> about
>> >> > > > > > >> FLIP-221[1],
>> >> > > > > > >> > > > which
>> >> > > > > > >> > > > >> > > introduces an abstraction of lookup table cache
>> >> and
>> >> > > its
>> >> > > > > > >> standard
>> >> > > > > > >> > > > metrics.
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > > Currently each lookup table source should
>> >> implement
>> >> > > > > their
>> >> > > > > > >> own
>> >> > > > > > >> > > > cache to
>> >> > > > > > >> > > > >> > > store lookup results, and there isn’t a standard
>> >> of
>> >> > > > > metrics
>> >> > > > > > >> for
>> >> > > > > > >> > > > users and
>> >> > > > > > >> > > > >> > > developers to tune their jobs with lookup
>> >> joins,
>> >> > > which
>> >> > > > > is a
>> >> > > > > > >> > > quite
>> >> > > > > > >> > > > common
>> >> > > > > > >> > > > >> > > use case in Flink table / SQL.
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > > Therefore we propose some new APIs including
>> >> cache,
>> >> > > > > > >> metrics,
>> >> > > > > > >> > > > wrapper
>> >> > > > > > >> > > > >> > > classes of TableFunction and new table options.
>> >> > > Please
>> >> > > > > take a
>> >> > > > > > >> > look
>> >> > > > > > >> > > > at the
>> >> > > > > > >> > > > >> > > FLIP page [1] to get more details. Any
>> >> suggestions
>> >> > > and
>> >> > > > > > >> comments
>> >> > > > > > >> > > > would be
>> >> > > > > > >> > > > >> > > appreciated!
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > > [1]
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > >
>> >> > > > > > >> > >
>> >> > > > > > >> >
>> >> > > > > > >>
>> >> > > > >
>> >> > >
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > > Best regards,
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > > Qingsheng
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > > >
>> >> > > > > > >> > > > >> > >
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > --
>> >> > > > > > >> > > > > Best Regards,
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Qingsheng Ren
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Real-time Computing Team
>> >> > > > > > >> > > > > Alibaba Cloud
>> >> > > > > > >> > > > >
>> >> > > > > > >> > > > > Email: renqs...@gmail.com
>> >> > > > > > >> > > >
>> >> > > > > > >> > >
>> >> > > > > > >> >
>> >> > > > > > >>
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > > --
>> >> > > > > > > Best regards,
>> >> > > > > > > Roman Boyko
>> >> > > > > > > e.: ro.v.bo...@gmail.com
>> >> > > > > > >
>> >> > > > >
>> >> > >
>> >> > >
>> >>
>> >>
>> >
>> >-- 
>> >Best Regards,
>> >
>> >*Qingsheng Ren*
>> >
>> >Real-time Computing Team
>> >Alibaba Cloud
>> >
>> >Email: renqs...@gmail.com
>> 
