Hi Jark!

Sorry for the late response. I would like to make some comments and
clarify my points.

1) I agree with your first statement. I think we can get both
advantages this way: put the Cache interface in flink-table-common,
but keep its implementations in flink-table-runtime. If a connector
developer wants to use the existing cache strategies and their
implementations, they can just pass lookupConfig to the planner; if
they want their own cache implementation in their TableFunction, they
can use the existing interface for that purpose (we can point this out
explicitly in the documentation). This way all configs and metrics
will be unified. WDYT?
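
To make the split concrete, here is a minimal sketch. All names in it
(LookupCache, DefaultLruCache) are hypothetical and are not the FLIP's
API; the type parameters K and V stand in for RowData keys and rows,
and the code uses only the JDK so it can be run on its own.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface; in the proposal it would live in flink-table-common.
interface LookupCache<K, V> {
    // Returns the cached rows for the key, or null if the key is not cached.
    Collection<V> getIfPresent(K key);

    void put(K key, Collection<V> rows);

    long size();
}

// Hypothetical default implementation; it would live in flink-table-runtime.
class DefaultLruCache<K, V> implements LookupCache<K, V> {
    private final LinkedHashMap<K, Collection<V>> entries;

    DefaultLruCache(final int maxEntries) {
        // accessOrder = true keeps the least recently used entry first.
        this.entries = new LinkedHashMap<K, Collection<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Collection<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    @Override
    public Collection<V> getIfPresent(K key) {
        return entries.get(key);
    }

    @Override
    public void put(K key, Collection<V> rows) {
        entries.put(key, rows);
    }

    @Override
    public long size() {
        return entries.size();
    }
}
```

A connector that wants its own strategy would implement the same
interface inside its TableFunction, so configs and metrics can still
be reported in the same way.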

> If a filter can prune 90% of data in the cache, we will have 90% of lookup 
> requests that can never be cached

2) Let me clarify the filter optimization logic in the case of the LRU
cache. The cache looks like Cache<RowData, Collection<RowData>>. We
always store the response of the dimension table in the cache, even
after applying the calc function. I.e. if no rows are left after
applying the filters to the result of the 'eval' method of
TableFunction, we store an empty list under the lookup key. The cache
entry is therefore still filled, but it requires much less memory (in
bytes). In other words, we don't drop the keys whose results were
pruned; we significantly reduce the memory needed to store those
results. If the user knows about this behavior, they can increase the
'max-rows' option before starting the job. But actually I came up with
the idea that we can do this automatically by using the
'maximumWeight' and 'weigher' methods of Guava's CacheBuilder [1]. The
weight can be the size of the collection of rows (the cache value), so
the cache can automatically fit many more records than before.
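
The effect of weighing entries by row count can be sketched as
follows. This is a JDK-only illustration, not Flink or Guava code: the
class WeightedLruCache is hypothetical, and charging an empty result a
weight of 1 is my assumption so that pruned keys still count toward
the budget. With Guava itself the same idea would presumably be
expressed through CacheBuilder.maximumWeight(...) and weigher(...).

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical weight-bounded LRU cache: the budget is counted in rows, not keys.
class WeightedLruCache<K, V> {
    private final long maxWeight;
    private long currentWeight = 0;
    // accessOrder = true keeps the least recently used entry first.
    private final LinkedHashMap<K, Collection<V>> entries =
            new LinkedHashMap<>(16, 0.75f, true);

    WeightedLruCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    // Assumption: an empty (fully filtered) result still costs 1.
    static long weigh(Collection<?> rows) {
        return Math.max(1, rows.size());
    }

    Collection<V> getIfPresent(K key) {
        return entries.get(key);
    }

    void put(K key, Collection<V> rows) {
        Collection<V> old = entries.put(key, rows);
        if (old != null) {
            currentWeight -= weigh(old);
        }
        currentWeight += weigh(rows);
        // Evict least recently used entries until the cache is within budget.
        Iterator<Map.Entry<K, Collection<V>>> it = entries.entrySet().iterator();
        while (currentWeight > maxWeight && it.hasNext()) {
            Map.Entry<K, Collection<V>> eldest = it.next();
            currentWeight -= weigh(eldest.getValue());
            it.remove();
        }
    }

    int size() {
        return entries.size();
    }

    long weight() {
        return currentWeight;
    }
}
```

With a budget of 4, such a cache holds four keys whose results were
filtered down to empty lists, but only one key with a three-row result
plus one more single-weight entry, so the user no longer has to adjust
'max-rows' by hand.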

> Flink SQL has provided a standard way to do filters and projects pushdown, 
> i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard to 
> implement.

It's debatable how difficult it will be to implement filter pushdown.
But I think the fact that currently no database connector supports
filter pushdown at least means that this feature won't be available in
connectors soon. Moreover, if we talk about other connectors (not in
the Flink repo), their databases might not support all Flink filters
(or might not support filters at all). I think users are interested in
the cache filter optimization independently of other features and of
more complex (or even unsolvable) problems.

3) I agree with your third statement. Actually, in our internal
version I also tried to unify the logic of scanning and reloading data
from connectors. Unfortunately, I didn't find a way to unify the logic
of all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...)
and reuse it for reloading the ALL cache. As a result I settled on
InputFormat, because it was used for scanning in all lookup
connectors. (I didn't know that there are plans to deprecate
InputFormat in favor of the FLIP-27 Source.) IMO using the FLIP-27
Source in ALL caching is not a good idea, because this source was
designed to work in a distributed environment (SplitEnumerator on the
JobManager and SourceReaders on the TaskManagers), not inside one
operator (the lookup join operator in our case). There is not even a
direct way to pass splits from SplitEnumerator to SourceReader (this
logic works through SplitEnumeratorContext, which requires
OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using
InputFormat for the ALL cache seems much clearer and easier.

But if there are plans to refactor all connectors to FLIP-27, I have
the following idea: maybe we can drop the lookup join ALL cache in
favor of a simple join with repeated scanning of a batch source? The
point is that the only difference between a lookup join with ALL cache
and a simple join with a batch source is that in the first case the
scan is performed multiple times, and the state (cache) is cleared in
between (correct me if I'm wrong). So what if we extend the simple
join to support state reloading, and extend batch sources to support
scanning multiple times? The latter should be easy with the new
FLIP-27 source, which unifies streaming/batch reading: we would only
need to change SplitEnumerator so that it passes the splits again
after some TTL. WDYT? I must say that this looks like a long-term goal
and will make the scope of this FLIP even larger than you said. Maybe
we can limit ourselves to a simpler solution for now (InputFormats).
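
For illustration only, the "scan everything again, then replace the
cached state" step can be sketched with the JDK alone. This is not the
internal implementation mentioned above: ReloadingAllCache is a
hypothetical class, and each Supplier merely stands in for reading one
InputSplit through an InputFormat. A real operator would call reload()
from a timer after the configured TTL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical ALL cache that is rebuilt from a full scan and then swapped in.
class ReloadingAllCache<K, V> {
    // One supplier per split; stands in for reading one InputSplit.
    private final List<Supplier<List<Map.Entry<K, V>>>> splitReaders;
    // volatile so that lookups see the new snapshot once it is published.
    private volatile Map<K, List<V>> snapshot = Collections.emptyMap();

    ReloadingAllCache(List<Supplier<List<Map.Entry<K, V>>>> splitReaders) {
        this.splitReaders = splitReaders;
    }

    // Scans every split into a fresh map, then replaces the old snapshot in
    // one step, so lookups never observe a half-loaded cache.
    void reload() {
        Map<K, List<V>> fresh = new HashMap<>();
        for (Supplier<List<Map.Entry<K, V>>> reader : splitReaders) {
            for (Map.Entry<K, V> row : reader.get()) {
                fresh.computeIfAbsent(row.getKey(), k -> new ArrayList<>())
                        .add(row.getValue());
            }
        }
        snapshot = fresh;
    }

    // Returns all rows for the key, or an empty list if the key is unknown.
    List<V> lookup(K key) {
        return snapshot.getOrDefault(key, Collections.emptyList());
    }
}
```

The loop over splitReaders is sequential here to keep the sketch
short; reading the splits in parallel would only change how the fresh
map is filled, not the swap at the end.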

So to sum up, my points are:
1) There is a way to make the caching interfaces for lookup join both
concise and flexible.
2) The cache filter optimization is important for both LRU and ALL
caches.
3) It is unclear when filter pushdown will be supported in Flink
connectors, and some connectors might not be able to support it at
all. Also, as far as I know, filter pushdown currently works only for
scanning (not for lookup). So the cache filter + projection
optimization should be independent of other features.
4) The ALL cache implementation is a complex topic that involves
multiple aspects of how Flink is evolving. Dropping InputFormat in
favor of the FLIP-27 Source will make the ALL cache implementation
really complex and unclear, so maybe instead we can extend the
functionality of the simple join, or keep InputFormat for the lookup
join ALL cache?

Best regards,
Smirnov Alexander

[1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)

On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
>
> It's great to see the active discussion! I want to share my ideas:
>
> 1) implement the cache in framework vs. connectors base
> I don't have a strong opinion on this. Both ways should work (e.g., cache
> pruning, compatibility).
> The framework way can provide more concise interfaces.
> The connector base way can define more flexible cache
> strategies/implementations.
> We are still investigating a way to see if we can have both advantages.
> We should reach a consensus that the way should be a final state, and we
> are on the path to it.
>
> 2) filters and projections pushdown:
> I agree with Alex that the filter pushdown into cache can benefit a lot for
> ALL cache.
> However, this is not true for LRU cache. Connectors use cache to reduce IO
> requests to databases for better throughput.
> If a filter can prune 90% of data in the cache, we will have 90% of lookup
> requests that can never be cached
> and hit directly to the databases. That means the cache is meaningless in
> this case.
>
> IMO, Flink SQL has provided a standard way to do filters and projects
> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> Jdbc/hive/HBase haven't implemented the interfaces, don't mean it's hard to
> implement.
> They should implement the pushdown interfaces to reduce IO and the cache
> size.
> That should be a final state that the scan source and lookup source share
> the exact pushdown implementation.
> I don't see why we need to duplicate the pushdown logic in caches, which
> will complex the lookup join design.
>
> 3) ALL cache abstraction
> All cache might be the most challenging part of this FLIP. We have never
> provided a reload-lookup public interface.
> Currently, we put the reload logic in the "eval" method of TableFunction.
> That's hard for some sources (e.g., Hive).
> Ideally, connector implementation should share the logic of reload and
> scan, i.e. ScanTableSource with InputFormat/SourceFunction/FLIP-27 Source.
> However, InputFormat/SourceFunction are deprecated, and the FLIP-27 source
> is deeply coupled with SourceOperator.
> If we want to invoke the FLIP-27 source in LookupJoin, this may make the
> scope of this FLIP much larger.
> We are still investigating how to abstract the ALL cache logic and reuse
> the existing source interfaces.
>
>
> Best,
> Jark
>
>
>
> On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>
> > It's a much more complicated activity and lies out of the scope of this
> > improvement. Because such pushdowns should be done for all ScanTableSource
> > implementations (not only for Lookup ones).
> >
> > On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org>
> > wrote:
> >
> >> Hi everyone,
> >>
> >> One question regarding "And Alexander correctly mentioned that filter
> >> pushdown still is not implemented for jdbc/hive/hbase." -> Would an
> >> alternative solution be to actually implement these filter pushdowns? I
> >> can
> >> imagine that there are many more benefits to doing that, outside of lookup
> >> caching and metrics.
> >>
> >> Best regards,
> >>
> >> Martijn Visser
> >> https://twitter.com/MartijnVisser82
> >> https://github.com/MartijnVisser
> >>
> >>
> >> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> >>
> >> > Hi everyone!
> >> >
> >> > Thanks for driving such a valuable improvement!
> >> >
> >> > I do think that single cache implementation would be a nice opportunity
> >> for
> >> > users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics
> >> > anyway - doesn't matter how it will be implemented.
> >> >
> >> > Putting myself in the user's shoes, I can say that:
> >> > 1) I would prefer to have the opportunity to cut off the cache size by
> >> > simply filtering unnecessary data. And the most handy way to do it is
> >> apply
> >> > it inside LookupRunners. It would be a bit harder to pass it through the
> >> > LookupJoin node to TableFunction. And Alexander correctly mentioned that
> >> > filter pushdown still is not implemented for jdbc/hive/hbase.
> >> > 2) The ability to set the different caching parameters for different
> >> tables
> >> > is quite important. So I would prefer to set it through DDL rather than
> >> > have similar ttla, strategy and other options for all lookup tables.
> >> > 3) Providing the cache into the framework really deprives us of
> >> > extensibility (users won't be able to implement their own cache). But
> >> most
> >> > probably it might be solved by creating more different cache strategies
> >> and
> >> > a wider set of configurations.
> >> >
> >> > All these points are much closer to the schema proposed by Alexander.
> >> > Qingshen Ren, please correct me if I'm not right and all these
> >> facilities
> >> > might be simply implemented in your architecture?
> >> >
> >> > Best regards,
> >> > Roman Boyko
> >> > e.: ro.v.bo...@gmail.com
> >> >
> >> > On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org>
> >> > wrote:
> >> >
> >> > > Hi everyone,
> >> > >
> >> > > I don't have much to chip in, but just wanted to express that I really
> >> > > appreciate the in-depth discussion on this topic and I hope that
> >> others
> >> > > will join the conversation.
> >> > >
> >> > > Best regards,
> >> > >
> >> > > Martijn
> >> > >
> >> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Hi Qingsheng, Leonard and Jark,
> >> > > >
> >> > > > Thanks for your detailed feedback! However, I have questions about
> >> > > > some of your statements (maybe I didn't get something?).
> >> > > >
> >> > > > > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> >> > > proc_time”
> >> > > >
> >> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is
> >> not
> >> > > > fully implemented with caching, but as you said, users go on it
> >> > > > consciously to achieve better performance (no one proposed to enable
> >> > > > caching by default, etc.). Or by users do you mean other developers
> >> of
> >> > > > connectors? In this case developers explicitly specify whether their
> >> > > > connector supports caching or not (in the list of supported
> >> options),
> >> > > > no one makes them do that if they don't want to. So what exactly is
> >> > > > the difference between implementing caching in modules
> >> > > > flink-table-runtime and in flink-table-common from the considered
> >> > > > point of view? How does it affect on breaking/non-breaking the
> >> > > > semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> >> > > >
> >> > > > > confront a situation that allows table options in DDL to control
> >> the
> >> > > > behavior of the framework, which has never happened previously and
> >> > should
> >> > > > be cautious
> >> > > >
> >> > > > If we talk about main differences of semantics of DDL options and
> >> > > > config options("table.exec.xxx"), isn't it about limiting the scope
> >> of
> >> > > > the options + importance for the user business logic rather than
> >> > > > specific location of corresponding logic in the framework? I mean
> >> that
> >> > > > in my design, for example, putting an option with lookup cache
> >> > > > strategy in configurations would  be the wrong decision, because it
> >> > > > directly affects the user's business logic (not just performance
> >> > > > optimization) + touches just several functions of ONE table (there
> >> can
> >> > > > be multiple tables with different caches). Does it really matter for
> >> > > > the user (or someone else) where the logic is located, which is
> >> > > > affected by the applied option?
> >> > > > Also I can remember DDL option 'sink.parallelism', which in some way
> >> > > > "controls the behavior of the framework" and I don't see any problem
> >> > > > here.
> >> > > >
> >> > > > > introduce a new interface for this all-caching scenario and the
> >> > design
> >> > > > would become more complex
> >> > > >
> >> > > > This is a subject for a separate discussion, but actually in our
> >> > > > internal version we solved this problem quite easily - we reused
> >> > > > InputFormat class (so there is no need for a new API). The point is
> >> > > > that currently all lookup connectors use InputFormat for scanning
> >> the
> >> > > > data in batch mode: HBase, JDBC and even Hive - it uses class
> >> > > > PartitionReader, that is actually just a wrapper around InputFormat.
> >> > > > The advantage of this solution is the ability to reload cache data
> >> in
> >> > > > parallel (number of threads depends on number of InputSplits, but
> >> has
> >> > > > an upper limit). As a result cache reload time significantly reduces
> >> > > > (as well as time of input stream blocking). I know that usually we
> >> try
> >> > > > to avoid usage of concurrency in Flink code, but maybe this one can
> >> be
> >> > > > an exception. BTW I don't say that it's an ideal solution, maybe
> >> there
> >> > > > are better ones.
> >> > > >
> >> > > > > Providing the cache in the framework might introduce compatibility
> >> > > issues
> >> > > >
> >> > > > It's possible only in cases when the developer of the connector
> >> won't
> >> > > > properly refactor his code and will use new cache options
> >> incorrectly
> >> > > > (i.e. explicitly provide the same options into 2 different code
> >> > > > places). For correct behavior all he will need to do is to redirect
> >> > > > existing options to the framework's LookupConfig (+ maybe add an
> >> alias
> >> > > > for options, if there was different naming), everything will be
> >> > > > transparent for users. If the developer won't do refactoring at all,
> >> > > > nothing will be changed for the connector because of backward
> >> > > > compatibility. Also if a developer wants to use his own cache logic,
> >> > > > he just can refuse to pass some of the configs into the framework,
> >> and
> >> > > > instead make his own implementation with already existing configs
> >> and
> >> > > > metrics (but actually I think that it's a rare case).
> >> > > >
> >> > > > > filters and projections should be pushed all the way down to the
> >> > table
> >> > > > function, like what we do in the scan source
> >> > > >
> >> > > > It's the great purpose. But the truth is that the ONLY connector
> >> that
> >> > > > supports filter pushdown is FileSystemTableSource
> >> > > > (no database connector supports it currently). Also for some
> >> databases
> >> > > > it's simply impossible to pushdown such complex filters that we have
> >> > > > in Flink.
> >> > > >
> >> > > > >  only applying these optimizations to the cache seems not quite
> >> > useful
> >> > > >
> >> > > > Filters can cut off an arbitrarily large amount of data from the
> >> > > > dimension table. For a simple example, suppose in dimension table
> >> > > > 'users'
> >> > > > we have column 'age' with values from 20 to 40, and input stream
> >> > > > 'clicks' that is ~uniformly distributed by age of users. If we have
> >> > > > filter 'age > 30',
> >> > > > there will be twice less data in cache. This means the user can
> >> > > > increase 'lookup.cache.max-rows' by almost 2 times. It will gain a
> >> > > > huge
> >> > > > performance boost. Moreover, this optimization starts to really
> >> shine
> >> > > > in 'ALL' cache, where tables without filters and projections can't
> >> fit
> >> > > > in memory, but with them - can. This opens up additional
> >> possibilities
> >> > > > for users. And this doesn't sound as 'not quite useful'.
> >> > > >
> >> > > > It would be great to hear other voices regarding this topic! Because
> >> > > > we have quite a lot of controversial points, and I think with the
> >> help
> >> > > > of others it will be easier for us to come to a consensus.
> >> > > >
> >> > > > Best regards,
> >> > > > Smirnov Alexander
> >> > > >
> >> > > >
> >> > > > On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> > > > >
> >> > > > > Hi Alexander and Arvid,
> >> > > > >
> >> > > > > Thanks for the discussion and sorry for my late response! We had
> >> an
> >> > > > internal discussion together with Jark and Leonard and I’d like to
> >> > > > summarize our ideas. Instead of implementing the cache logic in the
> >> > table
> >> > > > runtime layer or wrapping around the user-provided table function,
> >> we
> >> > > > prefer to introduce some new APIs extending TableFunction with these
> >> > > > concerns:
> >> > > > >
> >> > > > > 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> >> > > > proc_time”, because it couldn’t truly reflect the content of the
> >> lookup
> >> > > > table at the moment of querying. If users choose to enable caching
> >> on
> >> > the
> >> > > > lookup table, they implicitly indicate that this breakage is
> >> acceptable
> >> > > in
> >> > > > exchange for the performance. So we prefer not to provide caching on
> >> > the
> >> > > > table runtime level.
> >> > > > >
> >> > > > > 2. If we make the cache implementation in the framework (whether
> >> in a
> >> > > > runner or a wrapper around TableFunction), we have to confront a
> >> > > situation
> >> > > > that allows table options in DDL to control the behavior of the
> >> > > framework,
> >> > > > which has never happened previously and should be cautious. Under
> >> the
> >> > > > current design the behavior of the framework should only be
> >> specified
> >> > by
> >> > > > configurations (“table.exec.xxx”), and it’s hard to apply these
> >> general
> >> > > > configs to a specific table.
> >> > > > >
> >> > > > > 3. We have use cases that lookup source loads and refresh all
> >> records
> >> > > > periodically into the memory to achieve high lookup performance
> >> (like
> >> > > Hive
> >> > > > connector in the community, and also widely used by our internal
> >> > > > connectors). Wrapping the cache around the user’s TableFunction
> >> works
> >> > > fine
> >> > > > for LRU caches, but I think we have to introduce a new interface for
> >> > this
> >> > > > all-caching scenario and the design would become more complex.
> >> > > > >
> >> > > > > 4. Providing the cache in the framework might introduce
> >> compatibility
> >> > > > issues to existing lookup sources like there might exist two caches
> >> > with
> >> > > > totally different strategies if the user incorrectly configures the
> >> > table
> >> > > > (one in the framework and another implemented by the lookup source).
> >> > > > >
> >> > > > > As for the optimization mentioned by Alexander, I think filters
> >> and
> >> > > > projections should be pushed all the way down to the table function,
> >> > like
> >> > > > what we do in the scan source, instead of the runner with the cache.
> >> > The
> >> > > > goal of using cache is to reduce the network I/O and pressure on the
> >> > > > external system, and only applying these optimizations to the cache
> >> > seems
> >> > > > not quite useful.
> >> > > > >
> >> > > > > I made some updates to the FLIP[1] to reflect our ideas. We
> >> prefer to
> >> > > > keep the cache implementation as a part of TableFunction, and we
> >> could
> >> > > > provide some helper classes (CachingTableFunction,
> >> > > AllCachingTableFunction,
> >> > > > CachingAsyncTableFunction) to developers and regulate metrics of the
> >> > > cache.
> >> > > > Also, I made a POC[2] for your reference.
> >> > > > >
> >> > > > > Looking forward to your ideas!
> >> > > > >
> >> > > > > [1]
> >> > > >
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> >> > > > >
> >> > > > > Best regards,
> >> > > > >
> >> > > > > Qingsheng
> >> > > > >
> >> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <
> >> > > smirale...@gmail.com>
> >> > > > wrote:
> >> > > > >>
> >> > > > >> Thanks for the response, Arvid!
> >> > > > >>
> >> > > > >> I have few comments on your message.
> >> > > > >>
> >> > > > >> > but could also live with an easier solution as the first step:
> >> > > > >>
> >> > > > >> I think that these 2 ways are mutually exclusive (originally
> >> > proposed
> >> > > > >> by Qingsheng and mine), because conceptually they follow the same
> >> > > > >> goal, but implementation details are different. If we will go one
> >> > way,
> >> > > > >> moving to another way in the future will mean deleting existing
> >> code
> >> > > > >> and once again changing the API for connectors. So I think we
> >> should
> >> > > > >> reach a consensus with the community about that and then work
> >> > together
> >> > > > >> on this FLIP, i.e. divide the work on tasks for different parts
> >> of
> >> > the
> >> > > > >> flip (for example, LRU cache unification / introducing proposed
> >> set
> >> > of
> >> > > > >> metrics / further work…). WDYT, Qingsheng?
> >> > > > >>
> >> > > > >> > as the source will only receive the requests after filter
> >> > > > >>
> >> > > > >> Actually if filters are applied to fields of the lookup table, we
> >> > > > >> firstly must do requests, and only after that we can filter
> >> > responses,
> >> > > > >> because lookup connectors don't have filter pushdown. So if
> >> > filtering
> >> > > > >> is done before caching, there will be much less rows in cache.
> >> > > > >>
> >> > > > >> > @Alexander unfortunately, your architecture is not shared. I
> >> don't
> >> > > > know the
> >> > > > >>
> >> > > > >> > solution to share images to be honest.
> >> > > > >>
> >> > > > >> Sorry for that, I’m a bit new to such kinds of conversations :)
> >> > > > >> I have no write access to the confluence, so I made a Jira issue,
> >> > > > >> where described the proposed changes in more details -
> >> > > > >> https://issues.apache.org/jira/browse/FLINK-27411.
> >> > > > >>
> >> > > > >> Will happy to get more feedback!
> >> > > > >>
> >> > > > >> Best,
> >> > > > >> Smirnov Alexander
> >> > > > >>
> >> > > > >> On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
> >> > > > >> >
> >> > > > >> > Hi Qingsheng,
> >> > > > >> >
> >> > > > >> > Thanks for driving this; the inconsistency was not satisfying
> >> for
> >> > > me.
> >> > > > >> >
> >> > > > >> > I second Alexander's idea though but could also live with an
> >> > easier
> >> > > > >> > solution as the first step: Instead of making caching an
> >> > > > implementation
> >> > > > >> > detail of TableFunction X, rather devise a caching layer
> >> around X.
> >> > > So
> >> > > > the
> >> > > > >> > proposal would be a CachingTableFunction that delegates to X in
> >> > case
> >> > > > of
> >> > > > >> > misses and else manages the cache. Lifting it into the operator
> >> > > model
> >> > > > as
> >> > > > >> > proposed would be even better but is probably unnecessary in
> >> the
> >> > > > first step
> >> > > > >> > for a lookup source (as the source will only receive the
> >> requests
> >> > > > after
> >> > > > >> > filter; applying projection may be more interesting to save
> >> > memory).
> >> > > > >> >
> >> > > > >> > Another advantage is that all the changes of this FLIP would be
> >> > > > limited to
> >> > > > >> > options, no need for new public interfaces. Everything else
> >> > remains
> >> > > an
> >> > > > >> > implementation of Table runtime. That means we can easily
> >> > > incorporate
> >> > > > the
> >> > > > >> > optimization potential that Alexander pointed out later.
> >> > > > >> >
> >> > > > >> > @Alexander unfortunately, your architecture is not shared. I
> >> don't
> >> > > > know the
> >> > > > >> > solution to share images to be honest.
> >> > > > >> >
> >> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <
> >> > > > smirale...@gmail.com>
> >> > > > >> > wrote:
> >> > > > >> >
> >> > > > >> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet,
> >> but
> >> > > I'd
> >> > > > >> > > really like to become one. And this FLIP really interested
> >> me.
> >> > > > >> > > Actually I have worked on a similar feature in my company’s
> >> > Flink
> >> > > > >> > > fork, and we would like to share our thoughts on this and
> >> make
> >> > > code
> >> > > > >> > > open source.
> >> > > > >> > >
> >> > > > >> > > I think there is a better alternative than introducing an
> >> > abstract
> >> > > > >> > > class for TableFunction (CachingTableFunction). As you know,
> >> > > > >> > > TableFunction exists in the flink-table-common module, which
> >> > > > provides
> >> > > > >> > > only an API for working with tables – it’s very convenient
> >> for
> >> > > > importing
> >> > > > >> > > in connectors. In turn, CachingTableFunction contains logic
> >> for
> >> > > > >> > > runtime execution,  so this class and everything connected
> >> with
> >> > it
> >> > > > >> > > should be located in another module, probably in
> >> > > > flink-table-runtime.
> >> > > > >> > > But this will require connectors to depend on another module,
> >> > > which
> >> > > > >> > > contains a lot of runtime logic, which doesn’t sound good.
> >> > > > >> > >
> >> > > > >> > > I suggest adding a new method ‘getLookupConfig’ to
> >> > > LookupTableSource
> >> > > > >> > > or LookupRuntimeProvider to allow connectors to only pass
> >> > > > >> > > configurations to the planner, therefore they won’t depend on
> >> > > > runtime
> >> > > > >> > > realization. Based on these configs planner will construct a
> >> > > lookup
> >> > > > >> > > join operator with corresponding runtime logic
> >> (ProcessFunctions
> >> > > in
> >> > > > >> > > module flink-table-runtime). Architecture looks like in the
> >> > pinned
> >> > > > >> > > image (LookupConfig class there is actually yours
> >> CacheConfig).
> >> > > > >> > >
> >> > > > >> > > Classes in flink-table-planner, that will be responsible for
> >> > this
> >> > > –
> >> > > > >> > > CommonPhysicalLookupJoin and his inheritors.
> >> > > > >> > > Current classes for lookup join in  flink-table-runtime  -
> >> > > > >> > > LookupJoinRunner, AsyncLookupJoinRunner,
> >> > LookupJoinRunnerWithCalc,
> >> > > > >> > > AsyncLookupJoinRunnerWithCalc.
> >> > > > >> > >
> >> > > > >> > > I suggest adding classes LookupJoinCachingRunner,
> >> > > > >> > > LookupJoinCachingRunnerWithCalc, etc.
> >> > > > >> > >
> >> > > > >> > > And here comes another more powerful advantage of such a
> >> > solution.
> >> > > > If
> >> > > > >> > > we have caching logic on a lower level, we can apply some
> >> > > > >> > > optimizations to it. LookupJoinRunnerWithCalc was named like
> >> > this
> >> > > > >> > > because it uses the ‘calc’ function, which actually mostly
> >> > > consists
> >> > > > of
> >> > > > >> > > filters and projections.
> >> > > > >> > >
> >> > > > >> > > For example, in join table A with lookup table B condition
> >> > ‘JOIN …
> >> > > > ON
> >> > > > >> > > A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’
> >> > ‘calc’
> >> > > > >> > > function will contain filters A.age = B.age + 10 and
> >> B.salary >
> >> > > > 1000.
> >> > > > >> > >
> >> > > > >> > > If we apply this function before storing records in cache,
> >> size
> >> > of
> >> > > > >> > > cache will be significantly reduced: filters = avoid storing
> >> > > useless
> >> > > > >> > > records in cache, projections = reduce records’ size. So the
> >> > > initial
> >> > > > >> > > max number of records in cache can be increased by the user.
> >> > > > >> > >
> >> > > > >> > > What do you think about it?
> >> > > > >> > >
> >> > > > >> > >
> >> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> >> > > > >> > > > Hi devs,
> >> > > > >> > > >
> >> > > > >> > > > Yuan and I would like to start a discussion about
> >> FLIP-221[1],
> >> > > > which
> >> > > > >> > > introduces an abstraction of lookup table cache and its
> >> standard
> >> > > > metrics.
> >> > > > >> > > >
> >> > > > >> > > > Currently each lookup table source should implement their
> >> own
> >> > > > cache to
> >> > > > >> > > store lookup results, and there isn’t a standard of metrics
> >> for
> >> > > > users and
> >> > > > >> > > developers to tuning their jobs with lookup joins, which is a
> >> > > quite
> >> > > > common
> >> > > > >> > > use case in Flink table / SQL.
> >> > > > >> > > >
> >> > > > >> > > > Therefore we propose some new APIs including cache,
> >> metrics,
> >> > > > wrapper
> >> > > > >> > > classes of TableFunction and new table options. Please take a
> >> > look
> >> > > > at the
> >> > > > >> > > FLIP page [1] to get more details. Any suggestions and
> >> comments
> >> > > > would be
> >> > > > >> > > appreciated!
> >> > > > >> > > >
> >> > > > >> > > > [1]
> >> > > > >> > >
> >> > > >
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> > > > >> > > >
> >> > > > >> > > > Best regards,
> >> > > > >> > > >
> >> > > > >> > > > Qingsheng
> >> > > > >> > > >
> >> > > > >> > > >
> >> > > > >> > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > Best Regards,
> >> > > > >
> >> > > > > Qingsheng Ren
> >> > > > >
> >> > > > > Real-time Computing Team
> >> > > > > Alibaba Cloud
> >> > > > >
> >> > > > > Email: renqs...@gmail.com
> >> > > >
> >> > >
> >> >
> >>
> >
> >
> > --
> > Best regards,
> > Roman Boyko
> > e.: ro.v.bo...@gmail.com
> >
