Thanks Qingsheng and all for your discussion.

Very sorry to jump in so late.

Maybe I missed something, but my first impression of the cache interface
was: why don't we provide an interface similar to Guava Cache [1]? Building
on the Guava cache design, Caffeine also adds extensions for asynchronous
calls [2], and it supports bulk loading as well.
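
To make the idea concrete, here is a rough sketch of what a Guava/Caffeine-style
interface could look like. It is only an illustration under my own assumptions:
SketchCache and its method names are hypothetical and are not taken from the
FLIP, Guava, or Caffeine, and the sketch is unbounded and not thread-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch; names are illustrative and not taken from FLIP-221. */
class SketchCache<K, V> {
    // Unbounded and not thread-safe; a real cache would add eviction and concurrency control.
    private final Map<K, V> store = new HashMap<>();

    /** Returns the cached value, or null on a miss. */
    V getIfPresent(K key) {
        return store.get(key);
    }

    /** Returns the cached value, invoking the loader and caching its result on a miss. */
    V get(K key, Function<? super K, ? extends V> loader) {
        V value = store.get(key);
        if (value == null) {
            value = loader.apply(key);
            if (value != null) {
                store.put(key, value);
            }
        }
        return value;
    }

    /** Bulk load: a single loader call covers all keys that are not cached yet. */
    Map<K, V> getAll(List<K> keys, Function<List<K>, Map<K, V>> bulkLoader) {
        Map<K, V> result = new HashMap<>();
        List<K> missing = new ArrayList<>();
        for (K key : keys) {
            V value = store.get(key);
            if (value != null) {
                result.put(key, value);
            } else {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            Map<K, V> loaded = bulkLoader.apply(missing);
            store.putAll(loaded);
            result.putAll(loaded);
        }
        return result;
    }

    /** Async variant: the loader returns a future, and the result is cached when it completes. */
    CompletableFuture<V> getAsync(K key, Function<? super K, CompletableFuture<V>> asyncLoader) {
        V cached = store.get(key);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return asyncLoader.apply(key).thenApply(value -> {
            if (value != null) {
                store.put(key, value);
            }
            return value;
        });
    }
}
```

With something like this, a sync lookup would call get(key, loader), an async
lookup would call getAsync(key, asyncLoader), and a batched lookup could use
getAll, so the three access patterns share one cache abstraction.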

I am also confused about why we first go through LookupCacheFactory.Builder
and then through the Factory to create the Cache.

[1] https://github.com/google/guava
[2] https://github.com/ben-manes/caffeine/wiki/Population

Best,
Jingsong

On Thu, May 26, 2022 at 11:17 PM Jark Wu <imj...@gmail.com> wrote:

> After looking at the newly introduced ReloadTime and Becket's comment,
> I agree with Becket that we should have a pluggable reloading strategy.
> We can provide some common implementations, e.g., periodic reloading and
> daily reloading.
> But there will definitely be some connector- or business-specific reloading
> strategies, e.g., being notified by a ZooKeeper watcher, or reloading once a
> new Hive partition is complete.
>
> Best,
> Jark
>
> On Thu, 26 May 2022 at 11:52, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Qingsheng,
> >
> > Thanks for updating the FLIP. A few comments / questions below:
> >
> > 1. Is there a reason that we have both "XXXFactory" and "XXXProvider"?
> > What is the difference between them? If they are the same, can we just
> > use XXXFactory everywhere?
> >
> > 2. Regarding the FullCachingLookupProvider, should the reloading policy
> > also be pluggable? Periodic reloading can sometimes be tricky in
> > practice. For example, if a user uses 24 hours as the cache refresh
> > interval and some nightly batch job is delayed, the cache update may
> > still see stale data.
> >
> > 3. In DefaultLookupCacheFactory, it looks like InitialCapacity should be
> > removed.
> >
> > 4. The purpose of LookupFunctionProvider#cacheMissingKey() seems a little
> > confusing to me. If Optional<LookupCacheFactory> getCacheFactory()
> > returns a non-empty factory, doesn't that already tell the framework to
> > cache the missing keys? Also, why does this method return an
> > Optional<Boolean> instead of a boolean?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Wed, May 25, 2022 at 5:07 PM Qingsheng Ren <renqs...@gmail.com> wrote:
> >
> >> Hi Lincoln and Jark,
> >>
> >> Thanks for the comments! If the community reaches a consensus that we
> >> use SQL hints instead of table options to decide whether to use sync or
> >> async mode, it’s indeed not necessary to introduce the “lookup.async”
> >> option.
> >>
> >> I think it’s a good idea to make the async decision at the query
> >> level, which allows better optimization with the additional information
> >> gathered by the planner. Is there any FLIP describing the issue in
> >> FLINK-27625? I thought FLIP-234 only proposes adding a SQL hint for retry
> >> on missing, rather than having the entire async mode controlled by hint.
> >>
> >> Best regards,
> >>
> >> Qingsheng
> >>
> >> > On May 25, 2022, at 15:13, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> >
> >> > Hi Jark,
> >> >
> >> > Thanks for your reply!
> >> >
> >> > Currently 'lookup.async' exists only in the HBase connector. I have no
> >> > idea whether or when to remove it (we can discuss that in a separate
> >> > issue for the HBase connector after FLINK-27625 is done); I just
> >> > suggest not adding it as a common option now.
> >> >
> >> > Best,
> >> > Lincoln Lee
> >> >
> >> >
> >> > On Tue, May 24, 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:
> >> >
> >> >> Hi Lincoln,
> >> >>
> >> >> I have taken a look at FLIP-234, and I agree with you that connectors
> >> >> can provide both async and sync runtime providers simultaneously
> >> >> instead of only one of them.
> >> >> At that point, "lookup.async" looks redundant. If this option is
> >> >> planned to be removed in the long term, I think it makes sense not to
> >> >> introduce it in this FLIP.
> >> >>
> >> >> Best,
> >> >> Jark
> >> >>
> >> >> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >> >>
> >> >>> Hi Qingsheng,
> >> >>>
> >> >>> Sorry for jumping into the discussion so late. It's a good idea that
> >> >>> we can have common table options. I have a minor comment on
> >> >>> 'lookup.async': I suggest not making it a common option.
> >> >>>
> >> >>> The table layer abstracts both sync and async lookup capabilities, and
> >> >>> connector implementers can choose one or both. When a connector
> >> >>> implements only one capability (the status of most existing built-in
> >> >>> connectors), 'lookup.async' will not be used. And when a connector has
> >> >>> both capabilities, I think this choice is better made at the query
> >> >>> level: for example, the table planner can choose the physical
> >> >>> implementation of async lookup or sync lookup based on its cost model,
> >> >>> or users can give a query hint based on their own better understanding.
> >> >>> If there is another common table option 'lookup.async', it may confuse
> >> >>> users in the long run.
> >> >>>
> >> >>> So, I prefer to keep the 'lookup.async' option private (for the
> >> >>> current HBase connector) and not turn it into a common option.
> >> >>>
> >> >>> WDYT?
> >> >>>
> >> >>> Best,
> >> >>> Lincoln Lee
> >> >>>
> >> >>>
> >> >>> On Mon, May 23, 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>
> >> >>>> Hi Alexander,
> >> >>>>
> >> >>>> Thanks for the review! We recently updated the FLIP and you can find
> >> >>>> those changes in my latest email. Since some terminology has changed,
> >> >>>> I’ll use the new concepts when replying to your comments.
> >> >>>>
> >> >>>> 1. Builder vs ‘of’
> >> >>>> I’m OK with using the builder pattern if we have additional optional
> >> >>>> parameters for full caching mode (“rescan” previously). The
> >> >>>> schedule-with-delay idea looks reasonable to me, but I think we need
> >> >>>> to redesign the builder API of full caching to make it more
> >> >>>> descriptive for developers. Would you mind sharing your ideas about
> >> >>>> the API? To get access to the FLIP workspace, you can just provide
> >> >>>> your account ID and ping any PMC member, including Jark.
> >> >>>>
> >> >>>> 2. Common table options
> >> >>>> We have had some discussions recently and propose introducing 8
> >> >>>> common table options for caching. The FLIP has been updated
> >> >>>> accordingly.
> >> >>>>
> >> >>>> 3. Retries
> >> >>>> I think we are on the same page :-)
> >> >>>>
> >> >>>> For your additional concerns:
> >> >>>> 1) The table option has been updated.
> >> >>>> 2) We brought “lookup.cache” back for configuring whether to use
> >> >>>> partial or full caching mode.
> >> >>>>
> >> >>>> Best regards,
> >> >>>>
> >> >>>> Qingsheng
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>>> On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>
> >> >>>>> Also I have a few additions:
> >> >>>>> 1) Maybe rename 'lookup.cache.maximum-size' to
> >> >>>>> 'lookup.cache.max-rows'? I think that would make it clearer that we
> >> >>>>> are talking not about bytes but about the number of rows. It also
> >> >>>>> fits better with my filter optimization.
> >> >>>>> 2) How will users enable rescanning? Are we going to separate caching
> >> >>>>> and rescanning from the options point of view? Initially we had
> >> >>>>> one option 'lookup.cache' with values LRU / ALL. I think we can now
> >> >>>>> add a boolean option 'lookup.rescan'. RescanInterval can be
> >> >>>>> 'lookup.rescan.interval', etc.
> >> >>>>>
> >> >>>>> Best regards,
> >> >>>>> Alexander
> >> >>>>>
> >> >>>>> On Thu, May 19, 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Hi Qingsheng and Jark,
> >> >>>>>>
> >> >>>>>> 1. Builders vs 'of'
> >> >>>>>> I understand that builders are used when we have multiple parameters.
> >> >>>>>> I suggested them because we could add parameters later. To prevent
> >> >>>>>> the Builder for ScanRuntimeProvider from looking redundant, I can
> >> >>>>>> suggest one more config now: "rescanStartTime".
> >> >>>>>> It is a time in UTC (LocalTime class) at which the first reload of
> >> >>>>>> the cache starts. This parameter can be thought of as the
> >> >>>>>> 'initialDelay' (the difference between the current time and
> >> >>>>>> rescanStartTime) in the method
> >> >>>>>> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very
> >> >>>>>> useful when the dimension table is updated by some other scheduled
> >> >>>>>> job at a certain time, or when the user simply wants the second scan
> >> >>>>>> (the first cache reload) to be delayed. This option can be used even
> >> >>>>>> without 'rescanInterval'; in that case 'rescanInterval' will be one
> >> >>>>>> day.
> >> >>>>>> If you are fine with this option, I would be very glad if you would
> >> >>>>>> give me access to edit the FLIP page, so I could add it myself.
> >> >>>>>> 2. Common table options
> >> >>>>>> I also think that FactoryUtil would be overloaded by all the cache
> >> >>>>>> options. But maybe we should unify all suggested options, not only
> >> >>>>>> those for the default cache? I.e., a class 'LookupOptions' that
> >> >>>>>> unifies the default cache options, rescan options, 'async', and
> >> >>>>>> 'maxRetries'. WDYT?
> >> >>>>>>
> >> >>>>>> 3. Retries
> >> >>>>>> I'm fine with a suggestion close to RetryUtils#tryTimes(times, call).
> >> >>>>>>
> >> >>>>>> [1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
> >> >>>>>>
> >> >>>>>> Best regards,
> >> >>>>>> Alexander
> >> >>>>>>
> >> >>>>>> On Wed, May 18, 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>
> >> >>>>>>> Hi Jark and Alexander,
> >> >>>>>>>
> >> >>>>>>> Thanks for your comments! I’m also OK with introducing common table
> >> >>>>>>> options. I prefer to introduce a new DefaultLookupCacheOptions class
> >> >>>>>>> for holding these option definitions, because putting all options
> >> >>>>>>> into FactoryUtil would make it a bit “crowded” and not well
> >> >>>>>>> categorized.
> >> >>>>>>>
> >> >>>>>>> The FLIP has been updated according to the suggestions above:
> >> >>>>>>> 1. Use a static “of” method for constructing RescanRuntimeProvider,
> >> >>>>>>> considering that both arguments are required.
> >> >>>>>>> 2. Introduce new table options matching DefaultLookupCacheFactory.
> >> >>>>>>>
> >> >>>>>>> Best,
> >> >>>>>>> Qingsheng
> >> >>>>>>>
> >> >>>>>>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:
> >> >>>>>>>>
> >> >>>>>>>> Hi Alex,
> >> >>>>>>>>
> >> >>>>>>>> 1) retry logic
> >> >>>>>>>> I think we can extract some common retry logic into utilities, e.g.
> >> >>>>>>>> RetryUtils#tryTimes(times, call).
> >> >>>>>>>> This seems independent of this FLIP and can be reused by DataStream
> >> >>>>>>>> users.
> >> >>>>>>>> Maybe we can open an issue to discuss this and where to put it.
> >> >>>>>>>>
> >> >>>>>>>> 2) cache ConfigOptions
> >> >>>>>>>> I'm fine with defining cache config options in the framework.
> >> >>>>>>>> A candidate place to put them is FactoryUtil, which also includes
> >> >>>>>>>> the "sink.parallelism" and "format" options.
> >> >>>>>>>>
> >> >>>>>>>> Best,
> >> >>>>>>>> Jark
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>> Hi Qingsheng,
> >> >>>>>>>>>
> >> >>>>>>>>> Thank you for considering my comments.
> >> >>>>>>>>>
> >> >>>>>>>>>> there might be custom logic before making retry, such as
> >> >>>>>>>>>> re-establish the connection
> >> >>>>>>>>>
> >> >>>>>>>>> Yes, I understand that. I meant that such logic can be placed in a
> >> >>>>>>>>> separate function that can be implemented by connectors. Just
> >> >>>>>>>>> moving the retry logic would make the connector's LookupFunction
> >> >>>>>>>>> more concise and avoid duplicate code. However, it's a minor
> >> >>>>>>>>> change. The decision is up to you.
> >> >>>>>>>>>
> >> >>>>>>>>>> We decide not to provide common DDL options and let developers to
> >> >>>>>>>>>> define their own options as we do now per connector.
> >> >>>>>>>>>
> >> >>>>>>>>> What is the reason for that? One of the main goals of this FLIP
> >> >>>>>>>>> was to unify the configs, wasn't it? I understand that the current
> >> >>>>>>>>> cache design doesn't depend on ConfigOptions, as it did before.
> >> >>>>>>>>> But we can still put these options into the framework, so
> >> >>>>>>>>> connectors can reuse them and avoid code duplication and, more
> >> >>>>>>>>> significantly, avoid possibly divergent option naming. This point
> >> >>>>>>>>> can be noted in the documentation for connector developers.
> >> >>>>>>>>>
> >> >>>>>>>>> Best regards,
> >> >>>>>>>>> Alexander
> >> >>>>>>>>>
> >> >>>>>>>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> Hi Alexander,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Thanks for the review and glad to see we are on the same page! I
> >> >>>>>>>>>> think you forgot to cc the dev mailing list, so I’m also quoting
> >> >>>>>>>>>> your reply under this email.
> >> >>>>>>>>>>
> >> >>>>>>>>>>> We can add 'maxRetryTimes' option into this class
> >> >>>>>>>>>>
> >> >>>>>>>>>> In my opinion the retry logic should be implemented in lookup()
> >> >>>>>>>>>> instead of in LookupFunction#eval(). Retrying is only meaningful
> >> >>>>>>>>>> for certain retriable failures, and there might be custom logic
> >> >>>>>>>>>> before retrying, such as re-establishing the connection
> >> >>>>>>>>>> (JdbcRowDataLookupFunction is an example), so it's handier to
> >> >>>>>>>>>> leave it to the connector.
> >> >>>>>>>>>>
> >> >>>>>>>>>>> I don't see DDL options, that were in previous version of FLIP.
> >> >>>>>>>>>>> Do you have any special plans for them?
> >> >>>>>>>>>>
> >> >>>>>>>>>> We decided not to provide common DDL options and to let
> >> >>>>>>>>>> developers define their own options, as we do now per connector.
> >> >>>>>>>>>>
> >> >>>>>>>>>> The rest of the comments sound great and I’ll update the FLIP.
> >> >>>>>>>>>> Hope we can finalize our proposal soon!
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>>
> >> >>>>>>>>>> Qingsheng
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Hi Qingsheng and devs!
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I like the overall design of updated FLIP, however I have
> >> >> several
> >> >>>>>>>>>>> suggestions and questions.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is a
> >> >>>>>>>>>>> good idea. We can add a 'maxRetryTimes' option to this class. The
> >> >>>>>>>>>>> 'eval' method of the new LookupFunction is great for this
> >> >>>>>>>>>>> purpose. The same holds for the 'async' case.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 2) There might be other configs in the future, such as
> >> >>>>>>>>>>> 'cacheMissingKey' in LookupFunctionProvider or 'rescanInterval'
> >> >>>>>>>>>>> in ScanRuntimeProvider. Maybe use the Builder pattern in
> >> >>>>>>>>>>> LookupFunctionProvider and RescanRuntimeProvider for more
> >> >>>>>>>>>>> flexibility (use one 'build' method instead of many 'of' methods
> >> >>>>>>>>>>> in the future)?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 3) What are the plans for the existing TableFunctionProvider and
> >> >>>>>>>>>>> AsyncTableFunctionProvider? I think they should be deprecated.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 4) Am I right that the current design does not assume usage of a
> >> >>>>>>>>>>> user-provided LookupCache in re-scanning? In that case, it is not
> >> >>>>>>>>>>> very clear why we need methods such as 'invalidate' or 'putAll'
> >> >>>>>>>>>>> in LookupCache.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> 5) I don't see the DDL options that were in the previous version
> >> >>>>>>>>>>> of the FLIP. Do you have any special plans for them?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> If you don't mind, I would be glad to be able to make small
> >> >>>>>>>>>>> adjustments to the FLIP document too. I think it's worth
> >> >>>>>>>>>>> mentioning exactly which optimizations are planned for the
> >> >>>>>>>>>>> future.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Best regards,
> >> >>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Hi Alexander and devs,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Thank you very much for the in-depth discussion! As Jark
> >> >>>>>>>>>>>> mentioned, we were inspired by Alexander's idea and refactored
> >> >>>>>>>>>>>> our design. FLIP-221 [1] has been updated to reflect our design
> >> >>>>>>>>>>>> now and we are happy to hear more suggestions from you!
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Compared to the previous design:
> >> >>>>>>>>>>>> 1. The lookup cache serves at the table runtime level and is
> >> >>>>>>>>>>>> integrated as a component of LookupJoinRunner, as discussed
> >> >>>>>>>>>>>> previously.
> >> >>>>>>>>>>>> 2. Interfaces are renamed and redesigned to reflect the new
> >> >>>>>>>>>>>> design.
> >> >>>>>>>>>>>> 3. We treat the all-caching case separately and introduce a new
> >> >>>>>>>>>>>> RescanRuntimeProvider to reuse the scanning ability. We are
> >> >>>>>>>>>>>> planning to support SourceFunction / InputFormat for now,
> >> >>>>>>>>>>>> considering the complexity of the FLIP-27 Source API.
> >> >>>>>>>>>>>> 4. A new interface LookupFunction is introduced to make the
> >> >>>>>>>>>>>> semantics of lookup more straightforward for developers.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> In reply to Alexander:
> >> >>>>>>>>>>>>> However I'm a little confused whether InputFormat is deprecated
> >> >>>>>>>>>>>>> or not. Am I right that it will be so in the future, but
> >> >>>>>>>>>>>>> currently it's not?
> >> >>>>>>>>>>>> Yes, you are right. InputFormat is not deprecated for now. I
> >> >>>>>>>>>>>> think it will be deprecated in the future, but we don't have a
> >> >>>>>>>>>>>> clear plan for that.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Thanks again for the discussion on this FLIP, and I am looking
> >> >>>>>>>>>>>> forward to cooperating with you after we finalize the design and
> >> >>>>>>>>>>>> interfaces!
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Qingsheng
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> Hi Jark, Qingsheng and Leonard!
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> Glad to see that we came to a consensus on almost all points!
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> However, I'm a little confused about whether InputFormat is
> >> >>>>>>>>>>>>> deprecated or not. Am I right that it will be in the future,
> >> >>>>>>>>>>>>> but currently it's not? Actually, I also think that for the
> >> >>>>>>>>>>>>> first version it's OK to use InputFormat in the ALL cache
> >> >>>>>>>>>>>>> implementation, because supporting the rescan ability seems
> >> >>>>>>>>>>>>> like a very distant prospect. But for this decision we need a
> >> >>>>>>>>>>>>> consensus among all discussion participants.
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> In general, I have nothing to argue with in your statements.
> >> >>>>>>>>>>>>> All of them correspond to my ideas. Looking ahead, it would be
> >> >>>>>>>>>>>>> nice to work on this FLIP cooperatively. I've already done a
> >> >>>>>>>>>>>>> lot of work on lookup join caching with an implementation very
> >> >>>>>>>>>>>>> close to the one we are discussing, and I want to share the
> >> >>>>>>>>>>>>> results of this work. Anyway, looking forward to the FLIP
> >> >>>>>>>>>>>>> update!
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> Hi Alex,
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> Thanks for summarizing your points.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed it
> >> >>>>>>>>>>>>>> several times and we have totally refactored the design.
> >> >>>>>>>>>>>>>> I'm glad to say we have reached a consensus on many of your
> >> >>>>>>>>>>>>>> points!
> >> >>>>>>>>>>>>>> Qingsheng is still working on updating the design docs, which
> >> >>>>>>>>>>>>>> may be available in the next few days.
> >> >>>>>>>>>>>>>> I will share some conclusions from our discussions:
> >> >>>>>>>>>>>>>> I will share some conclusions from our discussions:
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 1) We have refactored the design towards the "cache in
> >> >>>>>>>>>>>>>> framework" approach.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 2) A "LookupCache" interface for users to customize, and a
> >> >>>>>>>>>>>>>> default implementation with a builder for ease of use.
> >> >>>>>>>>>>>>>> This makes it possible to have both flexibility and
> >> >>>>>>>>>>>>>> conciseness.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 3) Filter pushdown is important for both the ALL and LRU
> >> >>>>>>>>>>>>>> lookup caches, especially for reducing IO.
> >> >>>>>>>>>>>>>> Filter pushdown should be the final state and the unified way
> >> >>>>>>>>>>>>>> to support pruning both the ALL cache and the LRU cache, so I
> >> >>>>>>>>>>>>>> think we should make an effort in this direction. If we need
> >> >>>>>>>>>>>>>> to support filter pushdown for the ALL cache anyway, why not
> >> >>>>>>>>>>>>>> use it for the LRU cache as well? Either way, as we have
> >> >>>>>>>>>>>>>> decided to implement the cache in the framework, we have the
> >> >>>>>>>>>>>>>> chance to support filtering on the cache at any time. This is
> >> >>>>>>>>>>>>>> an optimization and it doesn't affect the public API. I think
> >> >>>>>>>>>>>>>> we can create a JIRA issue to discuss it when the FLIP is
> >> >>>>>>>>>>>>>> accepted.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> 4) The idea for supporting the ALL cache is similar to your
> >> >>>>>>>>>>>>>> proposal.
> >> >>>>>>>>>>>>>> In the first version, we will only support InputFormat and
> >> >>>>>>>>>>>>>> SourceFunction for cache all (invoking InputFormat in the join
> >> >>>>>>>>>>>>>> operator).
> >> >>>>>>>>>>>>>> For a FLIP-27 source, we need to join a true source operator
> >> >>>>>>>>>>>>>> instead of calling it embedded in the join operator.
> >> >>>>>>>>>>>>>> However, this needs another FLIP to support the re-scan
> >> >>>>>>>>>>>>>> ability for the FLIP-27 Source, and this can be a large piece
> >> >>>>>>>>>>>>>> of work.
> >> >>>>>>>>>>>>>> In order not to block this issue, we can put the effort of
> >> >>>>>>>>>>>>>> FLIP-27 source integration into future work and integrate
> >> >>>>>>>>>>>>>> InputFormat & SourceFunction for now.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as they
> >> >>>>>>>>>>>>>> are not deprecated; otherwise, we would have to introduce
> >> >>>>>>>>>>>>>> another function similar to them, which is meaningless. We
> >> >>>>>>>>>>>>>> need to plan the FLIP-27 source integration ASAP, before
> >> >>>>>>>>>>>>>> InputFormat & SourceFunction are deprecated.
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> Best,
> >> >>>>>>>>>>>>>> Jark
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>> Hi Martijn!
> >> >>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>> Got it. Therefore, the implementation with InputFormat is not
> >> >>>>>>>>>>>>>>> considered. Thanks for clearing that up!
> >> >>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>> On Thu, May 12, 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> Hi,
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> With regards to:
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> But if there are plans to refactor all connectors to FLIP-27
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old
> >> >>>>>>>>>>>>>>>> interfaces will be deprecated and connectors will either be
> >> >>>>>>>>>>>>>>>> refactored to use the new ones or dropped.
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> The caching should work for connectors that are using FLIP-27
> >> >>>>>>>>>>>>>>>> interfaces; we should not introduce new features for old
> >> >>>>>>>>>>>>>>>> interfaces.
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> Martijn
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> Hi Jark!
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> Sorry for the late response. I would like to make some
> >> >>>>>>>>>>>>>>>>> comments and clarify my points.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> 1) I agree with your first statement. I think we can achieve
> >> >>>>>>>>>>>>>>>>> both advantages this way: put the Cache interface in
> >> >>>>>>>>>>>>>>>>> flink-table-common, but have its implementations in
> >> >>>>>>>>>>>>>>>>> flink-table-runtime. Then, if a connector developer wants to
> >> >>>>>>>>>>>>>>>>> use the existing cache strategies and their implementations,
> >> >>>>>>>>>>>>>>>>> he can just pass lookupConfig to the planner; but if he
> >> >>>>>>>>>>>>>>>>> wants to have his own cache implementation in his
> >> >>>>>>>>>>>>>>>>> TableFunction, he will be able to use the existing interface
> >> >>>>>>>>>>>>>>>>> for this purpose (we can explicitly point this out in the
> >> >>>>>>>>>>>>>>>>> documentation). In this way all configs and metrics will be
> >> >>>>>>>>>>>>>>>>> unified. WDYT?
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have
> >> >>>>>>>>>>>>>>>>>> 90% of lookup requests that can never be cached
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> 2) Let me clarify the logic of the filter optimization in
> >> >>>>>>>>>>>>>>>>> the case of the LRU cache. It looks like
> >> >>>>>>>>>>>>>>>>> Cache<RowData, Collection<RowData>>. Here we always store
> >> >>>>>>>>>>>>>>>>> the response of the dimension table in the cache, even after
> >> >>>>>>>>>>>>>>>>> applying the calc function. I.e., if there are no rows after
> >> >>>>>>>>>>>>>>>>> applying filters to the result of the 'eval' method of
> >> >>>>>>>>>>>>>>>>> TableFunction, we store an empty list under the lookup keys.
> >> >>>>>>>>>>>>>>>>> Therefore the cache entry will be filled, but will require
> >> >>>>>>>>>>>>>>>>> much less memory (in bytes). I.e., we don't completely
> >> >>>>>>>>>>>>>>>>> filter out the keys whose result was pruned, but we
> >> >>>>>>>>>>>>>>>>> significantly reduce the memory required to store this
> >> >>>>>>>>>>>>>>>>> result. If the user knows about this behavior, he can
> >> >>>>>>>>>>>>>>>>> increase the 'max-rows' option before the start of the job.
> >> >>>>>>>>>>>>>>>>> But actually I came up with the idea that we can do this
> >> >>>>>>>>>>>>>>>>> automatically by using the 'maximumWeight' and 'weigher'
> >> >>>>>>>>>>>>>>>>> methods of GuavaCache [1]. The weight can be the size of the
> >> >>>>>>>>>>>>>>>>> collection of rows (the value of the cache entry). Therefore
> >> >>>>>>>>>>>>>>>>> the cache can automatically fit many more records than
> >> >>>>>>>>>>>>>>>>> before.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and
> >> >>>>>>>>>>>>>>>>>> projects pushdown, i.e., SupportsFilterPushDown and
> >> >>>>>>>>>>>>>>>>>> SupportsProjectionPushDown.
> >> >>>>>>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't
> >> >>>>>>>>>>>>>>>>>> mean it's hard to implement.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> It's debatable how difficult it will be to implement filter
> >> >>>>>>>>>>>>>>>>> pushdown. But I think the fact that there is currently no
> >> >>>>>>>>>>>>>>>>> database connector with filter pushdown at least means that
> >> >>>>>>>>>>>>>>>>> this feature won't be supported in connectors soon.
> >> >>>>>>>>>>>>>>>>> Moreover, if we talk about other connectors (not in the
> >> >>>>>>>>>>>>>>>>> Flink repo), their databases might not support all Flink
> >> >>>>>>>>>>>>>>>>> filters (or might not support filters at all). I think users
> >> >>>>>>>>>>>>>>>>> are interested in the cache filter optimization being
> >> >>>>>>>>>>>>>>>>> supported independently of other features and of solving
> >> >>>>>>>>>>>>>>>>> more complex (or even unsolvable) problems.
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> 3) I agree with your third statement. Actually, in our
> >> >>>>>>>>>>>>>>>>> internal version I also tried to unify the logic of scanning
> >> >>>>>>>>>>>>>>>>> and reloading data from connectors. But unfortunately, I
> >> >>>>>>>>>>>>>>>>> didn't find a way to unify the logic of all
> >> >>>>>>>>>>>>>>>>> ScanRuntimeProviders (InputFormat, SourceFunction, Source,
> >> >>>>>>>>>>>>>>>>> ...) and reuse it in reloading the ALL cache. As a result I
> >> >>>>>>>>>>>>>>>>> settled on using InputFormat, because it was used for
> >> >>>>>>>>>>>>>>>>> scanning in all lookup connectors. (I didn't know that there
> >> >>>>>>>>>>>>>>>>> are plans to deprecate InputFormat in favor of the FLIP-27
> >> >>>>>>>>>>>>>>>>> Source.) IMO using the FLIP-27 source in ALL caching is not
> >> >>>>>>>>>>>>>>>>> a good idea, because this source was designed to work in a
> >> >>>>>>>>>>>>>>>>> distributed environment (SplitEnumerator on the JobManager
> >> >>>>>>>>>>>>>>>>> and SourceReaders on the TaskManagers), not in one operator
> >> >>>>>>>>>>>>>>>>> (the lookup join operator in our case). There is not even a
> >> >>>>>>>>>>>>>>>>> direct way to pass splits from the SplitEnumerator to a
> >> >>>>>>>>>>>>>>>>> SourceReader (this logic works through
> >> >>>>>>>>>>>>>>>>> SplitEnumeratorContext, which requires
> >> >>>>>>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents).
> >> >>>>>>>>>>>>>>>>> Using InputFormat for the ALL cache seems much clearer and
> >> >>>>>>>>>>>>>>>>> easier. But if there are plans to refactor all connectors to
> >> >>>>>>>>>>>>>>>>> FLIP-27, I have the following idea: maybe we can abandon the
> >> >>>>>>>>>>>>>>>>> lookup join ALL cache in favor of a simple join with
> >> >>>>>>>>>>>>>>>>> multiple scans of a batch source? The point is that the only
> >> >>>>>>>>>>>>>>>>> difference between the lookup join ALL cache and a simple
> >> >>>>>>>>>>>>>>>>> join with a batch source is that in the first case scanning
> >> >>>>>>>>>>>>>>>>> is performed multiple times, in between which the state
> >> >>>>>>>>>>>>>>>>> (cache) is cleared (correct me if I'm wrong). So what if we
> >> >>>>>>>>>>>>>>>>> extend the functionality of the simple join to support state
> >> >>>>>>>>>>>>>>>>> reloading, and extend the functionality of scanning a batch
> >> >>>>>>>>>>>>>>>>> source multiple times (this one should be easy with the new
> >> >>>>>>>>>>>>>>>>> FLIP-27 source, which unifies streaming/batch reading: we
> >> >>>>>>>>>>>>>>>>> would need to change only the SplitEnumerator, which would
> >> >>>>>>>>>>>>>>>>> pass splits again after some TTL)? WDYT? I must say that
> >> >>>>>>>>>>>>>>>>> this looks like a long-term goal and will make the scope of
> >> >>>>>>>>>>>>>>>>> this FLIP even larger than you said. Maybe we can limit
> >> >>>>>>>>>>>>>>>>> ourselves to a simpler solution now (InputFormats).
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> So to sum up, my points are:
> >> >>>>>>>>>>>>>>>>> 1) There is a way to make both concise and flexible interfaces for
> >> >>>>>>>>>>>>>>>>> caching in lookup join.
> >> >>>>>>>>>>>>>>>>> 2) Cache filters optimization is important both in LRU and ALL caches.
> >> >>>>>>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
> >> >>>>>>>>>>>>>>>>> connectors, and some connectors might not have the opportunity to
> >> >>>>>>>>>>>>>>>>> support it. Also, as far as I know, filter pushdown currently works
> >> >>>>>>>>>>>>>>>>> only for scanning (not lookup). So the cache filters + projections
> >> >>>>>>>>>>>>>>>>> optimization should be independent of other features.
> >> >>>>>>>>>>>>>>>>> 4) ALL cache implementation is a complex topic that involves multiple
> >> >>>>>>>>>>>>>>>>> aspects of how Flink is developing. Abandoning InputFormat in favor
> >> >>>>>>>>>>>>>>>>> of FLIP-27 Source would make the ALL cache implementation really complex
> >> >>>>>>>>>>>>>>>>> and unclear, so maybe instead we can extend the functionality of
> >> >>>>>>>>>>>>>>>>> simple join, or keep InputFormat for the lookup join ALL cache?
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> [1] https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> It's great to see the active discussion! I want to
> >> share
> >> >>> my
> >> >>>> ideas:
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> 1) implementing the cache in the framework vs. in the connector base
> >> >>>>>>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways
> should
> >> >>>> work (e.g.,
> >> >>>>>>>>>>>>>>> cache
> >> >>>>>>>>>>>>>>>>>> pruning, compatibility).
> >> >>>>>>>>>>>>>>>>>> The framework way can provide more concise
> interfaces.
> >> >>>>>>>>>>>>>>>>>> The connector base way can define more flexible cache
> >> >>>>>>>>>>>>>>>>>> strategies/implementations.
> >> >>>>>>>>>>>>>>>>>> We are still investigating a way to see if we can
> have
> >> >>> both
> >> >>>>>>>>>>>>>>> advantages.
> >> >>>>>>>>>>>>>>>>>> We should reach a consensus on which way is the final state, and
> >> >>>>>>>>>>>>>>>>>> make sure we are on the path to it.
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> 2) filters and projections pushdown:
> >> >>>>>>>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache
> >> >> can
> >> >>>> benefit a
> >> >>>>>>>>>>>>>>> lot
> >> >>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>> ALL cache.
> >> >>>>>>>>>>>>>>>>>> However, this is not true for LRU cache. Connectors
> use
> >> >>>> cache to
> >> >>>>>>>>>>>>>>> reduce
> >> >>>>>>>>>>>>>>>>> IO
> >> >>>>>>>>>>>>>>>>>> requests to databases for better throughput.
> >> >>>>>>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we
> will
> >> >>>> have 90% of
> >> >>>>>>>>>>>>>>>>> lookup
> >> >>>>>>>>>>>>>>>>>> requests that can never be cached
> >> >>>>>>>>>>>>>>>>>> and hit directly to the databases. That means the
> cache
> >> >> is
> >> >>>>>>>>>>>>>>> meaningless in
> >> >>>>>>>>>>>>>>>>>> this case.
> >> >>>>>>>>>>>>>>>>>>
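To make the LRU concern above concrete, here is a minimal Python sketch (not Flink code). The names `run_lookups` and `cache_empty_results` and the toy table are hypothetical and only serve the illustration, and the cache is an unbounded dict instead of a real LRU cache. It counts how often a lookup has to go to the "database" when a filter keeps 10% of the rows, with and without remembering keys whose rows were filtered out.

```python
def run_lookups(keys, table, row_filter, cache_empty_results):
    """Count simulated database hits for a stream of lookup keys."""
    cache = {}      # unbounded for simplicity; a real LRU cache would also evict
    db_hits = 0
    for key in keys:
        if key in cache:
            continue                      # served from the cache, no I/O
        db_hits += 1                      # cache miss: go to the "database"
        row = table.get(key)
        if row is not None and row_filter(row):
            cache[key] = row              # row passes the filter: cache it
        elif cache_empty_results:
            cache[key] = None             # remember "no match" for this key
    return db_hits


# Toy dimension table: 100 rows, the filter keeps 10% of them.
table = {k: {"id": k, "keep": k % 10 == 0} for k in range(100)}
keys = list(range(100)) * 5               # every key is looked up 5 times

hits_without = run_lookups(keys, table, lambda r: r["keep"], False)
hits_with = run_lookups(keys, table, lambda r: r["keep"], True)
print(hits_without, hits_with)            # 460 100
```

In this toy setup the filtered-out keys keep hitting the database unless the cache also stores an empty result for them, so whether a filter inside the cache hurts seems to depend mostly on how missing keys are handled.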
> >> >>>>>>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do filter and projection
> >> >>>>>>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and SupportsProjectionPushDown.
> >> >>>>>>>>>>>>>>>>>> That JDBC/Hive/HBase haven't implemented these interfaces doesn't mean
> >> >>>>>>>>>>>>>>>>>> they are hard to implement.
> >> >>>>>>>>>>>>>>>>>> They should implement the pushdown interfaces to
> reduce
> >> >> IO
> >> >>>> and the
> >> >>>>>>>>>>>>>>> cache
> >> >>>>>>>>>>>>>>>>>> size.
> >> >>>>>>>>>>>>>>>>>> That should be a final state that the scan source and
> >> >>>> lookup source
> >> >>>>>>>>>>>>>>> share
> >> >>>>>>>>>>>>>>>>>> the exact pushdown implementation.
> >> >>>>>>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown
> logic
> >> >> in
> >> >>>> caches,
> >> >>>>>>>>>>>>>>> which
> >> >>>>>>>>>>>>>>>>>> will complicate the lookup join design.
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> 3) ALL cache abstraction
> >> >>>>>>>>>>>>>>>>>> All cache might be the most challenging part of this
> >> >> FLIP.
> >> >>>> We have
> >> >>>>>>>>>>>>>>> never
> >> >>>>>>>>>>>>>>>>>> provided a reload-lookup public interface.
> >> >>>>>>>>>>>>>>>>>> Currently, we put the reload logic in the "eval"
> method
> >> >> of
> >> >>>>>>>>>>>>>>> TableFunction.
> >> >>>>>>>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
> >> >>>>>>>>>>>>>>>>>> Ideally, connector implementation should share the
> >> logic
> >> >>> of
> >> >>>> reload
> >> >>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>> scan, i.e. ScanTableSource with
> >> >>>> InputFormat/SourceFunction/FLIP-27
> >> >>>>>>>>>>>>>>>>> Source.
> >> >>>>>>>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated,
> and
> >> >>> the
> >> >>>> FLIP-27
> >> >>>>>>>>>>>>>>>>> source
> >> >>>>>>>>>>>>>>>>>> is deeply coupled with SourceOperator.
> >> >>>>>>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in
> LookupJoin,
> >> >>> this
> >> >>>> may make
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>> scope of this FLIP much larger.
> >> >>>>>>>>>>>>>>>>>> We are still investigating how to abstract the ALL
> >> cache
> >> >>>> logic and
> >> >>>>>>>>>>>>>>> reuse
> >> >>>>>>>>>>>>>>>>>> the existing source interfaces.
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> Best,
> >> >>>>>>>>>>>>>>>>>> Jark
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <
> >> >>>> ro.v.bo...@gmail.com>
> >> >>>>>>>>>>>>>>> wrote:
> >> >>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>> It's a much more complicated activity and lies outside the scope of
> >> >>>>>>>>>>>>>>>>>>> this improvement, because such pushdowns should be done for all
> >> >>>>>>>>>>>>>>>>>>> ScanTableSource implementations (not only for Lookup ones).
> >> >>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
> >> >>>>>>>>>>>>>>> martijnvis...@apache.org>
> >> >>>>>>>>>>>>>>>>>>> wrote:
> >> >>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>> Hi everyone,
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>> One question regarding "And Alexander correctly
> >> >>> mentioned
> >> >>>> that
> >> >>>>>>>>>>>>>>> filter
> >> >>>>>>>>>>>>>>>>>>>> pushdown still is not implemented for
> >> >> jdbc/hive/hbase."
> >> >>>> -> Would
> >> >>>>>>>>>>>>>>> an
> >> >>>>>>>>>>>>>>>>>>>> alternative solution be to actually implement these
> >> >>> filter
> >> >>>>>>>>>>>>>>> pushdowns?
> >> >>>>>>>>>>>>>>>>> I
> >> >>>>>>>>>>>>>>>>>>>> can
> >> >>>>>>>>>>>>>>>>>>>> imagine that there are many more benefits to doing
> >> >> that,
> >> >>>> outside
> >> >>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>> lookup
> >> >>>>>>>>>>>>>>>>>>>> caching and metrics.
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>> Martijn Visser
> >> >>>>>>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
> >> >>>>>>>>>>>>>>>>>>>> https://github.com/MartijnVisser
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
> >> >>>> ro.v.bo...@gmail.com>
> >> >>>>>>>>>>>>>>>>> wrote:
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> Hi everyone!
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> Thanks for driving such a valuable improvement!
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> I do think that a single cache implementation would
> be
> >> >> a
> >> >>>> nice
> >> >>>>>>>>>>>>>>>>> opportunity
> >> >>>>>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS
> OF
> >> >>>> proc_time"
> >> >>>>>>>>>>>>>>>>> semantics
> >> >>>>>>>>>>>>>>>>>>>>> anyway, no matter how it is implemented.
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can say
> that:
> >> >>>>>>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity to cut down the cache size by
> >> >>>>>>>>>>>>>>>>>>>>> simply filtering out unnecessary data. The handiest way to do that is to
> >> >>>>>>>>>>>>>>>>>>>>> apply the filter inside LookupRunners. It would be a bit harder to
> >> >>>>>>>>>>>>>>>>>>>>> pass it through the LookupJoin node to TableFunction. And Alexander
> >> >>>>>>>>>>>>>>>>>>>>> correctly mentioned that filter pushdown still is not implemented for
> >> >>>>>>>>>>>>>>>>>>>>> jdbc/hive/hbase.
> >> >>>>>>>>>>>>>>>>>>>>> 2) The ability to set different caching parameters for different tables
> >> >>>>>>>>>>>>>>>>>>>>> is quite important. So I would prefer to set them through DDL rather than
> >> >>>>>>>>>>>>>>>>>>>>> have the same ttl, strategy and other options for all lookup tables.
> >> >>>>>>>>>>>>>>>>>>>>> 3) Providing the cache in the framework really deprives us of
> >> >>>>>>>>>>>>>>>>>>>>> extensibility (users won't be able to implement their own cache). But
> >> >>>>>>>>>>>>>>>>>>>>> most probably this can be solved by creating more cache strategies and
> >> >>>>>>>>>>>>>>>>>>>>> a wider set of configurations.
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> All these points are much closer to the schema
> >> >> proposed
> >> >>>> by
> >> >>>>>>>>>>>>>>>>> Alexander.
> >> >>>>>>>>>>>>>>>>>>>>> Qingsheng Ren, please correct me if I'm wrong: could all these
> >> >>>>>>>>>>>>>>>>>>>>> facilities be simply implemented in your architecture?
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>>>> Roman Boyko
> >> >>>>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <
> >> >>>>>>>>>>>>>>>>> martijnvis...@apache.org>
> >> >>>>>>>>>>>>>>>>>>>>> wrote:
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>> I don't have much to chip in, but just wanted to
> >> >>>> express that
> >> >>>>>>>>>>>>>>> I
> >> >>>>>>>>>>>>>>>>> really
> >> >>>>>>>>>>>>>>>>>>>>>> appreciate the in-depth discussion on this topic
> >> >> and I
> >> >>>> hope
> >> >>>>>>>>>>>>>>> that
> >> >>>>>>>>>>>>>>>>>>>> others
> >> >>>>>>>>>>>>>>>>>>>>>> will join the conversation.
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>> Martijn
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <
> >> >>>>>>>>>>>>>>>>> smirale...@gmail.com>
> >> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback! However, I
> have
> >> >>>> questions
> >> >>>>>>>>>>>>>>>>> about
> >> >>>>>>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't get
> >> >>>> something?).
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR
> >> >>>> SYSTEM_TIME
> >> >>>>>>>>>>>>>>> AS OF
> >> >>>>>>>>>>>>>>>>>>>>>> proc_time”
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME
> AS
> >> >> OF
> >> >>>>>>>>>>>>>>> proc_time"
> >> >>>>>>>>>>>>>>>>> is
> >> >>>>>>>>>>>>>>>>>>>> not
> >> >>>>>>>>>>>>>>>>>>>>>>> fully implemented with caching, but as you said, users accept this
> >> >>>>>>>>>>>>>>>>>>>>>>> consciously to achieve better performance (no
> one
> >> >>>> proposed
> >> >>>>>>>>>>>>>>> to
> >> >>>>>>>>>>>>>>>>> enable
> >> >>>>>>>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users do you
> mean
> >> >>>> other
> >> >>>>>>>>>>>>>>>>> developers
> >> >>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>>>> connectors? In this case developers explicitly
> >> >>> specify
> >> >>>>>>>>>>>>>>> whether
> >> >>>>>>>>>>>>>>>>> their
> >> >>>>>>>>>>>>>>>>>>>>>>> connector supports caching or not (in the list
> of
> >> >>>> supported
> >> >>>>>>>>>>>>>>>>>>>> options),
> >> >>>>>>>>>>>>>>>>>>>>>>> no one makes them do that if they don't want to.
> >> So
> >> >>>> what
> >> >>>>>>>>>>>>>>>>> exactly is
> >> >>>>>>>>>>>>>>>>>>>>>>> the difference between implementing caching in
> >> >>> modules
> >> >>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime and in flink-table-common
> from
> >> >>> the
> >> >>>>>>>>>>>>>>>>> considered
> >> >>>>>>>>>>>>>>>>>>>>>>> point of view? How does it affect whether the semantics of
> >> >>>>>>>>>>>>>>>>>>>>>>> "FOR SYSTEM_TIME AS OF proc_time" are broken?
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> confront a situation that allows table options
> in
> >> >>> DDL
> >> >>>> to
> >> >>>>>>>>>>>>>>>>> control
> >> >>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>> behavior of the framework, which has never
> >> happened
> >> >>>>>>>>>>>>>>> previously
> >> >>>>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>>>>> should
> >> >>>>>>>>>>>>>>>>>>>>>>> be cautious
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> If we talk about main differences of semantics
> of
> >> >> DDL
> >> >>>>>>>>>>>>>>> options
> >> >>>>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about
> >> >>>> limiting
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>> scope
> >> >>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>>>> the options + importance for the user business
> >> >> logic
> >> >>>> rather
> >> >>>>>>>>>>>>>>> than
> >> >>>>>>>>>>>>>>>>>>>>>>> specific location of corresponding logic in the
> >> >>>> framework? I
> >> >>>>>>>>>>>>>>>>> mean
> >> >>>>>>>>>>>>>>>>>>>> that
> >> >>>>>>>>>>>>>>>>>>>>>>> in my design, for example, putting an option
> with
> >> >>>> lookup
> >> >>>>>>>>>>>>>>> cache
> >> >>>>>>>>>>>>>>>>>>>>>>> strategy in configurations would be the wrong
> >> >>>> decision,
> >> >>>>>>>>>>>>>>>>> because it
> >> >>>>>>>>>>>>>>>>>>>>>>> directly affects the user's business logic (not
> >> >> just
> >> >>>>>>>>>>>>>>> performance
> >> >>>>>>>>>>>>>>>>>>>>>>> optimization) + touches just several functions
> of
> >> >> ONE
> >> >>>> table
> >> >>>>>>>>>>>>>>>>> (there
> >> >>>>>>>>>>>>>>>>>>>> can
> >> >>>>>>>>>>>>>>>>>>>>>>> be multiple tables with different caches). Does
> it
> >> >>>> really
> >> >>>>>>>>>>>>>>>>> matter for
> >> >>>>>>>>>>>>>>>>>>>>>>> the user (or someone else) where the logic is
> >> >>> located,
> >> >>>>>>>>>>>>>>> which is
> >> >>>>>>>>>>>>>>>>>>>>>>> affected by the applied option?
> >> >>>>>>>>>>>>>>>>>>>>>>> Also I can remember DDL option
> 'sink.parallelism',
> >> >>>> which in
> >> >>>>>>>>>>>>>>>>> some way
> >> >>>>>>>>>>>>>>>>>>>>>>> "controls the behavior of the framework" and I
> >> >> don't
> >> >>>> see any
> >> >>>>>>>>>>>>>>>>> problem
> >> >>>>>>>>>>>>>>>>>>>>>>> here.
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> introduce a new interface for this all-caching
> >> >>>> scenario
> >> >>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>> design
> >> >>>>>>>>>>>>>>>>>>>>>>> would become more complex
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> This is a subject for a separate discussion, but
> >> >>>> actually
> >> >>>>>>>>>>>>>>> in our
> >> >>>>>>>>>>>>>>>>>>>>>>> internal version we solved this problem quite
> >> >> easily
> >> >>> -
> >> >>>> we
> >> >>>>>>>>>>>>>>> reused
> >> >>>>>>>>>>>>>>>>>>>>>>> InputFormat class (so there is no need for a new
> >> >>> API).
> >> >>>> The
> >> >>>>>>>>>>>>>>>>> point is
> >> >>>>>>>>>>>>>>>>>>>>>>> that currently all lookup connectors use
> >> >> InputFormat
> >> >>>> for
> >> >>>>>>>>>>>>>>>>> scanning
> >> >>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive -
> it
> >> >>> uses
> >> >>>>>>>>>>>>>>> class
> >> >>>>>>>>>>>>>>>>>>>>>>> PartitionReader, that is actually just a wrapper
> >> >>> around
> >> >>>>>>>>>>>>>>>>> InputFormat.
> >> >>>>>>>>>>>>>>>>>>>>>>> The advantage of this solution is the ability to
> >> >>> reload
> >> >>>>>>>>>>>>>>> cache
> >> >>>>>>>>>>>>>>>>> data
> >> >>>>>>>>>>>>>>>>>>>> in
> >> >>>>>>>>>>>>>>>>>>>>>>> parallel (number of threads depends on number of
> >> >>>>>>>>>>>>>>> InputSplits,
> >> >>>>>>>>>>>>>>>>> but
> >> >>>>>>>>>>>>>>>>>>>> has
> >> >>>>>>>>>>>>>>>>>>>>>>> an upper limit). As a result cache reload time
> >> >>>> significantly
> >> >>>>>>>>>>>>>>>>> reduces
> >> >>>>>>>>>>>>>>>>>>>>>>> (as well as time of input stream blocking). I
> know
> >> >>> that
> >> >>>>>>>>>>>>>>> usually
> >> >>>>>>>>>>>>>>>>> we
> >> >>>>>>>>>>>>>>>>>>>> try
> >> >>>>>>>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but
> >> >>> maybe
> >> >>>> this
> >> >>>>>>>>>>>>>>> one
> >> >>>>>>>>>>>>>>>>> can
> >> >>>>>>>>>>>>>>>>>>>> be
> >> >>>>>>>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal
> >> >>>> solution,
> >> >>>>>>>>>>>>>>> maybe
> >> >>>>>>>>>>>>>>>>>>>> there
> >> >>>>>>>>>>>>>>>>>>>>>>> are better ones.
> >> >>>>>>>>>>>>>>>>>>>>>>>
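A rough Python sketch of the parallel reload idea described above, purely illustrative and not the internal implementation mentioned in the message. `reload_all_cache`, `read_split`, and `max_threads` are made-up names, the "splits" are plain ranges standing in for InputSplits, and the upper limit of 4 threads is an arbitrary assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def reload_all_cache(splits, read_split, max_threads=4):
    """Read every split in parallel and return a freshly built key -> row map."""
    # One thread per split, but never more than the configured upper limit.
    workers = max(1, min(len(splits), max_threads))
    fresh = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map yields results in split order; merging happens in this thread.
        for rows in pool.map(read_split, splits):
            fresh.update(rows)
    return fresh


def read_split(split):
    """Stand-in for reading one split of the dimension table."""
    return {key: {"id": key} for key in split}


splits = [range(0, 100), range(100, 200), range(200, 300)]
cache = reload_all_cache(splits, read_split)
print(len(cache))                         # 300
```

The new map is built on the side and only handed over once it is complete, so in a design like this the operator could keep serving lookups from the old data until the swap.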
> >> >>>>>>>>>>>>>>>>>>>>>>>> Providing the cache in the framework might
> >> >> introduce
> >> >>>>>>>>>>>>>>>>> compatibility
> >> >>>>>>>>>>>>>>>>>>>>>> issues
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> It's possible only in cases when the developer
> of
> >> >> the
> >> >>>>>>>>>>>>>>> connector
> >> >>>>>>>>>>>>>>>>>>>> won't
> >> >>>>>>>>>>>>>>>>>>>>>>> properly refactor his code and will use new
> cache
> >> >>>> options
> >> >>>>>>>>>>>>>>>>>>>> incorrectly
> >> >>>>>>>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same options into 2
> >> >>>> different
> >> >>>>>>>>>>>>>>> code
> >> >>>>>>>>>>>>>>>>>>>>>>> places). For correct behavior all he will need
> to
> >> >> do
> >> >>>> is to
> >> >>>>>>>>>>>>>>>>> redirect
> >> >>>>>>>>>>>>>>>>>>>>>>> existing options to the framework's LookupConfig
> >> (+
> >> >>>> maybe
> >> >>>>>>>>>>>>>>> add an
> >> >>>>>>>>>>>>>>>>>>>> alias
> >> >>>>>>>>>>>>>>>>>>>>>>> for options, if there was different naming),
> >> >>> everything
> >> >>>>>>>>>>>>>>> will be
> >> >>>>>>>>>>>>>>>>>>>>>>> transparent for users. If the developer won't do
> >> >>>>>>>>>>>>>>> refactoring at
> >> >>>>>>>>>>>>>>>>> all,
> >> >>>>>>>>>>>>>>>>>>>>>>> nothing will be changed for the connector
> because
> >> >> of
> >> >>>>>>>>>>>>>>> backward
> >> >>>>>>>>>>>>>>>>>>>>>>> compatibility. Also if a developer wants to use
> >> his
> >> >>> own
> >> >>>>>>>>>>>>>>> cache
> >> >>>>>>>>>>>>>>>>> logic,
> >> >>>>>>>>>>>>>>>>>>>>>>> he can simply decline to pass some of the configs
> >> into
> >> >>> the
> >> >>>>>>>>>>>>>>>>> framework,
> >> >>>>>>>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>>>>>>> instead make his own implementation with already
> >> >>>> existing
> >> >>>>>>>>>>>>>>>>> configs
> >> >>>>>>>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>>>>>>> metrics (but actually I think that it's a rare
> >> >> case).
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> filters and projections should be pushed all
> the
> >> >> way
> >> >>>> down
> >> >>>>>>>>>>>>>>> to
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>> table
> >> >>>>>>>>>>>>>>>>>>>>>>> function, like what we do in the scan source
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> It's a great goal. But the truth is that
> the
> >> >>> ONLY
> >> >>>>>>>>>>>>>>> connector
> >> >>>>>>>>>>>>>>>>>>>> that
> >> >>>>>>>>>>>>>>>>>>>>>>> supports filter pushdown is
> FileSystemTableSource
> >> >>>>>>>>>>>>>>>>>>>>>>> (no database connector supports it currently).
> >> Also
> >> >>>> for some
> >> >>>>>>>>>>>>>>>>>>>> databases
> >> >>>>>>>>>>>>>>>>>>>>>>> it's simply impossible to push down filters as complex as those we
> >> >>>>>>>>>>>>>>>>>>>>>>> have in Flink.
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> only applying these optimizations to the cache
> >> >> seems
> >> >>>> not
> >> >>>>>>>>>>>>>>>>> quite
> >> >>>>>>>>>>>>>>>>>>>>> useful
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily large amount
> of
> >> >>> data
> >> >>>>>>>>>>>>>>> from the
> >> >>>>>>>>>>>>>>>>>>>>>>> dimension table. For a simple example, suppose
> in
> >> >>>> dimension
> >> >>>>>>>>>>>>>>>>> table
> >> >>>>>>>>>>>>>>>>>>>>>>> 'users'
> >> >>>>>>>>>>>>>>>>>>>>>>> we have column 'age' with values from 20 to 40,
> >> and
> >> >>>> input
> >> >>>>>>>>>>>>>>> stream
> >> >>>>>>>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age
> of
> >> >>>> users. If
> >> >>>>>>>>>>>>>>> we
> >> >>>>>>>>>>>>>>>>> have
> >> >>>>>>>>>>>>>>>>>>>>>>> filter 'age > 30',
> >> >>>>>>>>>>>>>>>>>>>>>>> there will be half as much data in the cache. This means the user can
> >> >>>>>>>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' almost twofold, which brings a huge
> >> >>>>>>>>>>>>>>>>>>>>>>> performance boost. Moreover, this optimization
> >> >> starts
> >> >>>> to
> >> >>>>>>>>>>>>>>> really
> >> >>>>>>>>>>>>>>>>>>>> shine
> >> >>>>>>>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without filters and
> >> >>>> projections
> >> >>>>>>>>>>>>>>>>> can't
> >> >>>>>>>>>>>>>>>>>>>> fit
> >> >>>>>>>>>>>>>>>>>>>>>>> in memory, but with them - can. This opens up
> >> >>>> additional
> >> >>>>>>>>>>>>>>>>>>>> possibilities
> >> >>>>>>>>>>>>>>>>>>>>>>> for users. And this doesn't sound like 'not quite
> >> >>>> useful'.
> >> >>>>>>>>>>>>>>>>>>>>>>>
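The 'age > 30' example above can be played through with a small Python sketch. It is only an illustration under assumed names: `FilteringLruCache` is a hypothetical helper, not a Flink class, and the data is 100 made-up users for every age from 20 to 40.

```python
from collections import OrderedDict


class FilteringLruCache:
    """LRU cache that only stores rows accepted by a predicate (hypothetical)."""

    def __init__(self, max_rows, row_filter):
        self.max_rows = max_rows
        self.row_filter = row_filter
        self._rows = OrderedDict()

    def put(self, key, row):
        if not self.row_filter(row):
            return False                  # filtered out: takes no cache space
        self._rows[key] = row
        self._rows.move_to_end(key)       # mark as most recently used
        if len(self._rows) > self.max_rows:
            self._rows.popitem(last=False)  # evict the least recently used row
        return True

    def get(self, key):
        row = self._rows.get(key)
        if row is not None:
            self._rows.move_to_end(key)
        return row

    def __len__(self):
        return len(self._rows)


# 100 users for every age from 20 to 40 (inclusive).
users = {f"u{age}_{i}": {"age": age} for age in range(20, 41) for i in range(100)}

unfiltered = FilteringLruCache(10_000, lambda row: True)
filtered = FilteringLruCache(10_000, lambda row: row["age"] > 30)
for key, row in users.items():
    unfiltered.put(key, row)
    filtered.put(key, row)

print(len(unfiltered), len(filtered))     # 2100 1000
```

With this data the filtered cache holds 1000 of 2100 rows, i.e. roughly half, which matches the estimate in the message.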
> >> >>>>>>>>>>>>>>>>>>>>>>> It would be great to hear other voices regarding
> >> >> this
> >> >>>> topic!
> >> >>>>>>>>>>>>>>>>> Because
> >> >>>>>>>>>>>>>>>>>>>>>>> we have quite a lot of controversial points,
> and I
> >> >>>> think
> >> >>>>>>>>>>>>>>> with
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>> help
> >> >>>>>>>>>>>>>>>>>>>>>>> of others it will be easier for us to come to a
> >> >>>> consensus.
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>> On Fri, 29 Apr 2022 at 22:33, Qingsheng Ren <renqs...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry for my late
> >> >>>> response!
> >> >>>>>>>>>>>>>>> We
> >> >>>>>>>>>>>>>>>>> had
> >> >>>>>>>>>>>>>>>>>>>> an
> >> >>>>>>>>>>>>>>>>>>>>>>> internal discussion together with Jark and
> Leonard
> >> >>> and
> >> >>>> I’d
> >> >>>>>>>>>>>>>>> like
> >> >>>>>>>>>>>>>>>>> to
> >> >>>>>>>>>>>>>>>>>>>>>>> summarize our ideas. Instead of implementing the
> >> >>> cache
> >> >>>>>>>>>>>>>>> logic in
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>> table
> >> >>>>>>>>>>>>>>>>>>>>>>> runtime layer or wrapping around the
> user-provided
> >> >>>> table
> >> >>>>>>>>>>>>>>>>> function,
> >> >>>>>>>>>>>>>>>>>>>> we
> >> >>>>>>>>>>>>>>>>>>>>>>> prefer to introduce some new APIs extending
> >> >>>> TableFunction
> >> >>>>>>>>>>>>>>> with
> >> >>>>>>>>>>>>>>>>> these
> >> >>>>>>>>>>>>>>>>>>>>>>> concerns:
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR
> >> >>>>>>>>>>>>>>> SYSTEM_TIME
> >> >>>>>>>>>>>>>>>>> AS OF
> >> >>>>>>>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect
> the
> >> >>>> content
> >> >>>>>>>>>>>>>>> of the
> >> >>>>>>>>>>>>>>>>>>>> lookup
> >> >>>>>>>>>>>>>>>>>>>>>>> table at the moment of querying. If users choose
> >> to
> >> >>>> enable
> >> >>>>>>>>>>>>>>>>> caching
> >> >>>>>>>>>>>>>>>>>>>> on
> >> >>>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate that this
> >> >>>> breakage is
> >> >>>>>>>>>>>>>>>>>>>> acceptable
> >> >>>>>>>>>>>>>>>>>>>>>> in
> >> >>>>>>>>>>>>>>>>>>>>>>> exchange for the performance. So we prefer not
> to
> >> >>>> provide
> >> >>>>>>>>>>>>>>>>> caching on
> >> >>>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>> table runtime level.
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation in the
> >> >>>> framework
> >> >>>>>>>>>>>>>>>>> (whether
> >> >>>>>>>>>>>>>>>>>>>> in a
> >> >>>>>>>>>>>>>>>>>>>>>>> runner or a wrapper around TableFunction), we
> have
> >> >> to
> >> >>>>>>>>>>>>>>> confront a
> >> >>>>>>>>>>>>>>>>>>>>>> situation
> >> >>>>>>>>>>>>>>>>>>>>>>> that allows table options in DDL to control the
> >> >>>> behavior of
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>> framework,
> >> >>>>>>>>>>>>>>>>>>>>>>> which has never happened previously and should
> be
> >> >>>> cautious.
> >> >>>>>>>>>>>>>>>>> Under
> >> >>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>> current design the behavior of the framework
> >> should
> >> >>>> only be
> >> >>>>>>>>>>>>>>>>>>>> specified
> >> >>>>>>>>>>>>>>>>>>>>> by
> >> >>>>>>>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard
> >> to
> >> >>>> apply
> >> >>>>>>>>>>>>>>> these
> >> >>>>>>>>>>>>>>>>>>>> general
> >> >>>>>>>>>>>>>>>>>>>>>>> configs to a specific table.
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> 3. We have use cases where the lookup source loads and refreshes all records
> >> >>>>>>>>>>>>>>>>>>>>>>> periodically into the memory to achieve high
> >> lookup
> >> >>>>>>>>>>>>>>> performance
> >> >>>>>>>>>>>>>>>>>>>> (like
> >> >>>>>>>>>>>>>>>>>>>>>> Hive
> >> >>>>>>>>>>>>>>>>>>>>>>> connector in the community, and also widely used
> >> by
> >> >>> our
> >> >>>>>>>>>>>>>>> internal
> >> >>>>>>>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around the
> user’s
> >> >>>>>>>>>>>>>>> TableFunction
> >> >>>>>>>>>>>>>>>>>>>> works
> >> >>>>>>>>>>>>>>>>>>>>>> fine
> >> >>>>>>>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to
> introduce a
> >> >>> new
> >> >>>>>>>>>>>>>>>>> interface for
> >> >>>>>>>>>>>>>>>>>>>>> this
> >> >>>>>>>>>>>>>>>>>>>>>>> all-caching scenario and the design would become
> >> >> more
> >> >>>>>>>>>>>>>>> complex.
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> 4. Providing the cache in the framework might
> >> >>>> introduce
> >> >>>>>>>>>>>>>>>>>>>> compatibility
> >> >>>>>>>>>>>>>>>>>>>>>>> issues for existing lookup sources: for example, there might be two
> >> >>>>>>>>>>>>>>>>> caches
> >> >>>>>>>>>>>>>>>>>>>>> with
> >> >>>>>>>>>>>>>>>>>>>>>>> totally different strategies if the user
> >> >> incorrectly
> >> >>>>>>>>>>>>>>> configures
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>> table
> >> >>>>>>>>>>>>>>>>>>>>>>> (one in the framework and another implemented by
> >> >> the
> >> >>>> lookup
> >> >>>>>>>>>>>>>>>>> source).
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> As for the optimization mentioned by
> Alexander, I
> >> >>>> think
> >> >>>>>>>>>>>>>>>>> filters
> >> >>>>>>>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>>>>>>> projections should be pushed all the way down to
> >> >> the
> >> >>>> table
> >> >>>>>>>>>>>>>>>>> function,
> >> >>>>>>>>>>>>>>>>>>>>> like
> >> >>>>>>>>>>>>>>>>>>>>>>> what we do in the scan source, instead of the
> >> >> runner
> >> >>>> with
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>> cache.
> >> >>>>>>>>>>>>>>>>>>>>> The
> >> >>>>>>>>>>>>>>>>>>>>>>> goal of using cache is to reduce the network I/O
> >> >> and
> >> >>>>>>>>>>>>>>> pressure
> >> >>>>>>>>>>>>>>>>> on the
> >> >>>>>>>>>>>>>>>>>>>>>>> external system, and only applying these
> >> >>> optimizations
> >> >>>> to
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>> cache
> >> >>>>>>>>>>>>>>>>>>>>> seems
> >> >>>>>>>>>>>>>>>>>>>>>>> not quite useful.
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect
> our
> >> >>>> ideas.
> >> >>>>>>>>>>>>>>> We
> >> >>>>>>>>>>>>>>>>>>>> prefer to
> >> >>>>>>>>>>>>>>>>>>>>>>> keep the cache implementation as a part of
> >> >>>> TableFunction,
> >> >>>>>>>>>>>>>>> and we
> >> >>>>>>>>>>>>>>>>>>>> could
> >> >>>>>>>>>>>>>>>>>>>>>>> provide some helper classes
> (CachingTableFunction,
> >> >>>>>>>>>>>>>>>>>>>>>> AllCachingTableFunction,
> >> >>>>>>>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to developers and
> >> >> regulate
> >> >>>>>>>>>>>>>>> metrics
> >> >>>>>>>>>>>>>>>>> of the
> >> >>>>>>>>>>>>>>>>>>>>>> cache.
> >> >>>>>>>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your reference.
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >>>>>>>>>>>>>>>>>>>>>>>> [2] https://github.com/PatrickRen/flink/tree/FLIP-221
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов <smirale...@gmail.com> wrote:
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> I have a few comments on your message.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> but could also live with an easier solution
> as
> >> >> the
> >> >>>>>>>>>>>>>>> first
> >> >>>>>>>>>>>>>>>>> step:
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> I think that these 2 ways are mutually
> exclusive
> >> >>>>>>>>>>>>>>> (originally
> >> >>>>>>>>>>>>>>>>>>>>> proposed
> >> >>>>>>>>>>>>>>>>>>>>>>>>> by Qingsheng and mine), because conceptually
> >> they
> >> >>>> follow
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>> same
> >> >>>>>>>>>>>>>>>>>>>>>>>>> goal, but implementation details are
> different.
> >> >> If
> >> >>> we
> >> >>>>>>>>>>>>>>> will
> >> >>>>>>>>>>>>>>>>> go one
> >> >>>>>>>>>>>>>>>>>>>>> way,
> >> >>>>>>>>>>>>>>>>>>>>>>>>> moving to another way in the future will mean
> >> >>>> deleting
> >> >>>>>>>>>>>>>>>>> existing
> >> >>>>>>>>>>>>>>>>>>>> code
> >> >>>>>>>>>>>>>>>>>>>>>>>>> and once again changing the API for
> connectors.
> >> >> So
> >> >>> I
> >> >>>>>>>>>>>>>>> think we
> >> >>>>>>>>>>>>>>>>>>>> should
> >> >>>>>>>>>>>>>>>>>>>>>>>>> reach a consensus with the community about
> that
> >> >> and
> >> >>>> then
> >> >>>>>>>>>>>>>>> work
> >> >>>>>>>>>>>>>>>>>>>>> together
> >> >>>>>>>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work on tasks
> for
> >> >>>> different
> >> >>>>>>>>>>>>>>>>> parts
> >> >>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache unification /
> >> >>>> introducing
> >> >>>>>>>>>>>>>>>>> proposed
> >> >>>>>>>>>>>>>>>>>>>> set
> >> >>>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> as the source will only receive the requests
> >> >> after
> >> >>>>>>>>>>>>>>> filter
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> Actually if filters are applied to fields of
> the
> >> >>>> lookup
> >> >>>>>>>>>>>>>>>>> table, we
> >> >>>>>>>>>>>>>>>>>>>>>>>>> firstly must do requests, and only after that
> we
> >> >>> can
> >> >>>>>>>>>>>>>>> filter
> >> >>>>>>>>>>>>>>>>>>>>> responses,
> >> >>>>>>>>>>>>>>>>>>>>>>>>> because lookup connectors don't have filter
> >> >>>> pushdown. So
> >> >>>>>>>>>>>>>>> if
> >> >>>>>>>>>>>>>>>>>>>>> filtering
> >> >>>>>>>>>>>>>>>>>>>>>>>>> is done before caching, there will be far
> fewer
> >> >>> rows
> >> >>>> in
> >> >>>>>>>>>>>>>>>>> cache.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture
> is
> >> >> not
> >> >>>>>>>>>>>>>>> shared.
> >> >>>>>>>>>>>>>>>>> I
> >> >>>>>>>>>>>>>>>>>>>> don't
> >> >>>>>>>>>>>>>>>>>>>>>>> know the
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of
> >> >>>>>>>>>>>>>>> conversations
> >> >>>>>>>>>>>>>>>>> :)
> >> >>>>>>>>>>>>>>>>>>>>>>>>> I have no write access to the confluence, so I
> >> >>> made a
> >> >>>>>>>>>>>>>>> Jira
> >> >>>>>>>>>>>>>>>>> issue,
> >> >>>>>>>>>>>>>>>>>>>>>>>>> where I described the proposed changes in more
> >> >>> detail
> >> >>>> -
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >> https://issues.apache.org/jira/browse/FLINK-27411.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> I will be happy to get more feedback!
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >> >>>>>>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> >> >>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Apr 25, 2022 at 19:49, Arvid Heise <
> >> >>>>>>>>>>>>>>> ar...@apache.org>:
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency
> was
> >> >> not
> >> >>>>>>>>>>>>>>>>> satisfying
> >> >>>>>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>>>>>> me.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could
> also
> >> >>> live
> >> >>>>>>>>>>>>>>> with
> >> >>>>>>>>>>>>>>>>> an
> >> >>>>>>>>>>>>>>>>>>>>> easier
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of making
> >> >>>> caching
> >> >>>>>>>>>>>>>>> an
> >> >>>>>>>>>>>>>>>>>>>>>>> implementation
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a
> >> >> caching
> >> >>>>>>>>>>>>>>> layer
> >> >>>>>>>>>>>>>>>>>>>> around X.
> >> >>>>>>>>>>>>>>>>>>>>>> So
> >> >>>>>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that
> >> >>>>>>>>>>>>>>> delegates to
> >> >>>>>>>>>>>>>>>>> X in
> >> >>>>>>>>>>>>>>>>>>>>> case
> >> >>>>>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it
> >> >> into
> >> >>>> the
> >> >>>>>>>>>>>>>>>>> operator
> >> >>>>>>>>>>>>>>>>>>>>>> model
> >> >>>>>>>>>>>>>>>>>>>>>>> as
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> proposed would be even better but is probably
> >> >>>>>>>>>>>>>>> unnecessary
> >> >>>>>>>>>>>>>>>>> in
> >> >>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>> first step
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source will only
> >> >>> receive
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>> requests
> >> >>>>>>>>>>>>>>>>>>>>>>> after
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> filter; applying projection may be more
> >> >>> interesting
> >> >>>> to
> >> >>>>>>>>>>>>>>> save
> >> >>>>>>>>>>>>>>>>>>>>> memory).
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the changes of
> >> >> this
> >> >>>> FLIP
> >> >>>>>>>>>>>>>>>>> would be
> >> >>>>>>>>>>>>>>>>>>>>>>> limited to
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> options, no need for new public interfaces.
> >> >>>> Everything
> >> >>>>>>>>>>>>>>> else
> >> >>>>>>>>>>>>>>>>>>>>> remains
> >> >>>>>>>>>>>>>>>>>>>>>> an
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> implementation of Table runtime. That means
> we
> >> >> can
> >> >>>>>>>>>>>>>>> easily
> >> >>>>>>>>>>>>>>>>>>>>>> incorporate
> >> >>>>>>>>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> optimization potential that Alexander pointed
> >> >> out
> >> >>>>>>>>>>>>>>> later.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture
> is
> >> >> not
> >> >>>>>>>>>>>>>>> shared.
> >> >>>>>>>>>>>>>>>>> I
> >> >>>>>>>>>>>>>>>>>>>> don't
> >> >>>>>>>>>>>>>>>>>>>>>>> know the
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр
> >> >> Смирнов
> >> >>> <
> >> >>>>>>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not
> a
> >> >>>>>>>>>>>>>>> committer
> >> >>>>>>>>>>>>>>>>> yet,
> >> >>>>>>>>>>>>>>>>>>>> but
> >> >>>>>>>>>>>>>>>>>>>>>> I'd
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> really like to become one. And this FLIP
> >> really
> >> >>>>>>>>>>>>>>>>> interested
> >> >>>>>>>>>>>>>>>>>>>> me.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar feature
> in
> >> >> my
> >> >>>>>>>>>>>>>>>>> company’s
> >> >>>>>>>>>>>>>>>>>>>>> Flink
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our
> thoughts
> >> >> on
> >> >>>>>>>>>>>>>>> this and
> >> >>>>>>>>>>>>>>>>>>>> make
> >> >>>>>>>>>>>>>>>>>>>>>> code
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> open source.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> I think there is a better alternative than
> >> >>>>>>>>>>>>>>> introducing an
> >> >>>>>>>>>>>>>>>>>>>>> abstract
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> class for TableFunction
> >> (CachingTableFunction).
> >> >>> As
> >> >>>>>>>>>>>>>>> you
> >> >>>>>>>>>>>>>>>>> know,
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> TableFunction exists in the
> flink-table-common
> >> >>>>>>>>>>>>>>> module,
> >> >>>>>>>>>>>>>>>>> which
> >> >>>>>>>>>>>>>>>>>>>>>>> provides
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> only an API for working with tables – it’s
> >> very
> >> >>>>>>>>>>>>>>>>> convenient
> >> >>>>>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>>>>>>> importing
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction
> >> >>>> contains
> >> >>>>>>>>>>>>>>>>> logic
> >> >>>>>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> runtime execution, so this class and
> >> >> everything
> >> >>>>>>>>>>>>>>>>> connected
> >> >>>>>>>>>>>>>>>>>>>> with
> >> >>>>>>>>>>>>>>>>>>>>> it
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> should be located in another module,
> probably
> >> >> in
> >> >>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> But this will require connectors to depend
> on
> >> >>>> another
> >> >>>>>>>>>>>>>>>>> module,
> >> >>>>>>>>>>>>>>>>>>>>>> which
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic, which
> doesn’t
> >> >>>> sound
> >> >>>>>>>>>>>>>>>>> good.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method
> >> ‘getLookupConfig’
> >> >>> to
> >> >>>>>>>>>>>>>>>>>>>>>> LookupTableSource
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors
> >> to
> >> >>>> only
> >> >>>>>>>>>>>>>>> pass
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> configurations to the planner, therefore
> they
> >> >>> won’t
> >> >>>>>>>>>>>>>>>>> depend on
> >> >>>>>>>>>>>>>>>>>>>>>>> runtime
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> realization. Based on these configs planner
> >> >> will
> >> >>>>>>>>>>>>>>>>> construct a
> >> >>>>>>>>>>>>>>>>>>>>>> lookup
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> join operator with corresponding runtime
> logic
> >> >>>>>>>>>>>>>>>>>>>> (ProcessFunctions
> >> >>>>>>>>>>>>>>>>>>>>>> in
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture
> >> looks
> >> >>>> like
> >> >>>>>>>>>>>>>>> in
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>> pinned
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> image (LookupConfig class there is actually
> >> >> your
> >> >>>>>>>>>>>>>>>>>>>> CacheConfig).
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Classes in flink-table-planner, that will be
> >> >>>>>>>>>>>>>>> responsible
> >> >>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>>>>> this
> >> >>>>>>>>>>>>>>>>>>>>>> –
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> CommonPhysicalLookupJoin and its subclasses.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> Current classes for lookup join in
> >> >>>>>>>>>>>>>>> flink-table-runtime
> >> >>>>>>>>>>>>>>>>> -
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinRunner, AsyncLookupJoinRunner,
> >> >>>>>>>>>>>>>>>>>>>>> LookupJoinRunnerWithCalc,
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunnerWithCalc.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> I suggest adding classes
> >> >> LookupJoinCachingRunner,
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> And here comes another more powerful
> advantage
> >> >> of
> >> >>>>>>>>>>>>>>> such a
> >> >>>>>>>>>>>>>>>>>>>>> solution.
> >> >>>>>>>>>>>>>>>>>>>>>>> If
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> we have caching logic on a lower level, we
> can
> >> >>>> apply
> >> >>>>>>>>>>>>>>> some
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> optimizations to it.
> LookupJoinRunnerWithCalc
> >> >> was
> >> >>>>>>>>>>>>>>> named
> >> >>>>>>>>>>>>>>>>> like
> >> >>>>>>>>>>>>>>>>>>>>> this
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> because it uses the ‘calc’ function, which
> >> >>> actually
> >> >>>>>>>>>>>>>>>>> mostly
> >> >>>>>>>>>>>>>>>>>>>>>> consists
> >> >>>>>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> filters and projections.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> For example, in join table A with lookup
> table
> >> >> B
> >> >>>>>>>>>>>>>>>>> condition
> >> >>>>>>>>>>>>>>>>>>>>> ‘JOIN …
> >> >>>>>>>>>>>>>>>>>>>>>>> ON
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE
> >> >>> B.salary >
> >> >>>>>>>>>>>>>>> 1000’
> >> >>>>>>>>>>>>>>>>>>>>> ‘calc’
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> function will contain filters A.age = B.age
> +
> >> >> 10
> >> >>>> and
> >> >>>>>>>>>>>>>>>>>>>> B.salary >
> >> >>>>>>>>>>>>>>>>>>>>>>> 1000.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> If we apply this function before storing
> >> >> records
> >> >>> in
> >> >>>>>>>>>>>>>>>>> cache,
> >> >>>>>>>>>>>>>>>>>>>> size
> >> >>>>>>>>>>>>>>>>>>>>> of
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> cache will be significantly reduced:
> filters =
> >> >>>> avoid
> >> >>>>>>>>>>>>>>>>> storing
> >> >>>>>>>>>>>>>>>>>>>>>> useless
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> records in cache, projections = reduce
> >> records’
> >> >>>>>>>>>>>>>>> size. So
> >> >>>>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>>>>>>> initial
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> max number of records in cache can be
> >> increased
> >> >>> by
> >> >>>>>>>>>>>>>>> the
> >> >>>>>>>>>>>>>>>>> user.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> What do you think about it?
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi devs,
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion
> >> >>> about
> >> >>>>>>>>>>>>>>>>>>>> FLIP-221[1],
> >> >>>>>>>>>>>>>>>>>>>>>>> which
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> introduces an abstraction of lookup table
> >> cache
> >> >>> and
> >> >>>>>>>>>>>>>>> its
> >> >>>>>>>>>>>>>>>>>>>> standard
> >> >>>>>>>>>>>>>>>>>>>>>>> metrics.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source should
> >> >>>> implement
> >> >>>>>>>>>>>>>>>>> its
> >> >>>>>>>>>>>>>>>>>>>> own
> >> >>>>>>>>>>>>>>>>>>>>>>> cache to
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> store lookup results, and there isn’t a
> >> >> standard
> >> >>> of
> >> >>>>>>>>>>>>>>>>> metrics
> >> >>>>>>>>>>>>>>>>>>>> for
> >> >>>>>>>>>>>>>>>>>>>>>>> users and
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> developers to tune their jobs with lookup
> >> >>> joins,
> >> >>>>>>>>>>>>>>> which
> >> >>>>>>>>>>>>>>>>> is a
> >> >>>>>>>>>>>>>>>>>>>>>> quite
> >> >>>>>>>>>>>>>>>>>>>>>>> common
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> use case in Flink table / SQL.
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs
> including
> >> >>>> cache,
> >> >>>>>>>>>>>>>>>>>>>> metrics,
> >> >>>>>>>>>>>>>>>>>>>>>>> wrapper
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> classes of TableFunction and new table
> >> options.
> >> >>>>>>>>>>>>>>> Please
> >> >>>>>>>>>>>>>>>>> take a
> >> >>>>>>>>>>>>>>>>>>>>> look
> >> >>>>>>>>>>>>>>>>>>>>>>> at the
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any
> >> >>> suggestions
> >> >>>>>>>>>>>>>>> and
> >> >>>>>>>>>>>>>>>>>>>> comments
> >> >>>>>>>>>>>>>>>>>>>>>>> would be
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>> appreciated!
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>
> >> >>>>
> >> >>>
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> --
> >> >>>>>>>>>>>>>>>>>>>>>>>> Best Regards,
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng Ren
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Real-time Computing Team
> >> >>>>>>>>>>>>>>>>>>>>>>>> Alibaba Cloud
> >> >>>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
> >> >>>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>>> --
> >> >>>>>>>>>>>>>>>>>>> Best regards,
> >> >>>>>>>>>>>>>>>>>>> Roman Boyko
> >> >>>>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> >> >>>>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>>>
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> --
> >> >>>>>>>>>>>> Best Regards,
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Qingsheng Ren
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Real-time Computing Team
> >> >>>>>>>>>>>> Alibaba Cloud
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Email: renqs...@gmail.com
> >> >>>>>>>>>>
> >> >>>>
> >> >>>>
> >> >>>
> >> >>
> >>
> >>
>
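
For readers skimming the quoted thread, here is a rough sketch of two ideas
discussed in it: a caching layer that delegates to a lookup function "X" only
on a miss, and applying the filter and projection ('calc') before rows are
stored so the cache holds fewer and smaller entries. This is only an
illustration under my own assumptions. CachingLookup, its constructor
parameters and the misses() counter are hypothetical names, not classes from
the FLIP, the POC or Flink, and the LRU eviction via LinkedHashMap is just
the simplest policy to show.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical caching layer around a lookup function; not a Flink class. */
final class CachingLookup<K, R, C> {
    private final Function<K, Collection<R>> delegate; // stands in for "TableFunction X"
    private final Predicate<R> filter;                 // filter part of 'calc'
    private final Function<R, C> projection;           // projection part of 'calc'
    private final Map<K, List<C>> cache;
    private int misses = 0;

    CachingLookup(Function<K, Collection<R>> delegate,
                  Predicate<R> filter,
                  Function<R, C> projection,
                  int maxEntries) {
        this.delegate = delegate;
        this.filter = filter;
        this.projection = projection;
        // Access-ordered LinkedHashMap: the least recently used key is evicted
        // once the number of cached keys exceeds maxEntries.
        this.cache = new LinkedHashMap<K, List<C>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, List<C>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    List<C> lookup(K key) {
        List<C> cached = cache.get(key);
        if (cached != null) {
            return cached; // hit: the external system is not queried
        }
        misses++;
        List<C> rows = new ArrayList<>();
        for (R row : delegate.apply(key)) {
            // Filter and project BEFORE caching, so useless rows are never stored.
            if (filter.test(row)) {
                rows.add(projection.apply(row));
            }
        }
        cache.put(key, rows); // an empty result is cached as well
        return rows;
    }

    int misses() {
        return misses;
    }
}
```

With a filter such as "salary > 1000" and a projection that keeps only the
needed columns, a second lookup of the same key is served from the cache
without another request to the external system.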
