It's a much more complicated effort and lies outside the scope of this
improvement, because such pushdowns should be done for all ScanTableSource
implementations (not only for lookup ones).

On Thu, 5 May 2022 at 19:02, Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi everyone,
>
> One question regarding "And Alexander correctly mentioned that filter
> pushdown still is not implemented for jdbc/hive/hbase." -> Would an
> alternative solution be to actually implement these filter pushdowns? I can
> imagine that there are many more benefits to doing that, outside of lookup
> caching and metrics.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
> https://github.com/MartijnVisser
>
>
> On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:
>
> > Hi everyone!
> >
> > Thanks for driving such a valuable improvement!
> >
> > I do think that a single cache implementation would be a nice opportunity
> > for users. And it will break the "FOR SYSTEM_TIME AS OF proc_time"
> > semantics anyway, no matter how it is implemented.
> >
> > Putting myself in the user's shoes, I can say that:
> > 1) I would prefer to have the option of cutting down the cache size by
> > simply filtering out unnecessary data, and the handiest way to do that is
> > to apply the filters inside the LookupRunners. It would be a bit harder to
> > pass them through the LookupJoin node to the TableFunction. And Alexander
> > correctly mentioned that filter pushdown is still not implemented for
> > jdbc/hive/hbase.
> > 2) The ability to set different caching parameters for different tables is
> > quite important. So I would prefer to set them through DDL rather than
> > have the same TTL, strategy and other options for all lookup tables.
> > 3) Providing the cache inside the framework really deprives us of
> > extensibility (users won't be able to implement their own cache). But most
> > probably this can be solved by creating more cache strategies and a wider
> > set of configurations.
> >
> > All these points are much closer to the scheme proposed by Alexander.
> > Qingsheng Ren, please correct me if I'm wrong - can all of these be easily
> > implemented in your architecture?
> >
> > Best regards,
> > Roman Boyko
> > e.: ro.v.bo...@gmail.com
> >
> > On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I don't have much to chip in, but just wanted to express that I really
> > > appreciate the in-depth discussion on this topic and I hope that others
> > > will join the conversation.
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Tue, 3 May 2022 at 10:15, Alexander Smirnov <smirale...@gmail.com>
> > > wrote:
> > >
> > > > Hi Qingsheng, Leonard and Jark,
> > > >
> > > > Thanks for your detailed feedback! However, I have questions about
> > > > some of your statements (maybe I missed something?).
> > > >
> > > > > Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF
> > > > > proc_time"
> > > >
> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not
> > > > fully implemented with caching, but as you said, users accept this
> > > > consciously to achieve better performance (no one proposed to enable
> > > > caching by default, etc.). Or by users do you mean other developers of
> > > > connectors? In that case developers explicitly specify whether their
> > > > connector supports caching or not (in the list of supported options);
> > > > no one makes them do that if they don't want to. So what exactly is
> > > > the difference between implementing caching in the flink-table-runtime
> > > > module versus flink-table-common, from this point of view? How does it
> > > > affect whether the semantics of "FOR SYSTEM_TIME AS OF proc_time" are
> > > > broken or not?
> > > >
> > > > > confront a situation that allows table options in DDL to control the
> > > > > behavior of the framework, which has never happened previously and
> > > > > should be treated cautiously
> > > >
> > > > If we talk about the main semantic difference between DDL options and
> > > > config options ("table.exec.xxx"), isn't it about limiting the scope of
> > > > the options + their importance for the user's business logic, rather
> > > > than the specific location of the corresponding logic in the framework?
> > > > I mean that in my design, for example, putting an option like the
> > > > lookup cache strategy into configurations would be the wrong decision,
> > > > because it directly affects the user's business logic (not just
> > > > performance optimization) + touches just several functions of ONE table
> > > > (there can be multiple tables with different caches). Does it really
> > > > matter to the user (or anyone else) where the logic affected by the
> > > > applied option is located?
> > > > I can also recall the DDL option 'sink.parallelism', which in some way
> > > > "controls the behavior of the framework", and I don't see any problem
> > > > here.
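> > > >
> > > > To make that concrete, here is a rough sketch of per-table cache
> > > > options in DDL (the 'lookup.cache.*' option names are assumptions for
> > > > illustration, not a finalized API):
> > > >
> > > > import org.apache.flink.table.api.EnvironmentSettings;
> > > > import org.apache.flink.table.api.TableEnvironment;
> > > >
> > > > public class LookupCacheDdlSketch {
> > > >     public static void main(String[] args) {
> > > >         TableEnvironment tEnv =
> > > >                 TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> > > >         // hypothetical per-table options: each lookup table gets its
> > > >         // own cache strategy, size and TTL, instead of one global
> > > >         // "table.exec.xxx" setting for all lookup tables
> > > >         tEnv.executeSql(
> > > >                 "CREATE TABLE users (id BIGINT, age INT) WITH ("
> > > >                         + " 'connector' = 'jdbc',"
> > > >                         + " 'url' = 'jdbc:mysql://host:3306/db',"
> > > >                         + " 'table-name' = 'users',"
> > > >                         + " 'lookup.cache.strategy' = 'LRU',"
> > > >                         + " 'lookup.cache.max-rows' = '10000',"
> > > >                         + " 'lookup.cache.ttl' = '10 min')");
> > > >     }
> > > > }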
> > > >
> > > > > introduce a new interface for this all-caching scenario and the
> > > > > design would become more complex
> > > >
> > > > This is a subject for a separate discussion, but actually in our
> > > > internal version we solved this problem quite easily - we reused the
> > > > InputFormat class (so there is no need for a new API). The point is
> > > > that currently all lookup connectors use InputFormat for scanning the
> > > > data in batch mode: HBase, JDBC and even Hive - it uses the
> > > > PartitionReader class, which is actually just a wrapper around
> > > > InputFormat. The advantage of this solution is the ability to reload
> > > > cache data in parallel (the number of threads depends on the number of
> > > > InputSplits, but has an upper limit). As a result the cache reload time
> > > > is significantly reduced (as well as the time the input stream is
> > > > blocked). I know that we usually try to avoid concurrency in Flink
> > > > code, but maybe this one can be an exception. BTW I don't claim it's an
> > > > ideal solution; maybe there are better ones.
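> > > >
> > > > A minimal sketch of that idea, assuming the connector hands over a
> > > > factory for its InputFormat (the cache, key extraction and the class
> > > > around them are made up for illustration; configure(...) calls are
> > > > elided):
> > > >
> > > > import java.util.ArrayList;
> > > > import java.util.List;
> > > > import java.util.concurrent.ConcurrentMap;
> > > > import java.util.concurrent.ExecutorService;
> > > > import java.util.concurrent.Executors;
> > > > import java.util.concurrent.Future;
> > > > import java.util.function.Function;
> > > > import java.util.function.Supplier;
> > > > import org.apache.flink.api.common.io.InputFormat;
> > > > import org.apache.flink.core.io.InputSplit;
> > > >
> > > > // reload the ALL cache in parallel, one task per InputSplit, with an
> > > > // upper bound on the number of threads
> > > > final class ParallelCacheReload<K, V> {
> > > >     void reload(Supplier<InputFormat<V, InputSplit>> formats,
> > > >                 ConcurrentMap<K, V> cache,
> > > >                 Function<V, K> keyOf,
> > > >                 int maxThreads) throws Exception {
> > > >         InputSplit[] splits = formats.get().createInputSplits(maxThreads);
> > > >         ExecutorService pool = Executors.newFixedThreadPool(
> > > >                 Math.min(splits.length, maxThreads));
> > > >         List<Future<?>> tasks = new ArrayList<>();
> > > >         for (InputSplit split : splits) {
> > > >             tasks.add(pool.submit(() -> {
> > > >                 // each task reads its split with its own InputFormat
> > > >                 InputFormat<V, InputSplit> format = formats.get();
> > > >                 format.open(split);
> > > >                 while (!format.reachedEnd()) {
> > > >                     V row = format.nextRecord(null);
> > > >                     cache.put(keyOf.apply(row), row);
> > > >                 }
> > > >                 format.close();
> > > >                 return null;
> > > >             }));
> > > >         }
> > > >         for (Future<?> t : tasks) {
> > > >             t.get(); // fail fast if any split fails
> > > >         }
> > > >         pool.shutdown();
> > > >     }
> > > > }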
> > > >
> > > > > Providing the cache in the framework might introduce compatibility
> > > > > issues
> > > >
> > > > It's possible only in cases where the developer of the connector
> > > > doesn't properly refactor his code and uses the new cache options
> > > > incorrectly (i.e. explicitly provides the same options in 2 different
> > > > code places). For correct behavior all he needs to do is redirect the
> > > > existing options to the framework's LookupConfig (+ maybe add an alias
> > > > for options, if the naming differed); everything will be transparent
> > > > for users. If the developer doesn't refactor at all, nothing changes
> > > > for the connector because of backward compatibility. Also, if a
> > > > developer wants to use his own cache logic, he can just refuse to pass
> > > > some of the configs to the framework and instead make his own
> > > > implementation with the already existing configs and metrics (but
> > > > actually I think that's a rare case).
> > > >
> > > > > filters and projections should be pushed all the way down to the
> > > > > table function, like what we do in the scan source
> > > >
> > > > It's a great goal. But the truth is that the ONLY connector that
> > > > currently supports filter pushdown is FileSystemTableSource (no
> > > > database connector supports it). Also, for some databases it's simply
> > > > impossible to push down filters as complex as the ones we have in
> > > > Flink.
> > > >
> > > > > only applying these optimizations to the cache seems not quite
> > > > > useful
> > > >
> > > > Filters can cut off an arbitrarily large amount of data from the
> > > > dimension table. For a simple example, suppose the dimension table
> > > > 'users' has a column 'age' with values from 20 to 40, and the input
> > > > stream 'clicks' is ~uniformly distributed over users' ages. If we have
> > > > the filter 'age > 30', there will be half as much data in the cache.
> > > > This means the user can increase 'lookup.cache.max-rows' by almost 2
> > > > times, which gives a huge performance boost. Moreover, this
> > > > optimization really starts to shine with the 'ALL' cache, where tables
> > > > without filters and projections can't fit in memory, but with them they
> > > > can. This opens up additional possibilities for users. And that doesn't
> > > > sound like 'not quite useful'.
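> > > >
> > > > Schematically (tEnv as in a usual Table API program; assuming 'clicks'
> > > > has a processing-time attribute proc_time):
> > > >
> > > > // lookup join with a filter on the dimension table 'users'; if the
> > > > // filter runs before records enter the cache, rows with age <= 30 are
> > > > // never cached, roughly halving the cache size in the uniform case
> > > > tEnv.executeSql(
> > > >         "SELECT c.user_id, u.age"
> > > >                 + " FROM clicks AS c"
> > > >                 + " JOIN users FOR SYSTEM_TIME AS OF c.proc_time AS u"
> > > >                 + "   ON c.user_id = u.id"
> > > >                 + " WHERE u.age > 30");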
> > > >
> > > > It would be great to hear other voices regarding this topic! We have
> > > > quite a lot of controversial points, and I think with the help of
> > > > others it will be easier for us to come to a consensus.
> > > >
> > > > Best regards,
> > > > Smirnov Alexander
> > > >
> > > >
> > > > On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > > >
> > > > > Hi Alexander and Arvid,
> > > > >
> > > > > Thanks for the discussion and sorry for my late response! We had an
> > > > > internal discussion together with Jark and Leonard and I'd like to
> > > > > summarize our ideas. Instead of implementing the cache logic in the
> > > > > table runtime layer or wrapping around the user-provided table
> > > > > function, we prefer to introduce some new APIs extending
> > > > > TableFunction with these concerns:
> > > > >
> > > > > 1. Caching actually breaks the semantics of "FOR SYSTEM_TIME AS OF
> > > > > proc_time", because it couldn't truly reflect the content of the
> > > > > lookup table at the moment of querying. If users choose to enable
> > > > > caching on the lookup table, they implicitly indicate that this
> > > > > breakage is acceptable in exchange for the performance. So we prefer
> > > > > not to provide caching at the table runtime level.
> > > > >
> > > > > 2. If we make the cache implementation in the framework (whether in
> > > > > a runner or a wrapper around TableFunction), we have to confront a
> > > > > situation that allows table options in DDL to control the behavior of
> > > > > the framework, which has never happened previously and should be
> > > > > treated cautiously. Under the current design the behavior of the
> > > > > framework should only be specified by configurations
> > > > > ("table.exec.xxx"), and it's hard to apply these general configs to a
> > > > > specific table.
> > > > >
> > > > > 3. We have use cases where the lookup source loads and refreshes all
> > > > > records periodically into memory to achieve high lookup performance
> > > > > (like the Hive connector in the community, and also widely used by
> > > > > our internal connectors). Wrapping the cache around the user's
> > > > > TableFunction works fine for LRU caches, but I think we have to
> > > > > introduce a new interface for this all-caching scenario and the
> > > > > design would become more complex.
> > > > >
> > > > > 4. Providing the cache in the framework might introduce
> > > > > compatibility issues to existing lookup sources: there might exist
> > > > > two caches with totally different strategies if the user incorrectly
> > > > > configures the table (one in the framework and another implemented by
> > > > > the lookup source).
> > > > >
> > > > > As for the optimization mentioned by Alexander, I think filters and
> > > > > projections should be pushed all the way down to the table function,
> > > > > like what we do in the scan source, instead of being applied in the
> > > > > runner with the cache. The goal of using a cache is to reduce the
> > > > > network I/O and the pressure on the external system, and only
> > > > > applying these optimizations to the cache seems not quite useful.
> > > > >
> > > > > I made some updates to the FLIP[1] to reflect our ideas. We prefer
> > > > > to keep the cache implementation as a part of TableFunction, and we
> > > > > could provide some helper classes (CachingTableFunction,
> > > > > AllCachingTableFunction, CachingAsyncTableFunction) to developers and
> > > > > regulate metrics of the cache. Also, I made a POC[2] for your
> > > > > reference.
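> > > > >
> > > > > To give a rough idea of the developer-facing shape, a simplified
> > > > > sketch (the signatures here are illustrative assumptions; the exact
> > > > > API is in the FLIP[1] and POC[2], and the JDBC names are just for the
> > > > > example):
> > > > >
> > > > > import java.util.Collection;
> > > > > import java.util.Collections;
> > > > > import org.apache.flink.table.data.RowData;
> > > > >
> > > > > // Assumed shape: CachingTableFunction manages the cache and its
> > > > > // metrics, and calls lookup(...) only on a cache miss.
> > > > > public class JdbcLookupFunction extends CachingTableFunction {
> > > > >     @Override
> > > > >     public Collection<RowData> lookup(RowData keyRow) {
> > > > >         // hit the external database only for cache misses; cache
> > > > >         // insertion, eviction and metrics live in the base class
> > > > >         return queryDatabase(keyRow);
> > > > >     }
> > > > >
> > > > >     private Collection<RowData> queryDatabase(RowData keyRow) {
> > > > >         return Collections.emptyList(); // JDBC access elided
> > > > >     }
> > > > > }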
> > > > >
> > > > > Looking forward to your ideas!
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> > > > >
> > > > > Best regards,
> > > > >
> > > > > Qingsheng
> > > > >
> > > > > On Tue, Apr 26, 2022 at 4:45 PM Alexander Smirnov <
> > > > > smirale...@gmail.com> wrote:
> > > > >>
> > > > >> Thanks for the response, Arvid!
> > > > >>
> > > > >> I have a few comments on your message.
> > > > >>
> > > > >> > but could also live with an easier solution as the first step:
> > > > >>
> > > > >> I think that these 2 ways are mutually exclusive (the one
> > > > >> originally proposed by Qingsheng and mine), because conceptually
> > > > >> they follow the same goal, but the implementation details are
> > > > >> different. If we go one way, moving to the other way in the future
> > > > >> will mean deleting existing code and once again changing the API
> > > > >> for connectors. So I think we should reach a consensus with the
> > > > >> community about that and then work together on this FLIP, i.e.
> > > > >> divide the work into tasks for the different parts of the FLIP (for
> > > > >> example, LRU cache unification / introducing the proposed set of
> > > > >> metrics / further work…). WDYT, Qingsheng?
> > > > >>
> > > > >> > as the source will only receive the requests after filter
> > > > >>
> > > > >> Actually, if filters are applied to fields of the lookup table, we
> > > > >> first must do the requests, and only after that can we filter the
> > > > >> responses, because lookup connectors don't have filter pushdown. So
> > > > >> if filtering is done before caching, there will be far fewer rows
> > > > >> in the cache.
> > > > >>
> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't
> > > > >> > know the solution to share images to be honest.
> > > > >>
> > > > >> Sorry for that, I'm a bit new to such kinds of conversations :)
> > > > >> I have no write access to the confluence, so I made a Jira issue
> > > > >> where I described the proposed changes in more detail -
> > > > >> https://issues.apache.org/jira/browse/FLINK-27411.
> > > > >>
> > > > >> Will be happy to get more feedback!
> > > > >>
> > > > >> Best,
> > > > >> Smirnov Alexander
> > > > >>
> > > > >> On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
> > > > >> >
> > > > >> > Hi Qingsheng,
> > > > >> >
> > > > >> > Thanks for driving this; the inconsistency was not satisfying for
> > > > >> > me.
> > > > >> >
> > > > >> > I second Alexander's idea though but could also live with an
> > > > >> > easier solution as the first step: Instead of making caching an
> > > > >> > implementation detail of TableFunction X, rather devise a caching
> > > > >> > layer around X. So the proposal would be a CachingTableFunction
> > > > >> > that delegates to X in case of misses and else manages the cache.
> > > > >> > Lifting it into the operator model as proposed would be even
> > > > >> > better but is probably unnecessary in the first step for a lookup
> > > > >> > source (as the source will only receive the requests after
> > > > >> > filter; applying projection may be more interesting to save
> > > > >> > memory).
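> > > > >> >
> > > > >> > In (pseudo-)code the wrapper idea is roughly the following (a
> > > > >> > sketch only; the map stands in for a real bounded cache and
> > > > >> > lookupViaDelegate is hypothetical glue):
> > > > >> >
> > > > >> > import java.util.HashMap;
> > > > >> > import java.util.List;
> > > > >> > import java.util.Map;
> > > > >> > import org.apache.flink.table.data.RowData;
> > > > >> > import org.apache.flink.table.functions.TableFunction;
> > > > >> >
> > > > >> > // a caching layer around an arbitrary lookup TableFunction "X":
> > > > >> > // X is consulted on misses, the cache is managed outside of X
> > > > >> > public class CachingTableFunction extends TableFunction<RowData> {
> > > > >> >     private final TableFunction<RowData> delegate; // "X"
> > > > >> >     private final Map<RowData, List<RowData>> cache = new HashMap<>();
> > > > >> >
> > > > >> >     public CachingTableFunction(TableFunction<RowData> delegate) {
> > > > >> >         this.delegate = delegate;
> > > > >> >     }
> > > > >> >
> > > > >> >     public void eval(RowData key) {
> > > > >> >         List<RowData> rows = cache.get(key);
> > > > >> >         if (rows == null) {
> > > > >> >             rows = lookupViaDelegate(key); // miss: ask X, remember
> > > > >> >             cache.put(key, rows);
> > > > >> >         }
> > > > >> >         rows.forEach(this::collect);
> > > > >> >     }
> > > > >> >
> > > > >> >     private List<RowData> lookupViaDelegate(RowData key) {
> > > > >> >         // hypothetical: run delegate.eval(key) with a collector
> > > > >> >         // that captures the emitted rows; elided in this sketch
> > > > >> >         throw new UnsupportedOperationException("sketch only");
> > > > >> >     }
> > > > >> > }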
> > > > >> >
> > > > >> > Another advantage is that all the changes of this FLIP would be
> > > > >> > limited to options, no need for new public interfaces. Everything
> > > > >> > else remains an implementation detail of the Table runtime. That
> > > > >> > means we can easily incorporate the optimization potential that
> > > > >> > Alexander pointed out later.
> > > > >> >
> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't
> > > > >> > know the solution to share images to be honest.
> > > > >> >
> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Alexander Smirnov <
> > > > >> > smirale...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but
> > > > >> > > I'd really like to become one. And this FLIP really interested
> > > > >> > > me. Actually I have worked on a similar feature in my company's
> > > > >> > > Flink fork, and we would like to share our thoughts on this and
> > > > >> > > make the code open source.
> > > > >> > >
> > > > >> > > I think there is a better alternative than introducing an
> > > > >> > > abstract class for TableFunction (CachingTableFunction). As you
> > > > >> > > know, TableFunction lives in the flink-table-common module, which
> > > > >> > > provides only an API for working with tables – it's very
> > > > >> > > convenient to import in connectors. In turn, CachingTableFunction
> > > > >> > > contains logic for runtime execution, so this class and
> > > > >> > > everything connected with it should be located in another module,
> > > > >> > > probably in flink-table-runtime. But this would require
> > > > >> > > connectors to depend on another module that contains a lot of
> > > > >> > > runtime logic, which doesn't sound good.
> > > > >> > >
> > > > >> > > I suggest adding a new method 'getLookupConfig' to
> > > > >> > > LookupTableSource or LookupRuntimeProvider to allow connectors to
> > > > >> > > only pass configurations to the planner, so they won't depend on
> > > > >> > > the runtime implementation. Based on these configs the planner
> > > > >> > > will construct a lookup join operator with the corresponding
> > > > >> > > runtime logic (ProcessFunctions in the flink-table-runtime
> > > > >> > > module). The architecture looks like in the pinned image (the
> > > > >> > > LookupConfig class there is actually your CacheConfig).
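> > > > >> > >
> > > > >> > > In code the idea is roughly the following (a sketch; LookupConfig
> > > > >> > > and its builder are proposed names, not an existing API, and the
> > > > >> > > other DynamicTableSource methods are omitted):
> > > > >> > >
> > > > >> > > import java.time.Duration;
> > > > >> > > import org.apache.flink.table.connector.source.LookupTableSource;
> > > > >> > > import org.apache.flink.table.connector.source.TableFunctionProvider;
> > > > >> > >
> > > > >> > > public class JdbcDynamicTableSource implements LookupTableSource {
> > > > >> > >     @Override
> > > > >> > >     public LookupRuntimeProvider getLookupRuntimeProvider(
> > > > >> > >             LookupContext context) {
> > > > >> > >         // the connector still only builds its TableFunction
> > > > >> > >         return TableFunctionProvider.of(
> > > > >> > >                 new JdbcRowDataLookupFunction(/* ... */));
> > > > >> > >     }
> > > > >> > >
> > > > >> > >     // proposed new method: describe caching wishes to the
> > > > >> > >     // planner, which then builds a caching runner in
> > > > >> > >     // flink-table-runtime
> > > > >> > >     public LookupConfig getLookupConfig() {
> > > > >> > >         return LookupConfig.builder()
> > > > >> > >                 .cacheStrategy(LookupConfig.CacheStrategy.LRU)
> > > > >> > >                 .cacheMaxRows(10_000)
> > > > >> > >                 .cacheTtl(Duration.ofMinutes(10))
> > > > >> > >                 .build();
> > > > >> > >     }
> > > > >> > > }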
> > > > >> > >
> > > > >> > > The classes in flink-table-planner that will be responsible for
> > > > >> > > this are CommonPhysicalLookupJoin and its inheritors. The current
> > > > >> > > classes for lookup join in flink-table-runtime are
> > > > >> > > LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc
> > > > >> > > and AsyncLookupJoinRunnerWithCalc.
> > > > >> > >
> > > > >> > > I suggest adding classes LookupJoinCachingRunner,
> > > > >> > > LookupJoinCachingRunnerWithCalc, etc.
> > > > >> > >
> > > > >> > > And here comes another, more powerful advantage of such a
> > > > >> > > solution. If we have the caching logic on a lower level, we can
> > > > >> > > apply some optimizations to it. LookupJoinRunnerWithCalc was
> > > > >> > > named like this because it uses the 'calc' function, which
> > > > >> > > actually mostly consists of filters and projections.
> > > > >> > >
> > > > >> > > For example, when joining table A with lookup table B under the
> > > > >> > > condition 'JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE
> > > > >> > > B.salary > 1000', the 'calc' function will contain the filters
> > > > >> > > A.age = B.age + 10 and B.salary > 1000.
> > > > >> > >
> > > > >> > > If we apply this function before storing records in the cache,
> > > > >> > > the size of the cache will be significantly reduced: filters =
> > > > >> > > avoid storing useless records in the cache, projections = reduce
> > > > >> > > the records' size. So the initial max number of records in the
> > > > >> > > cache can be increased by the user.
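> > > > >> > >
> > > > >> > > A sketch of how a caching runner could apply 'calc' before the
> > > > >> > > cache (the helper class and the cache around it are hypothetical;
> > > > >> > > FlatMapFunction and Collector are the interfaces the current
> > > > >> > > runners already use):
> > > > >> > >
> > > > >> > > import java.util.ArrayList;
> > > > >> > > import java.util.List;
> > > > >> > > import org.apache.flink.api.common.functions.FlatMapFunction;
> > > > >> > > import org.apache.flink.table.data.RowData;
> > > > >> > > import org.apache.flink.util.Collector;
> > > > >> > >
> > > > >> > > final class CalcBeforeCache {
> > > > >> > >     // run filters + projections over the fetched rows and return
> > > > >> > >     // only what is worth caching (rows dropped by the filters
> > > > >> > >     // simply emit nothing)
> > > > >> > >     static List<RowData> apply(
> > > > >> > >             FlatMapFunction<RowData, RowData> calc,
> > > > >> > >             List<RowData> fetched) throws Exception {
> > > > >> > >         final List<RowData> toCache = new ArrayList<>();
> > > > >> > >         Collector<RowData> out = new Collector<RowData>() {
> > > > >> > >             @Override
> > > > >> > >             public void collect(RowData row) { toCache.add(row); }
> > > > >> > >             @Override
> > > > >> > >             public void close() {}
> > > > >> > >         };
> > > > >> > >         for (RowData row : fetched) {
> > > > >> > >             calc.flatMap(row, out);
> > > > >> > >         }
> > > > >> > >         return toCache; // cache these instead of the raw rows
> > > > >> > >     }
> > > > >> > > }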
> > > > >> > >
> > > > >> > > What do you think about it?
> > > > >> > >
> > > > >> > >
> > > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> > > > >> > > > Hi devs,
> > > > >> > > >
> > > > >> > > > Yuan and I would like to start a discussion about FLIP-221[1],
> > > > >> > > > which introduces an abstraction of lookup table cache and its
> > > > >> > > > standard metrics.
> > > > >> > > >
> > > > >> > > > Currently each lookup table source has to implement its own
> > > > >> > > > cache to store lookup results, and there isn't a standard set
> > > > >> > > > of metrics for users and developers to tune their jobs with
> > > > >> > > > lookup joins, which is a quite common use case in Flink Table /
> > > > >> > > > SQL.
> > > > >> > > >
> > > > >> > > > Therefore we propose some new APIs including cache, metrics,
> > > > >> > > > wrapper classes of TableFunction and new table options. Please
> > > > >> > > > take a look at the FLIP page [1] to get more details. Any
> > > > >> > > > suggestions and comments would be appreciated!
> > > > >> > > >
> > > > >> > > > [1]
> > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > > >> > > >
> > > > >> > > > Best regards,
> > > > >> > > >
> > > > >> > > > Qingsheng
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best Regards,
> > > > >
> > > > > Qingsheng Ren
> > > > >
> > > > > Real-time Computing Team
> > > > > Alibaba Cloud
> > > > >
> > > > > Email: renqs...@gmail.com
> > > >
> > >
> >
>


-- 
Best regards,
Roman Boyko
e.: ro.v.bo...@gmail.com
