Hi everyone,

I don't have much to chip in, but just wanted to express that I really
appreciate the in-depth discussion on this topic and I hope that others
will join the conversation.

Best regards,

Martijn

On Tue, 3 May 2022 at 10:15, Александр Смирнов <smirale...@gmail.com> wrote:

> Hi Qingsheng, Leonard and Jark,
>
> Thanks for your detailed feedback! However, I have questions about
> some of your statements (maybe I didn't get something?).
>
> > Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF proc_time”
>
> I agree that the semantics of "FOR SYSTEM_TIME AS OF proc_time" is not
> fully implemented with caching, but as you said, users go on it
> consciously to achieve better performance (no one proposed to enable
> caching by default, etc.). Or by users do you mean other developers of
> connectors? In this case developers explicitly specify whether their
> connector supports caching or not (in the list of supported options),
> no one makes them do that if they don't want to. So what exactly is
> the difference between implementing caching in modules
> flink-table-runtime and in flink-table-common from the considered
> point of view? How does it affect on breaking/non-breaking the
> semantics of "FOR SYSTEM_TIME AS OF proc_time"?
>
> > confront a situation that allows table options in DDL to control the
> behavior of the framework, which has never happened previously and should
> be cautious
>
> If we talk about main differences of semantics of DDL options and
> config options("table.exec.xxx"), isn't it about limiting the scope of
> the options + importance for the user business logic rather than
> specific location of corresponding logic in the framework? I mean that
> in my design, for example, putting an option with lookup cache
> strategy in configurations would  be the wrong decision, because it
> directly affects the user's business logic (not just performance
> optimization) + touches just several functions of ONE table (there can
> be multiple tables with different caches). Does it really matter for
> the user (or someone else) where the logic is located, which is
> affected by the applied option?
> Also I can remember DDL option 'sink.parallelism', which in some way
> "controls the behavior of the framework" and I don't see any problem
> here.
>
> > introduce a new interface for this all-caching scenario and the design
> would become more complex
>
> This is a subject for a separate discussion, but actually in our
> internal version we solved this problem quite easily - we reused
> InputFormat class (so there is no need for a new API). The point is
> that currently all lookup connectors use InputFormat for scanning the
> data in batch mode: HBase, JDBC and even Hive - it uses class
> PartitionReader, that is actually just a wrapper around InputFormat.
> The advantage of this solution is the ability to reload cache data in
> parallel (number of threads depends on number of InputSplits, but has
> an upper limit). As a result cache reload time significantly reduces
> (as well as time of input stream blocking). I know that usually we try
> to avoid usage of concurrency in Flink code, but maybe this one can be
> an exception. BTW I don't say that it's an ideal solution, maybe there
> are better ones.
>
> > Providing the cache in the framework might introduce compatibility issues
>
> It's possible only in cases when the developer of the connector won't
> properly refactor his code and will use new cache options incorrectly
> (i.e. explicitly provide the same options into 2 different code
> places). For correct behavior all he will need to do is to redirect
> existing options to the framework's LookupConfig (+ maybe add an alias
> for options, if there was different naming), everything will be
> transparent for users. If the developer won't do refactoring at all,
> nothing will be changed for the connector because of backward
> compatibility. Also if a developer wants to use his own cache logic,
> he just can refuse to pass some of the configs into the framework, and
> instead make his own implementation with already existing configs and
> metrics (but actually I think that it's a rare case).
>
> > filters and projections should be pushed all the way down to the table
> function, like what we do in the scan source
>
> It's the great purpose. But the truth is that the ONLY connector that
> supports filter pushdown is FileSystemTableSource
> (no database connector supports it currently). Also for some databases
> it's simply impossible to pushdown such complex filters that we have
> in Flink.
>
> >  only applying these optimizations to the cache seems not quite useful
>
> Filters can cut off an arbitrarily large amount of data from the
> dimension table. For a simple example, suppose in dimension table
> 'users'
> we have column 'age' with values from 20 to 40, and input stream
> 'clicks' that is ~uniformly distributed by age of users. If we have
> filter 'age > 30',
> there will be twice less data in cache. This means the user can
> increase 'lookup.cache.max-rows' by almost 2 times. It will gain a
> huge
> performance boost. Moreover, this optimization starts to really shine
> in 'ALL' cache, where tables without filters and projections can't fit
> in memory, but with them - can. This opens up additional possibilities
> for users. And this doesn't sound as 'not quite useful'.
>
> It would be great to hear other voices regarding this topic! Because
> we have quite a lot of controversial points, and I think with the help
> of others it will be easier for us to come to a consensus.
>
> Best regards,
> Smirnov Alexander
>
>
> пт, 29 апр. 2022 г. в 22:33, Qingsheng Ren <renqs...@gmail.com>:
> >
> > Hi Alexander and Arvid,
> >
> > Thanks for the discussion and sorry for my late response! We had an
> internal discussion together with Jark and Leonard and I’d like to
> summarize our ideas. Instead of implementing the cache logic in the table
> runtime layer or wrapping around the user-provided table function, we
> prefer to introduce some new APIs extending TableFunction with these
> concerns:
> >
> > 1. Caching actually breaks the semantic of "FOR SYSTEM_TIME AS OF
> proc_time”, because it couldn’t truly reflect the content of the lookup
> table at the moment of querying. If users choose to enable caching on the
> lookup table, they implicitly indicate that this breakage is acceptable in
> exchange for the performance. So we prefer not to provide caching on the
> table runtime level.
> >
> > 2. If we make the cache implementation in the framework (whether in a
> runner or a wrapper around TableFunction), we have to confront a situation
> that allows table options in DDL to control the behavior of the framework,
> which has never happened previously and should be cautious. Under the
> current design the behavior of the framework should only be specified by
> configurations (“table.exec.xxx”), and it’s hard to apply these general
> configs to a specific table.
> >
> > 3. We have use cases that lookup source loads and refresh all records
> periodically into the memory to achieve high lookup performance (like Hive
> connector in the community, and also widely used by our internal
> connectors). Wrapping the cache around the user’s TableFunction works fine
> for LRU caches, but I think we have to introduce a new interface for this
> all-caching scenario and the design would become more complex.
> >
> > 4. Providing the cache in the framework might introduce compatibility
> issues to existing lookup sources like there might exist two caches with
> totally different strategies if the user incorrectly configures the table
> (one in the framework and another implemented by the lookup source).
> >
> > As for the optimization mentioned by Alexander, I think filters and
> projections should be pushed all the way down to the table function, like
> what we do in the scan source, instead of the runner with the cache. The
> goal of using cache is to reduce the network I/O and pressure on the
> external system, and only applying these optimizations to the cache seems
> not quite useful.
> >
> > I made some updates to the FLIP[1] to reflect our ideas. We prefer to
> keep the cache implementation as a part of TableFunction, and we could
> provide some helper classes (CachingTableFunction, AllCachingTableFunction,
> CachingAsyncTableFunction) to developers and regulate metrics of the cache.
> Also, I made a POC[2] for your reference.
> >
> > Looking forward to your ideas!
> >
> > [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> >
> > Best regards,
> >
> > Qingsheng
> >
> > On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com>
> wrote:
> >>
> >> Thanks for the response, Arvid!
> >>
> >> I have few comments on your message.
> >>
> >> > but could also live with an easier solution as the first step:
> >>
> >> I think that these 2 ways are mutually exclusive (originally proposed
> >> by Qingsheng and mine), because conceptually they follow the same
> >> goal, but implementation details are different. If we will go one way,
> >> moving to another way in the future will mean deleting existing code
> >> and once again changing the API for connectors. So I think we should
> >> reach a consensus with the community about that and then work together
> >> on this FLIP, i.e. divide the work on tasks for different parts of the
> >> flip (for example, LRU cache unification / introducing proposed set of
> >> metrics / further work…). WDYT, Qingsheng?
> >>
> >> > as the source will only receive the requests after filter
> >>
> >> Actually if filters are applied to fields of the lookup table, we
> >> firstly must do requests, and only after that we can filter responses,
> >> because lookup connectors don't have filter pushdown. So if filtering
> >> is done before caching, there will be much less rows in cache.
> >>
> >> > @Alexander unfortunately, your architecture is not shared. I don't
> know the
> >>
> >> > solution to share images to be honest.
> >>
> >> Sorry for that, I’m a bit new to such kinds of conversations :)
> >> I have no write access to the confluence, so I made a Jira issue,
> >> where described the proposed changes in more details -
> >> https://issues.apache.org/jira/browse/FLINK-27411.
> >>
> >> Will happy to get more feedback!
> >>
> >> Best,
> >> Smirnov Alexander
> >>
> >> пн, 25 апр. 2022 г. в 19:49, Arvid Heise <ar...@apache.org>:
> >> >
> >> > Hi Qingsheng,
> >> >
> >> > Thanks for driving this; the inconsistency was not satisfying for me.
> >> >
> >> > I second Alexander's idea though but could also live with an easier
> >> > solution as the first step: Instead of making caching an
> implementation
> >> > detail of TableFunction X, rather devise a caching layer around X. So
> the
> >> > proposal would be a CachingTableFunction that delegates to X in case
> of
> >> > misses and else manages the cache. Lifting it into the operator model
> as
> >> > proposed would be even better but is probably unnecessary in the
> first step
> >> > for a lookup source (as the source will only receive the requests
> after
> >> > filter; applying projection may be more interesting to save memory).
> >> >
> >> > Another advantage is that all the changes of this FLIP would be
> limited to
> >> > options, no need for new public interfaces. Everything else remains an
> >> > implementation of Table runtime. That means we can easily incorporate
> the
> >> > optimization potential that Alexander pointed out later.
> >> >
> >> > @Alexander unfortunately, your architecture is not shared. I don't
> know the
> >> > solution to share images to be honest.
> >> >
> >> > On Fri, Apr 22, 2022 at 5:04 PM Александр Смирнов <
> smirale...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Qingsheng! My name is Alexander, I'm not a committer yet, but I'd
> >> > > really like to become one. And this FLIP really interested me.
> >> > > Actually I have worked on a similar feature in my company’s Flink
> >> > > fork, and we would like to share our thoughts on this and make code
> >> > > open source.
> >> > >
> >> > > I think there is a better alternative than introducing an abstract
> >> > > class for TableFunction (CachingTableFunction). As you know,
> >> > > TableFunction exists in the flink-table-common module, which
> provides
> >> > > only an API for working with tables – it’s very convenient for
> importing
> >> > > in connectors. In turn, CachingTableFunction contains logic for
> >> > > runtime execution,  so this class and everything connected with it
> >> > > should be located in another module, probably in
> flink-table-runtime.
> >> > > But this will require connectors to depend on another module, which
> >> > > contains a lot of runtime logic, which doesn’t sound good.
> >> > >
> >> > > I suggest adding a new method ‘getLookupConfig’ to LookupTableSource
> >> > > or LookupRuntimeProvider to allow connectors to only pass
> >> > > configurations to the planner, therefore they won’t depend on
> runtime
> >> > > realization. Based on these configs planner will construct a lookup
> >> > > join operator with corresponding runtime logic (ProcessFunctions in
> >> > > module flink-table-runtime). Architecture looks like in the pinned
> >> > > image (LookupConfig class there is actually yours CacheConfig).
> >> > >
> >> > > Classes in flink-table-planner, that will be responsible for this –
> >> > > CommonPhysicalLookupJoin and his inheritors.
> >> > > Current classes for lookup join in  flink-table-runtime  -
> >> > > LookupJoinRunner, AsyncLookupJoinRunner, LookupJoinRunnerWithCalc,
> >> > > AsyncLookupJoinRunnerWithCalc.
> >> > >
> >> > > I suggest adding classes LookupJoinCachingRunner,
> >> > > LookupJoinCachingRunnerWithCalc, etc.
> >> > >
> >> > > And here comes another more powerful advantage of such a solution.
> If
> >> > > we have caching logic on a lower level, we can apply some
> >> > > optimizations to it. LookupJoinRunnerWithCalc was named like this
> >> > > because it uses the ‘calc’ function, which actually mostly consists
> of
> >> > > filters and projections.
> >> > >
> >> > > For example, in join table A with lookup table B condition ‘JOIN …
> ON
> >> > > A.id = B.id AND A.age = B.age + 10 WHERE B.salary > 1000’  ‘calc’
> >> > > function will contain filters A.age = B.age + 10 and B.salary >
> 1000.
> >> > >
> >> > > If we apply this function before storing records in cache, size of
> >> > > cache will be significantly reduced: filters = avoid storing useless
> >> > > records in cache, projections = reduce records’ size. So the initial
> >> > > max number of records in cache can be increased by the user.
> >> > >
> >> > > What do you think about it?
> >> > >
> >> > >
> >> > > On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> >> > > > Hi devs,
> >> > > >
> >> > > > Yuan and I would like to start a discussion about FLIP-221[1],
> which
> >> > > introduces an abstraction of lookup table cache and its standard
> metrics.
> >> > > >
> >> > > > Currently each lookup table source should implement their own
> cache to
> >> > > store lookup results, and there isn’t a standard of metrics for
> users and
> >> > > developers to tuning their jobs with lookup joins, which is a quite
> common
> >> > > use case in Flink table / SQL.
> >> > > >
> >> > > > Therefore we propose some new APIs including cache, metrics,
> wrapper
> >> > > classes of TableFunction and new table options. Please take a look
> at the
> >> > > FLIP page [1] to get more details. Any suggestions and comments
> would be
> >> > > appreciated!
> >> > > >
> >> > > > [1]
> >> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> > > >
> >> > > > Best regards,
> >> > > >
> >> > > > Qingsheng
> >> > > >
> >> > > >
> >> > >
> >
> >
> >
> > --
> > Best Regards,
> >
> > Qingsheng Ren
> >
> > Real-time Computing Team
> > Alibaba Cloud
> >
> > Email: renqs...@gmail.com
>

Reply via email to