Hi Martijn!

Got it. So the InputFormat-based implementation is off the table.
Thanks for clearing that up!

Best regards,
Smirnov Alexander

On Thu, 12 May 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
>
> Hi,
>
> With regards to:
>
> > But if there are plans to refactor all connectors to FLIP-27
>
> Yes, FLIP-27 is the target for all connectors. The old interfaces will be
> deprecated and connectors will either be refactored to use the new ones or
> dropped.
>
> The caching should work for connectors that are using FLIP-27 interfaces,
> we should not introduce new features for old interfaces.
>
> Best regards,
>
> Martijn
>
> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com>
> wrote:
>
> > Hi Jark!
> >
> > Sorry for the late response. I would like to make some comments and
> > clarify my points.
> >
> > 1) I agree with your first statement. I think we can get both
> > advantages this way: put the Cache interface in flink-table-common,
> > but keep its implementations in flink-table-runtime. A connector
> > developer who wants to use the existing cache strategies and their
> > implementations can just pass lookupConfig to the planner, while a
> > developer who wants a custom cache implementation in their
> > TableFunction can use the existing interface for this purpose (we can
> > point this out explicitly in the documentation). This way all configs
> > and metrics will be unified. WDYT?
> >
> > > If a filter can prune 90% of data in the cache, we will have 90% of
> > > lookup requests that can never be cached
> >
> > 2) Let me clarify the logic of the filter optimization for the LRU
> > cache. The cache looks like Cache<RowData, Collection<RowData>>. We
> > always store the response of the dimension table in the cache, even
> > after applying the calc function. I.e. if no rows remain after
> > applying filters to the result of the 'eval' method of TableFunction,
> > we store an empty list under the lookup keys. The cache entry is
> > still filled, but it requires much less memory (in bytes). So we don't
> > drop the keys whose results were pruned; we significantly reduce the
> > memory required to store those results. A user who knows about this
> > behavior can increase the 'max-rows' option before starting the job.
> > But I actually came up with the idea that we can do this
> > automatically with the 'maximumWeight' and 'weigher' methods of
> > Guava's CacheBuilder [1]. The weight can be the size of the
> > collection of rows (the cache value). The cache can then
> > automatically fit many more records than before.
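
To make the weight-based idea above concrete, here is a minimal JDK-only sketch. It does not use Guava, and WeightedLookupCache with its methods is a hypothetical name, not Flink or Guava API. Each entry weighs as many rows as it holds, so an empty result for a filtered-out key still occupies an entry but adds no weight.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical weight-bounded LRU cache; an illustration, not a Flink or Guava class. */
class WeightedLookupCache<K, V> {
    private final long maxWeight;
    private long currentWeight = 0;
    // accessOrder = true: iteration starts at the least recently used entry.
    private final LinkedHashMap<K, Collection<V>> entries =
            new LinkedHashMap<>(16, 0.75f, true);

    WeightedLookupCache(long maxWeight) {
        this.maxWeight = maxWeight;
    }

    // Weight of an entry = number of rows it holds; an empty result weighs 0.
    private static long weigh(Collection<?> rows) {
        return rows.size();
    }

    Collection<V> get(K key) {
        return entries.get(key);
    }

    void put(K key, Collection<V> rows) {
        Collection<V> old = entries.put(key, rows);
        if (old != null) {
            currentWeight -= weigh(old);
        }
        currentWeight += weigh(rows);
        // Evict least recently used entries until the total weight fits again.
        Iterator<Map.Entry<K, Collection<V>>> it = entries.entrySet().iterator();
        while (currentWeight > maxWeight && it.hasNext()) {
            currentWeight -= weigh(it.next().getValue());
            it.remove();
        }
    }

    int keyCount() {
        return entries.size();
    }

    long totalWeight() {
        return currentWeight;
    }
}
```

With a budget of 3 rows, the entries a=[1, 2], b=[] and c=[3] all fit; adding d=[4] evicts only the least recently used entry a. Since empty results weigh zero in this sketch, a real implementation would probably also need some bound on the number of keys.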
> >
> > > Flink SQL has provided a standard way to do filters and projects
> > > pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> > > That Jdbc/hive/HBase haven't implemented the interfaces doesn't mean
> > > it's hard to implement.
> >
> > It's debatable how difficult it will be to implement filter pushdown.
> > But the fact that no database connector currently has filter
> > pushdown at least suggests that this feature won't be supported in
> > connectors soon. Moreover, for other connectors (not in the Flink
> > repo), the databases might not support all Flink filters (or might
> > not support filters at all). I think users are interested in the
> > cache filter optimization independently of other features and of
> > solving more complex (or even unsolvable) problems.
> >
> > 3) I agree with your third statement. In our internal version I also
> > tried to unify the logic of scanning and reloading data from
> > connectors. Unfortunately, I didn't find a way to unify the logic of
> > all ScanRuntimeProviders (InputFormat, SourceFunction, Source, ...)
> > and reuse it for reloading the ALL cache. As a result I settled on
> > InputFormat, because it was used for scanning in all lookup
> > connectors. (I didn't know that there are plans to deprecate
> > InputFormat in favor of the FLIP-27 Source.) IMO using the FLIP-27
> > source for ALL caching is not a good idea, because this source was
> > designed to work in a distributed environment (SplitEnumerator on the
> > JobManager and SourceReaders on TaskManagers), not in one operator
> > (the lookup join operator in our case). There is not even a direct
> > way to pass splits from SplitEnumerator to SourceReader (this logic
> > works through SplitEnumeratorContext, which requires
> > OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Using
> > InputFormat for the ALL cache seems much clearer and easier. But if
> > there are plans to refactor all connectors to FLIP-27, I have the
> > following idea: maybe we can drop the lookup join ALL cache in favor
> > of a simple join that scans a batch source multiple times? The point
> > is that the only difference between a lookup join with ALL cache and
> > a simple join with a batch source is that in the first case scanning
> > is performed multiple times, and the state (cache) is cleared in
> > between (correct me if I'm wrong). So what if we extend the simple
> > join to support state reloading, and extend batch sources to support
> > being scanned multiple times? The latter should be easy with the new
> > FLIP-27 source, which unifies streaming and batch reading: we would
> > only need to change SplitEnumerator so that it passes the splits
> > again after some TTL. WDYT? I must say that this looks like a
> > long-term goal and would make the scope of this FLIP even larger than
> > you said. Maybe we can limit ourselves to a simpler solution for now
> > (InputFormats).
> >
> > So to sum up, my points are:
> > 1) There is a way to make the interfaces for caching in lookup join
> > both concise and flexible.
> > 2) The cache filter optimization is important for both LRU and ALL
> > caches.
> > 3) It is unclear when filter pushdown will be supported in Flink
> > connectors, and some connectors might never be able to support it.
> > Also, as far as I know, filter pushdown currently works only for
> > scanning (not lookup). So the cache filter and projection
> > optimization should be independent of other features.
> > 4) The ALL cache implementation is a complex topic that involves
> > multiple aspects of how Flink is developing. Dropping InputFormat in
> > favor of the FLIP-27 Source would make the ALL cache implementation
> > really complex and unclear, so maybe we can instead extend the
> > functionality of simple join, or keep InputFormat for the lookup
> > join ALL cache?
> >
> > Best regards,
> > Smirnov Alexander
> >
> > [1]
> > https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
> >
> > On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
> > >
> > > It's great to see the active discussion! I want to share my ideas:
> > >
> > > 1) implement the cache in framework vs. connectors base
> > > I don't have a strong opinion on this. Both ways should work (e.g., cache
> > > pruning, compatibility).
> > > The framework way can provide more concise interfaces.
> > > The connector base way can define more flexible cache
> > > strategies/implementations.
> > > We are still investigating a way to see if we can have both advantages.
> > > We should reach a consensus that the way should be a final state, and we
> > > are on the path to it.
> > >
> > > 2) filters and projections pushdown:
> > > I agree with Alex that the filter pushdown into cache can benefit a lot
> > > for ALL cache.
> > > However, this is not true for LRU cache. Connectors use cache to reduce
> > > IO requests to databases for better throughput.
> > > If a filter can prune 90% of data in the cache, we will have 90% of
> > > lookup requests that can never be cached
> > > and hit directly to the databases. That means the cache is meaningless in
> > > this case.
> > >
> > > IMO, Flink SQL has provided a standard way to do filters and projects
> > > pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> > > That Jdbc/hive/HBase haven't implemented the interfaces doesn't mean
> > > it's hard to implement.
> > > They should implement the pushdown interfaces to reduce IO and the cache
> > > size.
> > > That should be a final state that the scan source and lookup source share
> > > the exact pushdown implementation.
> > > I don't see why we need to duplicate the pushdown logic in caches, which
> > > will complicate the lookup join design.
> > >
> > > 3) ALL cache abstraction
> > > All cache might be the most challenging part of this FLIP. We have never
> > > provided a reload-lookup public interface.
> > > Currently, we put the reload logic in the "eval" method of TableFunction.
> > > That's hard for some sources (e.g., Hive).
> > > Ideally, connector implementation should share the logic of reload and
> > > scan, i.e. ScanTableSource with InputFormat/SourceFunction/FLIP-27
> > > Source.
> > > However, InputFormat/SourceFunction are deprecated, and the FLIP-27
> > > source is deeply coupled with SourceOperator.
> > > If we want to invoke the FLIP-27 source in LookupJoin, this may make the
> > > scope of this FLIP much larger.
> > > We are still investigating how to abstract the ALL cache logic and reuse
> > > the existing source interfaces.
> > >
> > >
> > > Best,
> > > Jark
> > >
> > >
> > >
> > > On Thu, 5 May 2022 at 20:22, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> > >
> > > > It's a much more complicated task and lies outside the scope of this
> > > > improvement, because such pushdowns would have to be done for all
> > > > ScanTableSource implementations (not only for lookup ones).
> > > >
> > > > On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> One question regarding "And Alexander correctly mentioned that filter
> > > >> pushdown still is not implemented for jdbc/hive/hbase." -> Would an
> > > >> alternative solution be to actually implement these filter pushdowns?
> > > >> I can imagine that there are many more benefits to doing that, outside
> > > >> of lookup caching and metrics.
> > > >>
> > > >> Best regards,
> > > >>
> > > >> Martijn Visser
> > > >> https://twitter.com/MartijnVisser82
> > > >> https://github.com/MartijnVisser
> > > >>
> > > >>
> > > >> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
> > > >>
> > > >> > Hi everyone!
> > > >> >
> > > >> > Thanks for driving such a valuable improvement!
> > > >> >
> > > >> > I do think that a single cache implementation would be a nice
> > > >> > opportunity for users. And it will break the "FOR SYSTEM_TIME AS OF
> > > >> > proc_time" semantics anyway, no matter how it is implemented.
> > > >> >
> > > >> > Putting myself in the user's shoes, I can say that:
> > > >> > 1) I would prefer to have the opportunity to cut the cache size by
> > > >> > simply filtering out unnecessary data. And the handiest way to do that
> > > >> > is to apply the filter inside LookupRunners. It would be a bit harder
> > > >> > to pass it through the LookupJoin node to TableFunction. And Alexander
> > > >> > correctly mentioned that filter pushdown still is not implemented for
> > > >> > jdbc/hive/hbase.
> > > >> > 2) The ability to set different caching parameters for different
> > > >> > tables is quite important. So I would prefer to set them through DDL
> > > >> > rather than have the same TTL, strategy and other options for all
> > > >> > lookup tables.
> > > >> > 3) Providing the cache in the framework really deprives us of
> > > >> > extensibility (users won't be able to implement their own cache). But
> > > >> > most probably that can be solved by creating more cache strategies
> > > >> > and a wider set of configurations.
> > > >> >
> > > >> > All these points are much closer to the schema proposed by
> > > >> > Alexander. Qingsheng Ren, please correct me if I'm wrong and all
> > > >> > these facilities can be easily implemented in your architecture.
> > > >> >
> > > >> > Best regards,
> > > >> > Roman Boyko
> > > >> > e.: ro.v.bo...@gmail.com
> > > >> >
> > > >> > On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi everyone,
> > > >> > >
> > > >> > > I don't have much to chip in, but just wanted to express that I
> > > >> > > really appreciate the in-depth discussion on this topic and I hope
> > > >> > > that others will join the conversation.
> > > >> > >
> > > >> > > Best regards,
> > > >> > >
> > > >> > > Martijn
> > > >> > >
> > > >> > > On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi Qingsheng, Leonard and Jark,
> > > >> > > >
> > > >> > > > Thanks for your detailed feedback! However, I have questions about
> > > >> > > > some of your statements (maybe I didn't get something?).
> > > >> > > >
> > > >> > > > > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> > > >> > > > > proc_time"
> > > >> > > >
> > > >> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" are
> > > >> > > > not fully preserved with caching, but as you said, users accept this
> > > >> > > > consciously to achieve better performance (no one proposed to enable
> > > >> > > > caching by default, etc.). Or by users do you mean other developers
> > > >> > > > of connectors? In that case developers explicitly specify whether
> > > >> > > > their connector supports caching or not (in the list of supported
> > > >> > > > options); no one makes them do that if they don't want to. So what
> > > >> > > > exactly is the difference between implementing caching in
> > > >> > > > flink-table-runtime and in flink-table-common from this point of
> > > >> > > > view? How does it affect whether the semantics of "FOR SYSTEM_TIME
> > > >> > > > AS OF proc_time" are broken?
> > > >> > > >
> > > >> > > > > confront a situation that allows table options in DDL to control
> > > >> > > > > the behavior of the framework, which has never happened previously
> > > >> > > > > and should be cautious
> > > >> > > >
> > > >> > > > If we talk about the main difference in semantics between DDL
> > > >> > > > options and config options ("table.exec.xxx"), isn't it about the
> > > >> > > > scope of the options and their importance for the user's business
> > > >> > > > logic, rather than the specific location of the corresponding logic
> > > >> > > > in the framework? I mean that in my design, for example, putting an
> > > >> > > > option with the lookup cache strategy in configurations would be
> > > >> > > > the wrong decision, because it directly affects the user's business
> > > >> > > > logic (not just performance optimization) and touches just several
> > > >> > > > functions of ONE table (there can be multiple tables with different
> > > >> > > > caches). Does it really matter to the user (or anyone else) where
> > > >> > > > the logic affected by the option is located?
> > > >> > > > Also, I can recall the DDL option 'sink.parallelism', which in some
> > > >> > > > way "controls the behavior of the framework", and I don't see any
> > > >> > > > problem there.
> > > >> > > >
> > > >> > > > > introduce a new interface for this all-caching scenario and the
> > > >> > > > > design would become more complex
> > > >> > > >
> > > >> > > > This is a subject for a separate discussion, but in our internal
> > > >> > > > version we solved this problem quite easily: we reused the
> > > >> > > > InputFormat class (so there is no need for a new API). The point is
> > > >> > > > that currently all lookup connectors use InputFormat for scanning
> > > >> > > > the data in batch mode: HBase, JDBC and even Hive, which uses the
> > > >> > > > PartitionReader class, which is actually just a wrapper around
> > > >> > > > InputFormat. The advantage of this solution is the ability to reload
> > > >> > > > cache data in parallel (the number of threads depends on the number
> > > >> > > > of InputSplits, but has an upper limit). As a result, cache reload
> > > >> > > > time is significantly reduced (as is the time the input stream is
> > > >> > > > blocked). I know that we usually try to avoid concurrency in Flink
> > > >> > > > code, but maybe this one can be an exception. BTW I'm not saying
> > > >> > > > that it's an ideal solution; maybe there are better ones.
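
The parallel reload described above could look roughly like the following sketch. It assumes a hypothetical ParallelCacheReloader whose splitReader function stands in for reading one InputSplit through an InputFormat; none of these names are Flink API, and the internal version mentioned in the thread may work differently. The thread count follows the number of splits but is capped, and the new cache replaces the old one only after every split has been read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical helper that reloads a full ("ALL") cache by reading splits in parallel. */
class ParallelCacheReloader<S, K, V> {
    private final int maxThreads;
    // Stands in for reading all rows of one split; must not return null keys or values.
    private final Function<S, Map<K, V>> splitReader;
    private volatile Map<K, V> cache = Map.of();

    ParallelCacheReloader(int maxThreads, Function<S, Map<K, V>> splitReader) {
        this.maxThreads = Math.max(1, maxThreads);
        this.splitReader = splitReader;
    }

    void reload(List<S> splits) {
        Map<K, V> fresh = new ConcurrentHashMap<>();
        if (!splits.isEmpty()) {
            // One thread per split, but never more than the configured upper limit.
            ExecutorService pool =
                    Executors.newFixedThreadPool(Math.min(maxThreads, splits.size()));
            try {
                List<CompletableFuture<Void>> pending = new ArrayList<>();
                for (S split : splits) {
                    pending.add(CompletableFuture.runAsync(
                            () -> fresh.putAll(splitReader.apply(split)), pool));
                }
                // join() rethrows a failed read, so a broken reload keeps the old cache.
                CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            } finally {
                pool.shutdown();
            }
        }
        // Swap in the new contents only after all splits were read successfully.
        cache = fresh;
    }

    V lookup(K key) {
        return cache.get(key);
    }
}
```

Lookups keep seeing the previous contents while a reload is running, which is one possible way to shorten the time the input stream is blocked; whether that is acceptable depends on the semantics the FLIP settles on.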
> > > >> > > >
> > > >> > > > > Providing the cache in the framework might introduce compatibility
> > > >> > > > > issues
> > > >> > > >
> > > >> > > > That is possible only if the developer of the connector doesn't
> > > >> > > > properly refactor the code and uses the new cache options
> > > >> > > > incorrectly (i.e. explicitly provides the same options in 2
> > > >> > > > different places in the code). For correct behavior, all they need
> > > >> > > > to do is redirect existing options to the framework's LookupConfig
> > > >> > > > (and maybe add an alias for options that were named differently);
> > > >> > > > everything will be transparent for users. If the developer doesn't
> > > >> > > > refactor at all, nothing changes for the connector because of
> > > >> > > > backward compatibility. Also, a developer who wants to use their own
> > > >> > > > cache logic can simply not pass some of the configs to the
> > > >> > > > framework, and instead write their own implementation with the
> > > >> > > > already existing configs and metrics (but I think that's a rare
> > > >> > > > case).
> > > >> > > >
> > > >> > > > > filters and projections should be pushed all the way down to the
> > > >> > > > > table function, like what we do in the scan source
> > > >> > > >
> > > >> > > > It's a great goal. But the truth is that the ONLY connector that
> > > >> > > > supports filter pushdown is FileSystemTableSource (no database
> > > >> > > > connector supports it currently). Also, for some databases it's
> > > >> > > > simply impossible to push down filters as complex as those we have
> > > >> > > > in Flink.
> > > >> > > >
> > > >> > > > > only applying these optimizations to the cache seems not quite
> > > >> > > > > useful
> > > >> > > >
> > > >> > > > Filters can cut off an arbitrarily large amount of data from the
> > > >> > > > dimension table. As a simple example, suppose the dimension table
> > > >> > > > 'users' has a column 'age' with values from 20 to 40, and the input
> > > >> > > > stream 'clicks' is ~uniformly distributed by age of users. With the
> > > >> > > > filter 'age > 30' there will be half as much data in the cache. This
> > > >> > > > means the user can increase 'lookup.cache.max-rows' by almost 2
> > > >> > > > times, which gives a huge performance boost. Moreover, this
> > > >> > > > optimization really starts to shine with the 'ALL' cache, where
> > > >> > > > tables that can't fit in memory without filters and projections can
> > > >> > > > fit with them. This opens up additional possibilities for users. And
> > > >> > > > this doesn't sound 'not quite useful'.
> > > >> > > >
> > > >> > > > It would be great to hear other voices on this topic! We have quite
> > > >> > > > a lot of controversial points, and I think with the help of others
> > > >> > > > it will be easier for us to come to a consensus.
> > > >> > > >
> > > >> > > > Best regards,
> > > >> > > > Smirnov Alexander
> > > >> > > >
> > > >> > > >
> > > >> > > > On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > >> > > > >
> > > >> > > > > Hi Alexander and Arvid,
> > > >> > > > >
> > > >> > > > > Thanks for the discussion and sorry for my late response! We had an
> > > >> > > > > internal discussion together with Jark and Leonard and I’d like to
> > > >> > > > > summarize our ideas. Instead of implementing the cache logic in the
> > > >> > > > > table runtime layer or wrapping around the user-provided table
> > > >> > > > > function, we prefer to introduce some new APIs extending
> > > >> > > > > TableFunction with these concerns:
> > > >> > > > >
> > > >> > > > > 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> > > >> > > > > proc_time", because it couldn’t truly reflect the content of the
> > > >> > > > > lookup table at the moment of querying. If users choose to enable
> > > >> > > > > caching on the lookup table, they implicitly indicate that this
> > > >> > > > > breakage is acceptable in exchange for the performance. So we prefer
> > > >> > > > > not to provide caching on the table runtime level.
> > > >> > > > >
> > > >> > > > > 2. If we make the cache implementation in the framework (whether in a
> > > >> > > > > runner or a wrapper around TableFunction), we have to confront a
> > > >> > > > > situation that allows table options in DDL to control the behavior
> > > >> > > > > of the framework, which has never happened previously and should be
> > > >> > > > > cautious. Under the current design the behavior of the framework
> > > >> > > > > should only be specified by configurations (“table.exec.xxx”), and
> > > >> > > > > it’s hard to apply these general configs to a specific table.
> > > >> > > > >
> > > >> > > > > 3. We have use cases that lookup source loads and refresh all records
> > > >> > > > > periodically into the memory to achieve high lookup performance
> > > >> > > > > (like Hive connector in the community, and also widely used by our
> > > >> > > > > internal connectors). Wrapping the cache around the user’s
> > > >> > > > > TableFunction works fine for LRU caches, but I think we have to
> > > >> > > > > introduce a new interface for this all-caching scenario and the
> > > >> > > > > design would become more complex.
> > > >> > > > >
> > > >> > > > > 4. Providing the cache in the framework might introduce compatibility
> > > >> > > > > issues to existing lookup sources like there might exist two caches
> > > >> > > > > with totally different strategies if the user incorrectly configures
> > > >> > > > > the table (one in the framework and another implemented by the
> > > >> > > > > lookup source).
> > > >> > > > >
> > > >> > > > > As for the optimization mentioned by Alexander, I think filters and
> > > >> > > > > projections should be pushed all the way down to the table function,
> > > >> > > > > like what we do in the scan source, instead of the runner with the
> > > >> > > > > cache. The goal of using cache is to reduce the network I/O and
> > > >> > > > > pressure on the external system, and only applying these
> > > >> > > > > optimizations to the cache seems not quite useful.
> > > >> > > > >
> > > >> > > > > I made some updates to the FLIP[1] to reflect our ideas. We prefer to
> > > >> > > > > keep the cache implementation as a part of TableFunction, and we
> > > >> > > > > could provide some helper classes (CachingTableFunction,
> > > >> > > > > AllCachingTableFunction, CachingAsyncTableFunction) to developers
> > > >> > > > > and regulate metrics of the cache. Also, I made a POC[2] for your
> > > >> > > > > reference.
> > > >> > > > >
> > > >> > > > > Looking forward to your ideas!
> > > >> > > > >
> > > >> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > >> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> > > >> > > > >
> > > >> > > > > Best regards,
> > > >> > > > >
> > > >> > > > > Qingsheng
> > > >> > > > >
> > > >> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com>
> > > >> > > > > wrote:
> > > >> > > > >>
> > > >> > > > >> Thanks for the response, Arvid!
> > > >> > > > >>
> > > >> > > > >> I have a few comments on your message.
> > > >> > > > >>
> > > >> > > > >> > but could also live with an easier solution as the first step:
> > > >> > > > >>
> > > >> > > > >> I think that these 2 ways (the one originally proposed by Qingsheng
> > > >> > > > >> and mine) are mutually exclusive, because conceptually they pursue
> > > >> > > > >> the same goal, but the implementation details are different. If we
> > > >> > > > >> go one way, moving to the other way in the future will mean deleting
> > > >> > > > >> existing code and once again changing the API for connectors. So I
> > > >> > > > >> think we should reach a consensus with the community about that and
> > > >> > > > >> then work together on this FLIP, i.e. divide the work into tasks for
> > > >> > > > >> different parts of the FLIP (for example, LRU cache unification /
> > > >> > > > >> introducing the proposed set of metrics / further work…). WDYT,
> > > >> > > > >> Qingsheng?
> > > >> > > > >>
> > > >> > > > >> > as the source will only receive the requests after filter
> > > >> > > > >>
> > > >> > > > >> Actually, if filters are applied to fields of the lookup table, we
> > > >> > > > >> must first do the requests, and only after that can we filter the
> > > >> > > > >> responses, because lookup connectors don't have filter pushdown. So
> > > >> > > > >> if filtering is done before caching, there will be far fewer rows in
> > > >> > > > >> the cache.
> > > >> > > > >>
> > > >> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't
> > > >> > > > >> > know the solution to share images to be honest.
> > > >> > > > >>
> > > >> > > > >> Sorry for that, I’m a bit new to such kinds of conversations :)
> > > >> > > > >> I have no write access to the Confluence, so I created a Jira issue
> > > >> > > > >> where I described the proposed changes in more detail:
> > > >> > > > >> https://issues.apache.org/jira/browse/FLINK-27411.
> > > >> > > > >>
> > > >> > > > >> I will be happy to get more feedback!
> > > >> > > > >>
> > > >> > > > >> Best,
> > > >> > > > >> Smirnov Alexander
> > > >> > > > >>
> > > >> > > > >> On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
> > > >> > > > >> >
> > > >> > > > >> > Hi Qingsheng,
> > > >> > > > >> >
> > > >> > > > >> > Thanks for driving this; the inconsistency was not satisfying for
> > > >> > > > >> > me.
> > > >> > > > >> >
> > > >> > > > >> > I second Alexander's idea though but could also live with an easier
> > > >> > > > >> > solution as the first step: Instead of making caching an
> > > >> > > > >> > implementation detail of TableFunction X, rather devise a caching
> > > >> > > > >> > layer around X. So the proposal would be a CachingTableFunction that
> > > >> > > > >> > delegates to X in case of misses and else manages the cache. Lifting
> > > >> > > > >> > it into the operator model as proposed would be even better but is
> > > >> > > > >> > probably unnecessary in the first step for a lookup source (as the
> > > >> > > > >> > source will only receive the requests after filter; applying
> > > >> > > > >> > projection may be more interesting to save memory).
> > > >> > > > >> >
> > > >> > > > >> > Another advantage is that all the changes of this FLIP would be
> > > >> > > > >> > limited to options, no need for new public interfaces. Everything
> > > >> > > > >> > else remains an implementation of Table runtime. That means we can
> > > >> > > > >> > easily incorporate the optimization potential that Alexander pointed
> > > >> > > > >> > out later.
> > > >> > > > >> >
> > > >> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't
> > > >> > > > >> > know the solution to share images to be honest.
> > > >> > > > >> >
> > > >> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com>
> > > >> > > > >> > wrote:
> > > >> > > > >> >
> > > >> > > > >> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but
> > > >> > > > >> > > I'd really like to become one. And this FLIP really interested me.
> > > >> > > > >> > > Actually I have worked on a similar feature in my company’s Flink
> > > >> > > > >> > > fork, and we would like to share our thoughts on this and make the
> > > >> > > > >> > > code open source.
> > > >> > > > >> > >
> > > >> > > > >> > > I think there is a better alternative than introducing an abstract
> > > >> > > > >> > > class for TableFunction (CachingTableFunction). As you know,
> > > >> > > > >> > > TableFunction lives in the flink-table-common module, which
> > > >> > > > >> > > provides only an API for working with tables, so it’s very
> > > >> > > > >> > > convenient to import in connectors. In turn, CachingTableFunction
> > > >> > > > >> > > contains logic for runtime execution, so this class and everything
> > > >> > > > >> > > connected with it should be located in another module, probably in
> > > >> > > > >> > > flink-table-runtime. But this will require connectors to depend on
> > > >> > > > >> > > another module that contains a lot of runtime logic, which doesn’t
> > > >> > > > >> > > sound good.
> > > >> > > > >> > >
> > > >> > > > >> > > I suggest adding a new method ‘getLookupConfig’ to
> > > >> > > > >> > > LookupTableSource or LookupRuntimeProvider to allow connectors to
> > > >> > > > >> > > only pass configurations to the planner, so that they won’t depend
> > > >> > > > >> > > on the runtime implementation. Based on these configs the planner
> > > >> > > > >> > > will construct a lookup join operator with the corresponding
> > > >> > > > >> > > runtime logic (ProcessFunctions in module flink-table-runtime).
> > > >> > > > >> > > The architecture looks like the attached image (the LookupConfig
> > > >> > > > >> > > class there is actually your CacheConfig).
> > > >> > > > >> > >
> > > >> > > > >> > > The classes in flink-table-planner that will be responsible for
> > > >> > > > >> > > this are CommonPhysicalLookupJoin and its inheritors.
> > > >> > > > >> > > The current classes for lookup join in flink-table-runtime are
> > > >> > > > >> > > LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc
> > > >> > > > >> > > and AsyncLookupJoinRunnerWithCalc.
> > > >> > > > >> > >
> > > >> > > > >> > > I suggest adding classes LookupJoinCachingRunner,
> > > >> > > > >> > > LookupJoinCachingRunnerWithCalc, etc.
> > > >> > > > >> > >
> > > >> > > > >> > > And here comes another, more powerful advantage of such a
> > > >> > > > >> > > solution. If we have the caching logic on a lower level, we can
> > > >> > > > >> > > apply some optimizations to it. LookupJoinRunnerWithCalc is named
> > > >> > > > >> > > like this because it uses the ‘calc’ function, which mostly
> > > >> > > > >> > > consists of filters and projections.
> > > >> > > > >> > >
> > > >> > > > >> > > For example, when joining table A with lookup table B on the
> > > >> > > > >> > > condition ‘JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE
> > > >> > > > >> > > B.salary > 1000’, the ‘calc’ function will contain the filters
> > > >> > > > >> > > A.age = B.age + 10 and B.salary > 1000.
> > > >> > > > >> > >
> > > >> > > > >> > > If we apply this function before storing records in the cache, the
> > > >> > > > >> > > size of the cache will be significantly reduced: filters avoid
> > > >> > > > >> > > storing useless records in the cache, and projections reduce the
> > > >> > > > >> > > records’ size. So the user can increase the initial max number of
> > > >> > > > >> > > records in the cache.
> > > >> > > > >> > >
> > > >> > > > >> > > What do you think about it?
> > > >> > > > >> > >
> > > >> > > > >> > >
> > > >> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> > > >> > > > >> > > > Hi devs,
> > > >> > > > >> > > >
> > > >> > > > >> > > > Yuan and I would like to start a discussion about FLIP-221[1],
> > > >> > > > >> > > > which introduces an abstraction of lookup table cache and its
> > > >> > > > >> > > > standard metrics.
> > > >> > > > >> > > >
> > > >> > > > >> > > > Currently each lookup table source should implement their own
> > > >> > > > >> > > > cache to store lookup results, and there isn’t a standard of
> > > >> > > > >> > > > metrics for users and developers to tune their jobs with lookup
> > > >> > > > >> > > > joins, which is a quite common use case in Flink table / SQL.
> > > >> > > > >> > > >
> > > >> > > > >> > > > Therefore we propose some new APIs including cache, metrics,
> > > >> > > > >> > > > wrapper classes of TableFunction and new table options. Please
> > > >> > > > >> > > > take a look at the FLIP page [1] to get more details. Any
> > > >> > > > >> > > > suggestions and comments would be appreciated!
> > > >> > > > >> > > >
> > > >> > > > >> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > >> > > > >> > > >
> > > >> > > > >> > > > Best regards,
> > > >> > > > >> > > >
> > > >> > > > >> > > > Qingsheng
> > > >> > > > >> > > >
> > > >> > > > >> > > >
> > > >> > > > >> > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > --
> > > >> > > > > Best Regards,
> > > >> > > > >
> > > >> > > > > Qingsheng Ren
> > > >> > > > >
> > > >> > > > > Real-time Computing Team
> > > >> > > > > Alibaba Cloud
> > > >> > > > >
> > > >> > > > > Email: renqs...@gmail.com
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > Best regards,
> > > > Roman Boyko
> > > > e.: ro.v.bo...@gmail.com
> > > >
> >
