Hi everyone,

One question regarding "And Alexander correctly mentioned that filter
pushdown still is not implemented for jdbc/hive/hbase." -> Would an
alternative solution be to actually implement these filter pushdowns? I can
imagine that there are many more benefits to doing that, outside of lookup
caching and metrics.
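
To make the trade-off concrete, here is a minimal, self-contained Java
sketch. It is not Flink code: FilterPlacementSketch, FakeDatabase, User and
run are hypothetical names, and the data (one user per age from 20 to 40,
filter age > 30) is borrowed from Alexander's example further down. It only
contrasts where the filter is evaluated: in the runner before caching, or
pushed down to the external system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilterPlacementSketch {

    /** One row of a hypothetical dimension table 'users'. */
    static final class User {
        final int id;
        final int age;

        User(int id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    /** Stand-in for the external system; counts rows it has to send back. */
    static final class FakeDatabase {
        private final Map<Integer, User> rows = new HashMap<>();
        int rowsTransferred = 0;

        FakeDatabase() {
            // One user per age from 20 to 40 inclusive (21 rows).
            for (int id = 0; id <= 20; id++) {
                rows.put(id, new User(id, 20 + id));
            }
        }

        /** Key lookup; 'pushedDown' is evaluated on the database side. */
        List<User> lookup(int id, Predicate<User> pushedDown) {
            List<User> result = new ArrayList<>();
            User user = rows.get(id);
            if (user != null && pushedDown.test(user)) {
                result.add(user);
                rowsTransferred++;
            }
            return result;
        }
    }

    /** Looks up every key once and returns {rowsTransferred, rowsCached}. */
    public static int[] run(boolean pushDown) {
        FakeDatabase db = new FakeDatabase();
        Predicate<User> filter = user -> user.age > 30;
        Predicate<User> acceptAll = user -> true;
        Predicate<User> databaseSide = pushDown ? filter : acceptAll;

        Map<Integer, List<User>> cache = new HashMap<>();
        for (int id = 0; id <= 20; id++) {
            List<User> kept = new ArrayList<>();
            for (User fetched : db.lookup(id, databaseSide)) {
                // Runner-side filter: applied before the row enters the cache.
                if (filter.test(fetched)) {
                    kept.add(fetched);
                }
            }
            cache.put(id, kept);
        }
        int rowsCached = cache.values().stream().mapToInt(List::size).sum();
        return new int[] {db.rowsTransferred, rowsCached};
    }

    public static void main(String[] args) {
        int[] runnerSide = run(false);
        int[] pushedDown = run(true);
        System.out.println("filter in runner: transferred=" + runnerSide[0]
                + ", cached=" + runnerSide[1]);
        System.out.println("filter pushed down: transferred=" + pushedDown[0]
                + ", cached=" + pushedDown[1]);
    }
}
```

Under these assumptions both variants cache 10 of the 21 rows, but only the
pushed-down variant also reduces what the external system has to return (10
rows instead of 21), which is the kind of benefit beyond caching I have in
mind.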

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Thu, 5 May 2022 at 13:58, Roman Boyko <ro.v.bo...@gmail.com> wrote:

> Hi everyone!
>
> Thanks for driving such a valuable improvement!
>
> I do think that a single cache implementation would be a nice opportunity for
> users. And it will break the "FOR SYSTEM_TIME AS OF proc_time" semantics
> anyway - doesn't matter how it will be implemented.
>
> Putting myself in the user's shoes, I can say that:
> 1) I would prefer to have the opportunity to cut off the cache size by
> simply filtering unnecessary data. And the handiest way to do it is to apply
> it inside LookupRunners. It would be a bit harder to pass it through the
> LookupJoin node to TableFunction. And Alexander correctly mentioned that
> filter pushdown still is not implemented for jdbc/hive/hbase.
> 2) The ability to set the different caching parameters for different tables
> is quite important. So I would prefer to set it through DDL rather than
> have the same TTL, strategy and other options for all lookup tables.
> 3) Providing the cache into the framework really deprives us of
> extensibility (users won't be able to implement their own cache). But most
> probably it might be solved by creating more different cache strategies and
> a wider set of configurations.
>
> All these points are much closer to the scheme proposed by Alexander.
> Qingsheng Ren, please correct me if I'm wrong and all these facilities
> could be easily implemented in your architecture.
>
> Best regards,
> Roman Boyko
> e.: ro.v.bo...@gmail.com
>
> On Wed, 4 May 2022 at 21:01, Martijn Visser <martijnvis...@apache.org>
> wrote:
>
> > Hi everyone,
> >
> > I don't have much to chip in, but just wanted to express that I really
> > appreciate the in-depth discussion on this topic and I hope that others
> > will join the conversation.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com>
> > wrote:
> >
> > > Hi Qingsheng, Leonard and Jark,
> > >
> > > Thanks for your detailed feedback! However, I have questions about
> > > some of your statements (maybe I didn't get something?).
> > >
> > > > > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> > > > > proc_time"
> > >
> > > > I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not
> > > > fully implemented with caching, but as you said, users opt into it
> > > > consciously to achieve better performance (no one proposed to enable
> > > > caching by default, etc.). Or by users do you mean other developers of
> > > > connectors? In this case developers explicitly specify whether their
> > > > connector supports caching or not (in the list of supported options),
> > > > no one makes them do that if they don't want to. So what exactly is
> > > > the difference between implementing caching in modules
> > > > flink-table-runtime and in flink-table-common from this point of
> > > > view? How does it affect breaking/non-breaking the
> > > > semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> > >
> > > > > confront a situation that allows table options in DDL to control the
> > > > > behavior of the framework, which has never happened previously and
> > > > > should be cautious
> > >
> > > If we talk about main differences of semantics of DDL options and
> > > config options("table.exec.xxx"), isn't it about limiting the scope of
> > > the options + importance for the user business logic rather than
> > > specific location of corresponding logic in the framework? I mean that
> > > in my design, for example, putting an option with lookup cache
> > > > strategy in configurations would be the wrong decision, because it
> > > directly affects the user's business logic (not just performance
> > > optimization) + touches just several functions of ONE table (there can
> > > be multiple tables with different caches). Does it really matter for
> > > the user (or someone else) where the logic is located, which is
> > > affected by the applied option?
> > > Also I can remember DDL option 'sink.parallelism', which in some way
> > > "controls the behavior of the framework" and I don't see any problem
> > > here.
> > >
> > > > > introduce a new interface for this all-caching scenario and the
> > > > > design would become more complex
> > >
> > > This is a subject for a separate discussion, but actually in our
> > > internal version we solved this problem quite easily - we reused
> > > InputFormat class (so there is no need for a new API). The point is
> > > that currently all lookup connectors use InputFormat for scanning the
> > > data in batch mode: HBase, JDBC and even Hive - it uses class
> > > PartitionReader, that is actually just a wrapper around InputFormat.
> > > The advantage of this solution is the ability to reload cache data in
> > > parallel (number of threads depends on number of InputSplits, but has
> > > an upper limit). As a result cache reload time significantly reduces
> > > (as well as time of input stream blocking). I know that usually we try
> > > to avoid usage of concurrency in Flink code, but maybe this one can be
> > > an exception. BTW I don't say that it's an ideal solution, maybe there
> > > are better ones.
> > >
> > > > > Providing the cache in the framework might introduce compatibility
> > > > > issues
> > >
> > > It's possible only in cases when the developer of the connector won't
> > > properly refactor his code and will use new cache options incorrectly
> > > (i.e. explicitly provide the same options into 2 different code
> > > places). For correct behavior all he will need to do is to redirect
> > > existing options to the framework's LookupConfig (+ maybe add an alias
> > > for options, if there was different naming), everything will be
> > > transparent for users. If the developer won't do refactoring at all,
> > > nothing will be changed for the connector because of backward
> > > compatibility. Also if a developer wants to use his own cache logic,
> > > he just can refuse to pass some of the configs into the framework, and
> > > instead make his own implementation with already existing configs and
> > > metrics (but actually I think that it's a rare case).
> > >
> > > > > filters and projections should be pushed all the way down to the
> > > > > table function, like what we do in the scan source
> > >
> > > > It's a great goal. But the truth is that the ONLY connector that
> > > > supports filter pushdown is FileSystemTableSource
> > > > (no database connector supports it currently). Also for some databases
> > > > it's simply impossible to push down filters as complex as the ones we
> > > > have in Flink.
> > >
> > > > > only applying these optimizations to the cache seems not quite
> > > > > useful
> > >
> > > > Filters can cut off an arbitrarily large amount of data from the
> > > > dimension table. For a simple example, suppose in dimension table
> > > > 'users' we have column 'age' with values from 20 to 40, and input
> > > > stream 'clicks' that is ~uniformly distributed by age of users. If we
> > > > have filter 'age > 30', there will be about half as much data in the
> > > > cache. This means the user can increase 'lookup.cache.max-rows' by
> > > > almost 2 times. It will give a huge performance boost. Moreover, this
> > > > optimization starts to really shine in 'ALL' cache, where tables
> > > > without filters and projections can't fit in memory, but with them
> > > > they can. This opens up additional possibilities for users. And this
> > > > doesn't sound like 'not quite useful'.
> > >
> > > It would be great to hear other voices regarding this topic! Because
> > > we have quite a lot of controversial points, and I think with the help
> > > of others it will be easier for us to come to a consensus.
> > >
> > > Best regards,
> > > Smirnov Alexander
> > >
> > >
> > > > On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > >
> > > > Hi Alexander and Arvid,
> > > >
> > > > > Thanks for the discussion and sorry for my late response! We had an
> > > > > internal discussion together with Jark and Leonard and I’d like to
> > > > > summarize our ideas. Instead of implementing the cache logic in the
> > > > > table runtime layer or wrapping around the user-provided table
> > > > > function, we prefer to introduce some new APIs extending
> > > > > TableFunction with these concerns:
> > > >
> > > > > 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> > > > > proc_time", because it couldn’t truly reflect the content of the
> > > > > lookup table at the moment of querying. If users choose to enable
> > > > > caching on the lookup table, they implicitly indicate that this
> > > > > breakage is acceptable in exchange for the performance. So we prefer
> > > > > not to provide caching on the table runtime level.
> > > >
> > > > > 2. If we make the cache implementation in the framework (whether in
> > > > > a runner or a wrapper around TableFunction), we have to confront a
> > > > > situation that allows table options in DDL to control the behavior
> > > > > of the framework, which has never happened previously and should be
> > > > > cautious. Under the current design the behavior of the framework
> > > > > should only be specified by configurations (“table.exec.xxx”), and
> > > > > it’s hard to apply these general configs to a specific table.
> > > >
> > > > > 3. We have use cases where the lookup source loads and refreshes all
> > > > > records periodically into memory to achieve high lookup performance
> > > > > (like the Hive connector in the community, and also widely used by
> > > > > our internal connectors). Wrapping the cache around the user’s
> > > > > TableFunction works fine for LRU caches, but I think we have to
> > > > > introduce a new interface for this all-caching scenario and the
> > > > > design would become more complex.
> > > >
> > > > > 4. Providing the cache in the framework might introduce
> > > > > compatibility issues to existing lookup sources like there might
> > > > > exist two caches with totally different strategies if the user
> > > > > incorrectly configures the table (one in the framework and another
> > > > > implemented by the lookup source).
> > > >
> > > > > As for the optimization mentioned by Alexander, I think filters and
> > > > > projections should be pushed all the way down to the table function,
> > > > > like what we do in the scan source, instead of the runner with the
> > > > > cache. The goal of using cache is to reduce the network I/O and
> > > > > pressure on the external system, and only applying these
> > > > > optimizations to the cache seems not quite useful.
> > > >
> > > > > I made some updates to the FLIP[1] to reflect our ideas. We prefer
> > > > > to keep the cache implementation as a part of TableFunction, and we
> > > > > could provide some helper classes (CachingTableFunction,
> > > > > AllCachingTableFunction, CachingAsyncTableFunction) to developers
> > > > > and regulate metrics of the cache. Also, I made a POC[2] for your
> > > > > reference.
> > > >
> > > > Looking forward to your ideas!
> > > >
> > > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> > > >
> > > > Best regards,
> > > >
> > > > Qingsheng
> > > >
> > > > > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>
> > > >> Thanks for the response, Arvid!
> > > >>
> > > > >> I have a few comments on your message.
> > > >>
> > > >> > but could also live with an easier solution as the first step:
> > > >>
> > > > >> I think that these 2 ways (the one originally proposed by Qingsheng
> > > > >> and mine) are mutually exclusive, because conceptually they follow
> > > > >> the same goal, but implementation details are different. If we go
> > > > >> one way, moving to the other in the future will mean deleting
> > > > >> existing code and once again changing the API for connectors. So I
> > > > >> think we should reach a consensus with the community about that and
> > > > >> then work together on this FLIP, i.e. divide the work into tasks for
> > > > >> different parts of the FLIP (for example, LRU cache unification /
> > > > >> introducing the proposed set of metrics / further work…). WDYT,
> > > > >> Qingsheng?
> > > >>
> > > >> > as the source will only receive the requests after filter
> > > >>
> > > > >> Actually if filters are applied to fields of the lookup table, we
> > > > >> must first do the requests, and only after that can we filter the
> > > > >> responses, because lookup connectors don't have filter pushdown. So
> > > > >> if filtering is done before caching, there will be far fewer rows in
> > > > >> the cache.
> > > >>
> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't
> > > > >> > know the solution to share images to be honest.
> > > >>
> > > > >> Sorry for that, I’m a bit new to such kinds of conversations :)
> > > > >> I have no write access to the confluence, so I made a Jira issue,
> > > > >> where I described the proposed changes in more detail -
> > > > >> https://issues.apache.org/jira/browse/FLINK-27411.
> > > > >>
> > > > >> I'd be happy to get more feedback!
> > > >>
> > > >> Best,
> > > >> Smirnov Alexander
> > > >>
> > > > >> On Mon, 25 Apr 2022 at 19:49, Arvid Heise <ar...@apache.org> wrote:
> > > >> >
> > > >> > Hi Qingsheng,
> > > >> >
> > > > >> > Thanks for driving this; the inconsistency was not satisfying for
> > > > >> > me.
> > > >> >
> > > > >> > I second Alexander's idea though but could also live with an
> > > > >> > easier solution as the first step: Instead of making caching an
> > > > >> > implementation detail of TableFunction X, rather devise a caching
> > > > >> > layer around X. So the proposal would be a CachingTableFunction
> > > > >> > that delegates to X in case of misses and else manages the cache.
> > > > >> > Lifting it into the operator model as proposed would be even
> > > > >> > better but is probably unnecessary in the first step for a lookup
> > > > >> > source (as the source will only receive the requests after filter;
> > > > >> > applying projection may be more interesting to save memory).
> > > >> >
> > > > >> > Another advantage is that all the changes of this FLIP would be
> > > > >> > limited to options, no need for new public interfaces. Everything
> > > > >> > else remains an implementation of Table runtime. That means we can
> > > > >> > easily incorporate the optimization potential that Alexander
> > > > >> > pointed out later.
> > > >> >
> > > > >> > @Alexander unfortunately, your architecture is not shared. I don't
> > > > >> > know the solution to share images to be honest.
> > > >> >
> > > > >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <smirale...@gmail.com> wrote:
> > > >> >
> > > > >> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but
> > > > >> > > I'd really like to become one. And this FLIP really interested
> > > > >> > > me. Actually I have worked on a similar feature in my company’s
> > > > >> > > Flink fork, and we would like to share our thoughts on this and
> > > > >> > > make the code open source.
> > > >> > >
> > > > >> > > I think there is a better alternative than introducing an
> > > > >> > > abstract class for TableFunction (CachingTableFunction). As you
> > > > >> > > know, TableFunction exists in the flink-table-common module,
> > > > >> > > which provides only an API for working with tables - it’s very
> > > > >> > > convenient for importing in connectors. In turn,
> > > > >> > > CachingTableFunction contains logic for runtime execution, so
> > > > >> > > this class and everything connected with it should be located in
> > > > >> > > another module, probably in flink-table-runtime. But this will
> > > > >> > > require connectors to depend on another module, which contains a
> > > > >> > > lot of runtime logic, which doesn’t sound good.
> > > >> > >
> > > > >> > > I suggest adding a new method ‘getLookupConfig’ to
> > > > >> > > LookupTableSource or LookupRuntimeProvider to allow connectors to
> > > > >> > > only pass configurations to the planner, so they won’t depend on
> > > > >> > > the runtime implementation. Based on these configs the planner
> > > > >> > > will construct a lookup join operator with corresponding runtime
> > > > >> > > logic (ProcessFunctions in module flink-table-runtime). The
> > > > >> > > architecture looks like the pinned image (the LookupConfig class
> > > > >> > > there is actually your CacheConfig).
> > > >> > >
> > > > >> > > The classes in flink-table-planner that will be responsible for
> > > > >> > > this are CommonPhysicalLookupJoin and its subclasses.
> > > > >> > > Current classes for lookup join in flink-table-runtime -
> > > > >> > > LookupJoinRunner, AsyncLookupJoinRunner,
> > > > >> > > LookupJoinRunnerWithCalc, AsyncLookupJoinRunnerWithCalc.
> > > >> > >
> > > >> > > I suggest adding classes LookupJoinCachingRunner,
> > > >> > > LookupJoinCachingRunnerWithCalc, etc.
> > > >> > >
> > > > >> > > And here comes another more powerful advantage of such a
> > > > >> > > solution. If we have caching logic on a lower level, we can apply
> > > > >> > > some optimizations to it. LookupJoinRunnerWithCalc was named like
> > > > >> > > this because it uses the ‘calc’ function, which actually mostly
> > > > >> > > consists of filters and projections.
> > > >> > >
> > > > >> > > For example, when joining table A with lookup table B under the
> > > > >> > > condition ‘JOIN … ON A.id = B.id AND A.age = B.age + 10 WHERE
> > > > >> > > B.salary > 1000’, the ‘calc’ function will contain the filters
> > > > >> > > A.age = B.age + 10 and B.salary > 1000.
> > > >> > >
> > > > >> > > If we apply this function before storing records in the cache,
> > > > >> > > the size of the cache will be significantly reduced: filters
> > > > >> > > avoid storing useless records in the cache, and projections
> > > > >> > > reduce the records’ size. So the initial max number of records in
> > > > >> > > the cache can be increased by the user.
> > > >> > >
> > > >> > > What do you think about it?
> > > >> > >
> > > >> > >
> > > >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> > > >> > > > Hi devs,
> > > >> > > >
> > > > >> > > > Yuan and I would like to start a discussion about FLIP-221[1],
> > > > >> > > > which introduces an abstraction of lookup table cache and its
> > > > >> > > > standard metrics.
> > > >> > > >
> > > > >> > > > Currently each lookup table source should implement its own
> > > > >> > > > cache to store lookup results, and there isn’t a standard of
> > > > >> > > > metrics for users and developers to tune their jobs with lookup
> > > > >> > > > joins, which is quite a common use case in Flink table / SQL.
> > > >> > > >
> > > > >> > > > Therefore we propose some new APIs including cache, metrics,
> > > > >> > > > wrapper classes of TableFunction and new table options. Please
> > > > >> > > > take a look at the FLIP page [1] to get more details. Any
> > > > >> > > > suggestions and comments would be appreciated!
> > > >> > > >
> > > >> > > > [1]
> > > >> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > >> > > >
> > > >> > > > Best regards,
> > > >> > > >
> > > >> > > > Qingsheng
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >
> > > >
> > > >
> > > > --
> > > > Best Regards,
> > > >
> > > > Qingsheng Ren
> > > >
> > > > Real-time Computing Team
> > > > Alibaba Cloud
> > > >
> > > > Email: renqs...@gmail.com
> > >
> >
>
