Hi Jark,

Thanks for your reply!

Currently 'lookup.async' exists only in the HBase connector. I have no idea
whether or when to remove it (we can discuss that in a separate issue for the
HBase connector after FLINK-27625 is done); for now, let's just not add it as
a common option.

Best,
Lincoln Lee


On Tue, May 24, 2022 at 20:14, Jark Wu <imj...@gmail.com> wrote:

> Hi Lincoln,
>
> I have taken a look at FLIP-234, and I agree with you that the connectors
> can provide both async and sync runtime providers simultaneously instead
> of one of them.
> At that point, "lookup.async" looks redundant. If this option is planned to
> be removed in the long term, I think it makes sense not to introduce it in
> this FLIP.
>
> Best,
> Jark
>
> On Tue, 24 May 2022 at 11:08, Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
> > Hi Qingsheng,
> >
> > Sorry for jumping into the discussion so late. It's a good idea to have
> > common table options. I have one minor comment on 'lookup.async': I would
> > prefer not to make it a common option.
> >
> > The table layer abstracts both sync and async lookup capabilities, and
> > connector implementers can choose one or both. When only one capability
> > is implemented (the status of most existing built-in connectors),
> > 'lookup.async' will not be used. When a connector has both capabilities,
> > I think this choice is better made at the query level: for example, the
> > table planner can choose the physical implementation of async or sync
> > lookup based on its cost model, or users can give a query hint based on
> > their own better understanding. If there is another common table option
> > 'lookup.async', it may confuse users in the long run.
> >
> > So, I prefer to keep the 'lookup.async' option private to the connector
> > (currently the HBase connector) and not turn it into a common option.
> >
> > WDYT?
> >
> > Best,
> > Lincoln Lee
> >
> >
> > On Mon, May 23, 2022 at 14:54, Qingsheng Ren <renqs...@gmail.com> wrote:
> >
> > > Hi Alexander,
> > >
> > > Thanks for the review! We recently updated the FLIP and you can find
> > > those changes in my latest email. Since some terminology has changed,
> > > I’ll use the new concepts when replying to your comments.
> > >
> > > 1. Builder vs ‘of’
> > > I’m OK with using the builder pattern if we have additional optional
> > > parameters for full caching mode (“rescan” previously). The
> > > schedule-with-delay idea looks reasonable to me, but I think we need to
> > > redesign the builder API of full caching to make it more descriptive for
> > > developers. Would you mind sharing your ideas about the API? To access
> > > the FLIP workspace, you can just provide your account ID and ping any
> > > PMC member, including Jark.
> > >
> > > 2. Common table options
> > > We have had some discussions over the past few days and propose
> > > introducing 8 common table options for caching. The FLIP has been
> > > updated accordingly.
> > >
> > > 3. Retries
> > > I think we are on the same page :-)
> > >
> > > For your additional concerns:
> > > 1) The table option has been updated.
> > > 2) We got “lookup.cache” back for configuring whether to use partial or
> > > full caching mode.
> > >
> > > Best regards,
> > >
> > > Qingsheng
> > >
> > >
> > >
> > > > > On May 19, 2022, at 17:25, Александр Смирнов <smirale...@gmail.com> wrote:
> > > >
> > > > Also I have a few additions:
> > > > 1) maybe rename 'lookup.cache.maximum-size' to
> > > > 'lookup.cache.max-rows'? I think it will be more clear that we talk
> > > > not about bytes, but about the number of rows. Plus it fits more,
> > > > considering my optimization with filters.
> > > > 2) How will users enable rescanning? Are we going to separate caching
> > > > and rescanning from the options point of view? Like initially we had
> > > > one option 'lookup.cache' with values LRU / ALL. I think now we can
> > > > make a boolean option 'lookup.rescan'. RescanInterval can be
> > > > 'lookup.rescan.interval', etc.
> > > >
> > > > Best regards,
> > > > Alexander
> > > >
> > > > > On Thu, May 19, 2022 at 14:50, Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>
> > > >> Hi Qingsheng and Jark,
> > > >>
> > > >> 1. Builders vs 'of'
> > > > >> I understand that builders are used when we have multiple parameters.
> > > > >> I suggested them because we could add parameters later. To prevent
> > > > >> Builder for ScanRuntimeProvider from looking redundant I can suggest
> > > > >> one more config now - "rescanStartTime".
> > > > >> It's a time in UTC (LocalTime class) when the first reload of cache
> > > > >> starts. This parameter can be thought of as 'initialDelay' (diff
> > > > >> between current time and rescanStartTime) in method
> > > > >> ScheduledExecutorService#scheduleWithFixedDelay [1]. It can be very
> > > > >> useful when the dimension table is updated by some other scheduled
> > > > >> job at a certain time. Or when the user simply wants a second scan
> > > > >> (first cache reload) to be delayed. This option can be used even
> > > > >> without 'rescanInterval' - in this case 'rescanInterval' will be one
> > > > >> day.
> > > > >> If you are fine with this option, I would be very glad if you would
> > > > >> give me access to edit the FLIP page, so I could add it myself.
> > > >>
> > > >> 2. Common table options
> > > > >> I also think that FactoryUtil would be overloaded by all the cache
> > > > >> options. But maybe we could unify all suggested options, not only
> > > > >> those for the default cache? I.e. a class 'LookupOptions' that
> > > > >> unifies the default cache options, rescan options, 'async', and
> > > > >> 'maxRetries'. WDYT?
> > > >>
> > > >> 3. Retries
> > > >> I'm fine with suggestion close to RetryUtils#tryTimes(times, call)
> > > >>
> > > >> [1]
> > >
> >
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-
> > > >>
> > > >> Best regards,
> > > >> Alexander
> > > >>
> > > > >> On Wed, May 18, 2022 at 16:04, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > >>>
> > > >>> Hi Jark and Alexander,
> > > >>>
> > > > >>> Thanks for your comments! I’m also OK with introducing common table
> > > > >>> options. I prefer to introduce a new DefaultLookupCacheOptions class
> > > > >>> for holding these option definitions because putting all options into
> > > > >>> FactoryUtil would make it a bit “crowded” and not well categorized.
> > > >>>
> > > >>> FLIP has been updated according to suggestions above:
> > > > >>> 1. Use static “of” method for constructing RescanRuntimeProvider
> > > > >>> considering both arguments are required.
> > > >>> 2. Introduce new table options matching DefaultLookupCacheFactory
> > > >>>
> > > >>> Best,
> > > >>> Qingsheng
> > > >>>
> > > >>> On Wed, May 18, 2022 at 2:57 PM Jark Wu <imj...@gmail.com> wrote:
> > > >>>>
> > > >>>> Hi Alex,
> > > >>>>
> > > >>>> 1) retry logic
> > > > >>>> I think we can extract some common retry logic into utilities, e.g.
> > > > >>>> RetryUtils#tryTimes(times, call).
> > > > >>>> This seems independent of this FLIP and can be reused by DataStream
> > > > >>>> users.
> > > > >>>> Maybe we can open an issue to discuss this and where to put it.
> > > >>>>
> > > >>>> 2) cache ConfigOptions
> > > >>>> I'm fine with defining cache config options in the framework.
> > > > >>>> A candidate place to put them is FactoryUtil, which also includes
> > > > >>>> the "sink.parallelism" and "format" options.
> > > >>>>
> > > >>>> Best,
> > > >>>> Jark
> > > >>>>
> > > >>>>
> > > > >>>> On Wed, 18 May 2022 at 13:52, Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Hi Qingsheng,
> > > >>>>>
> > > >>>>> Thank you for considering my comments.
> > > >>>>>
> > > > >>>>>> there might be custom logic before making retry, such as
> > > > >>>>>> re-establish the connection
> > > >>>>>
> > > > >>>>> Yes, I understand that. I meant that such logic can be placed in a
> > > > >>>>> separate function that can be implemented by connectors. Just moving
> > > > >>>>> the retry logic would make the connector's LookupFunction more
> > > > >>>>> concise and avoid duplicate code. However, it's a minor change. The
> > > > >>>>> decision is up to you.
> > > >>>>>
> > > > >>>>>> We decide not to provide common DDL options and let developers to
> > > > >>>>>> define their own options as we do now per connector.
> > > >>>>>
> > > > >>>>> What is the reason for that? One of the main goals of this FLIP was
> > > > >>>>> to unify the configs, wasn't it? I understand that the current cache
> > > > >>>>> design doesn't depend on ConfigOptions, as it did before. But we can
> > > > >>>>> still put these options into the framework, so connectors can reuse
> > > > >>>>> them and avoid code duplication and, more significantly, avoid
> > > > >>>>> inconsistent option naming. This point can be noted in the
> > > > >>>>> documentation for connector developers.
> > > >>>>>
> > > >>>>> Best regards,
> > > >>>>> Alexander
> > > >>>>>
> > > > >>>>> On Tue, May 17, 2022 at 17:11, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>> Hi Alexander,
> > > >>>>>>
> > > > >>>>>> Thanks for the review and glad to see we are on the same page! I
> > > > >>>>>> think you forgot to cc the dev mailing list so I’m also quoting your
> > > > >>>>>> reply under this email.
> > > >>>>>>
> > > >>>>>>> We can add 'maxRetryTimes' option into this class
> > > >>>>>>
> > > > >>>>>> In my opinion the retry logic should be implemented in lookup()
> > > > >>>>>> instead of in LookupFunction#eval(). Retrying is only meaningful for
> > > > >>>>>> some specific retriable failures, and there might be custom logic
> > > > >>>>>> before retrying, such as re-establishing the connection
> > > > >>>>>> (JdbcRowDataLookupFunction is an example), so it's handier to leave
> > > > >>>>>> it to the connector.
> > > >>>>>>
> > > > >>>>>>> I don't see DDL options, that were in previous version of FLIP. Do
> > > > >>>>>>> you have any special plans for them?
> > > >>>>>>
> > > > >>>>>> We decide not to provide common DDL options and let developers to
> > > > >>>>>> define their own options as we do now per connector.
> > > >>>>>>
> > > > >>>>>> The rest of the comments sound great and I’ll update the FLIP. Hope
> > > > >>>>>> we can finalize our proposal soon!
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>>
> > > >>>>>> Qingsheng
> > > >>>>>>
> > > >>>>>>
> > > > >>>>>>> On May 17, 2022, at 13:46, Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Qingsheng and devs!
> > > >>>>>>>
> > > > >>>>>>> I like the overall design of the updated FLIP; however, I have
> > > > >>>>>>> several suggestions and questions.
> > > > >>>>>>>
> > > > >>>>>>> 1) Introducing LookupFunction as a subclass of TableFunction is a
> > > > >>>>>>> good idea. We can add a 'maxRetryTimes' option to this class. The
> > > > >>>>>>> 'eval' method of the new LookupFunction is great for this purpose.
> > > > >>>>>>> The same goes for the 'async' case.
> > > > >>>>>>>
> > > > >>>>>>> 2) There might be other configs in the future, such as
> > > > >>>>>>> 'cacheMissingKey' in LookupFunctionProvider or 'rescanInterval' in
> > > > >>>>>>> ScanRuntimeProvider. Maybe use the Builder pattern in
> > > > >>>>>>> LookupFunctionProvider and RescanRuntimeProvider for more
> > > > >>>>>>> flexibility (use one 'build' method instead of many 'of' methods in
> > > > >>>>>>> the future)?
> > > > >>>>>>>
> > > > >>>>>>> 3) What are the plans for the existing TableFunctionProvider and
> > > > >>>>>>> AsyncTableFunctionProvider? I think they should be deprecated.
> > > > >>>>>>>
> > > > >>>>>>> 4) Am I right that the current design does not assume usage of a
> > > > >>>>>>> user-provided LookupCache in re-scanning? In this case, it is not
> > > > >>>>>>> very clear why we need methods such as 'invalidate' or 'putAll' in
> > > > >>>>>>> LookupCache.
> > > > >>>>>>>
> > > > >>>>>>> 5) I don't see the DDL options that were in the previous version of
> > > > >>>>>>> the FLIP. Do you have any special plans for them?
> > > > >>>>>>>
> > > > >>>>>>> If you don't mind, I would be glad to be able to make small
> > > > >>>>>>> adjustments to the FLIP document too. I think it's worth mentioning
> > > > >>>>>>> exactly which optimizations are planned for the future.
> > > >>>>>>>
> > > >>>>>>> Best regards,
> > > >>>>>>> Smirnov Alexander
> > > >>>>>>>
> > > > >>>>>>> On Fri, May 13, 2022 at 20:27, Qingsheng Ren <renqs...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hi Alexander and devs,
> > > >>>>>>>>
> > > > >>>>>>>> Thank you very much for the in-depth discussion! As Jark
> > > > >>>>>>>> mentioned, we were inspired by Alexander's idea and refactored our
> > > > >>>>>>>> design. FLIP-221 [1] has been updated to reflect our design now and
> > > > >>>>>>>> we are happy to hear more suggestions from you!
> > > >>>>>>>>
> > > >>>>>>>> Compared to the previous design:
> > > > >>>>>>>> 1. The lookup cache serves at table runtime level and is integrated
> > > > >>>>>>>> as a component of LookupJoinRunner as discussed previously.
> > > > >>>>>>>> 2. Interfaces are renamed and re-designed to reflect the new design.
> > > > >>>>>>>> 3. We separate the all-caching case individually and introduce a new
> > > > >>>>>>>> RescanRuntimeProvider to reuse the ability of scanning. We are
> > > > >>>>>>>> planning to support SourceFunction / InputFormat for now considering
> > > > >>>>>>>> the complexity of FLIP-27 Source API.
> > > > >>>>>>>> 4. A new interface LookupFunction is introduced to make the semantic
> > > > >>>>>>>> of lookup more straightforward for developers.
> > > >>>>>>>>
> > > >>>>>>>> For replying to Alexander:
> > > > >>>>>>>>> However I'm a little confused whether InputFormat is deprecated
> > > > >>>>>>>>> or not. Am I right that it will be so in the future, but currently
> > > > >>>>>>>>> it's not?
> > > > >>>>>>>> Yes you are right. InputFormat is not deprecated for now. I think
> > > > >>>>>>>> it will be deprecated in the future but we don't have a clear plan
> > > > >>>>>>>> for that.
> > > >>>>>>>>
> > > > >>>>>>>> Thanks again for the discussion on this FLIP and looking forward
> > > > >>>>>>>> to cooperating with you after we finalize the design and interfaces!
> > > >>>>>>>>
> > > >>>>>>>> [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > >>>>>>>>
> > > >>>>>>>> Best regards,
> > > >>>>>>>>
> > > >>>>>>>> Qingsheng
> > > >>>>>>>>
> > > >>>>>>>>
> > > > >>>>>>>> On Fri, May 13, 2022 at 12:12 AM Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi Jark, Qingsheng and Leonard!
> > > >>>>>>>>>
> > > >>>>>>>>> Glad to see that we came to a consensus on almost all points!
> > > >>>>>>>>>
> > > > >>>>>>>>> However I'm a little confused whether InputFormat is deprecated or
> > > > >>>>>>>>> not. Am I right that it will be so in the future, but currently it's
> > > > >>>>>>>>> not? Actually I also think that for the first version it's OK to use
> > > > >>>>>>>>> InputFormat in the ALL cache implementation, because supporting the
> > > > >>>>>>>>> rescan ability seems like a very distant prospect. But for this
> > > > >>>>>>>>> decision we need a consensus among all discussion participants.
> > > >>>>>>>>>
> > > > >>>>>>>>> In general, I have nothing to argue with in your statements. All
> > > > >>>>>>>>> of them correspond to my ideas. Looking ahead, it would be nice to
> > > > >>>>>>>>> work on this FLIP cooperatively. I've already done a lot of work on
> > > > >>>>>>>>> lookup join caching with an implementation very close to the one we
> > > > >>>>>>>>> are discussing, and want to share the results of this work. Anyway,
> > > > >>>>>>>>> looking forward to the FLIP update!
> > > >>>>>>>>>
> > > >>>>>>>>> Best regards,
> > > >>>>>>>>> Smirnov Alexander
> > > >>>>>>>>>
> > > > >>>>>>>>> On Thu, May 12, 2022 at 17:38, Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>> Hi Alex,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Thanks for summarizing your points.
> > > >>>>>>>>>>
> > > > >>>>>>>>>> In the past week, Qingsheng, Leonard, and I have discussed it
> > > > >>>>>>>>>> several times and we have totally refactored the design.
> > > > >>>>>>>>>> I'm glad to say we have reached a consensus on many of your points!
> > > > >>>>>>>>>> Qingsheng is still working on updating the design docs, which may
> > > > >>>>>>>>>> be available in the next few days.
> > > >>>>>>>>>> I will share some conclusions from our discussions:
> > > >>>>>>>>>>
> > > > >>>>>>>>>> 1) We have refactored the design towards the "cache in framework"
> > > > >>>>>>>>>> way.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 2) A "LookupCache" interface for users to customize and a default
> > > > >>>>>>>>>> implementation with a builder for ease of use.
> > > > >>>>>>>>>> This makes it possible to have both flexibility and conciseness.
> > > >>>>>>>>>>
> > > > >>>>>>>>>> 3) Filter pushdown is important for ALL and LRU lookup cache, esp.
> > > > >>>>>>>>>> for reducing IO.
> > > > >>>>>>>>>> Filter pushdown should be the final state and the unified way to
> > > > >>>>>>>>>> support pruning both ALL cache and LRU cache, so I think we should
> > > > >>>>>>>>>> make an effort in this direction. If we need to support filter
> > > > >>>>>>>>>> pushdown for ALL cache anyway, why not use it for LRU cache as
> > > > >>>>>>>>>> well? Either way, as we decide to implement the cache in the
> > > > >>>>>>>>>> framework, we have the chance to support filter on cache anytime.
> > > > >>>>>>>>>> This is an optimization and it doesn't affect the public API. I
> > > > >>>>>>>>>> think we can create a JIRA issue to discuss it when the FLIP is
> > > > >>>>>>>>>> accepted.
> > > >>>>>>>>>>
> > > > >>>>>>>>>> 4) The idea to support ALL cache is similar to your proposal.
> > > > >>>>>>>>>> In the first version, we will only support InputFormat and
> > > > >>>>>>>>>> SourceFunction for cache all (invoke InputFormat in join operator).
> > > > >>>>>>>>>> For FLIP-27 source, we need to join a true source operator instead
> > > > >>>>>>>>>> of calling it embedded in the join operator.
> > > > >>>>>>>>>> However, this needs another FLIP to support the re-scan ability for
> > > > >>>>>>>>>> FLIP-27 Source, and this can be a large piece of work.
> > > > >>>>>>>>>> In order to not block this issue, we can put the effort of FLIP-27
> > > > >>>>>>>>>> source integration into future work and integrate InputFormat &
> > > > >>>>>>>>>> SourceFunction for now.
> > > >>>>>>>>>>
> > > > >>>>>>>>>> I think it's fine to use InputFormat & SourceFunction, as they are
> > > > >>>>>>>>>> not deprecated; otherwise, we would have to introduce another
> > > > >>>>>>>>>> function similar to them, which is meaningless. We need to plan
> > > > >>>>>>>>>> FLIP-27 source integration ASAP before InputFormat & SourceFunction
> > > > >>>>>>>>>> are deprecated.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Best,
> > > >>>>>>>>>> Jark
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > > >>>>>>>>>> On Thu, 12 May 2022 at 15:46, Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi Martijn!
> > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Got it. Therefore, the implementation with InputFormat is not
> > > > >>>>>>>>>>> considered.
> > > >>>>>>>>>>> Thanks for clearing that up!
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best regards,
> > > >>>>>>>>>>> Smirnov Alexander
> > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Thu, May 12, 2022 at 14:23, Martijn Visser <mart...@ververica.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> With regards to:
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> But if there are plans to refactor all connectors to FLIP-27
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Yes, FLIP-27 is the target for all connectors. The old interfaces
> > > > >>>>>>>>>>>> will be deprecated and connectors will either be refactored to use
> > > > >>>>>>>>>>>> the new ones or dropped.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> The caching should work for connectors that are using FLIP-27
> > > > >>>>>>>>>>>> interfaces, we should not introduce new features for old interfaces.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Martijn
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Thu, 12 May 2022 at 06:19, Александр Смирнов <smirale...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Jark!
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Sorry for the late response. I would like to make some comments
> > > > >>>>>>>>>>>>> and clarify my points.
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1) I agree with your first statement. I think we can achieve both
> > > > >>>>>>>>>>>>> advantages this way: put the Cache interface in flink-table-common,
> > > > >>>>>>>>>>>>> but have implementations of it in flink-table-runtime. Therefore if
> > > > >>>>>>>>>>>>> a connector developer wants to use existing cache strategies and
> > > > >>>>>>>>>>>>> their implementations, he can just pass lookupConfig to the
> > > > >>>>>>>>>>>>> planner, but if he wants to have his own cache implementation in
> > > > >>>>>>>>>>>>> his TableFunction, it will be possible for him to use the existing
> > > > >>>>>>>>>>>>> interface for this purpose (we can explicitly point this out in the
> > > > >>>>>>>>>>>>> documentation). In this way all configs and metrics will be
> > > > >>>>>>>>>>>>> unified. WDYT?
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90%
> > > > >>>>>>>>>>>>>> of lookup requests that can never be cached
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2) Let me clarify the logic of the filter optimization in the case
> > > > >>>>>>>>>>>>> of LRU cache. It looks like Cache<RowData, Collection<RowData>>.
> > > > >>>>>>>>>>>>> Here we always store the response of the dimension table in the
> > > > >>>>>>>>>>>>> cache, even after applying the calc function. I.e. if there are no
> > > > >>>>>>>>>>>>> rows after applying filters to the result of the 'eval' method of
> > > > >>>>>>>>>>>>> TableFunction, we store an empty list for the lookup keys.
> > > > >>>>>>>>>>>>> Therefore the cache line will be filled, but will require much
> > > > >>>>>>>>>>>>> less memory (in bytes). I.e. we don't completely filter out keys
> > > > >>>>>>>>>>>>> whose result was pruned, but we significantly reduce the memory
> > > > >>>>>>>>>>>>> required to store this result. If the user knows about this
> > > > >>>>>>>>>>>>> behavior, he can increase the 'max-rows' option before the start
> > > > >>>>>>>>>>>>> of the job. But actually I came up with the idea that we can do
> > > > >>>>>>>>>>>>> this automatically by using the 'maximumWeight' and 'weigher'
> > > > >>>>>>>>>>>>> methods of GuavaCache [1]. Weight can be the size of the
> > > > >>>>>>>>>>>>> collection of rows (value of cache). Therefore the cache can
> > > > >>>>>>>>>>>>> automatically fit many more records than before.
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Flink SQL has provided a standard way to do filters and projects
> > > > >>>>>>>>>>>>>> pushdown, i.e., SupportsFilterPushDown and
> > > > >>>>>>>>>>>>>> SupportsProjectionPushDown.
> > > > >>>>>>>>>>>>>> Jdbc/hive/HBase haven't implemented the interfaces, don't mean
> > > > >>>>>>>>>>>>>> it's hard to implement.
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> It's debatable how difficult it will be to implement filter
> > > > >>>>>>>>>>>>> pushdown. But I think the fact that currently there is no database
> > > > >>>>>>>>>>>>> connector with filter pushdown at least means that this feature
> > > > >>>>>>>>>>>>> won't be supported soon in connectors. Moreover, if we talk about
> > > > >>>>>>>>>>>>> other connectors (not in the Flink repo), their databases might
> > > > >>>>>>>>>>>>> not support all Flink filters (or not support filters at all). I
> > > > >>>>>>>>>>>>> think users are interested in supporting the cache filter
> > > > >>>>>>>>>>>>> optimization independently of supporting other features and
> > > > >>>>>>>>>>>>> solving more complex (or even unsolvable) problems.
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3) I agree with your third statement. Actually in our internal
> > > > >>>>>>>>>>>>> version I also tried to unify the logic of scanning and reloading
> > > > >>>>>>>>>>>>> data from connectors. But unfortunately, I didn't find a way to
> > > > >>>>>>>>>>>>> unify the logic of all ScanRuntimeProviders (InputFormat,
> > > > >>>>>>>>>>>>> SourceFunction, Source,...) and reuse it in reloading ALL cache.
> > > > >>>>>>>>>>>>> As a result I settled on using InputFormat, because it was used
> > > > >>>>>>>>>>>>> for scanning in all lookup connectors. (I didn't know that there
> > > > >>>>>>>>>>>>> are plans to deprecate InputFormat in favor of FLIP-27 Source.)
> > > > >>>>>>>>>>>>> IMO usage of FLIP-27 source in ALL caching is not a good idea,
> > > > >>>>>>>>>>>>> because this source was designed to work in a distributed
> > > > >>>>>>>>>>>>> environment (SplitEnumerator on JobManager and SourceReaders on
> > > > >>>>>>>>>>>>> TaskManagers), not in one operator (lookup join operator in our
> > > > >>>>>>>>>>>>> case). There is even no direct way to pass splits from
> > > > >>>>>>>>>>>>> SplitEnumerator to SourceReader (this logic works through
> > > > >>>>>>>>>>>>> SplitEnumeratorContext, which requires
> > > > >>>>>>>>>>>>> OperatorCoordinator.SubtaskGateway to send AddSplitEvents). Usage
> > > > >>>>>>>>>>>>> of InputFormat for ALL cache seems much clearer and easier. But if
> > > > >>>>>>>>>>>>> there are plans to refactor all connectors to FLIP-27, I have the
> > > > >>>>>>>>>>>>> following idea: maybe we can drop lookup join ALL cache in favor
> > > > >>>>>>>>>>>>> of a simple join with multiple scans of a batch source? The point
> > > > >>>>>>>>>>>>> is that the only difference between lookup join ALL cache and a
> > > > >>>>>>>>>>>>> simple join with a batch source is that in the first case scanning
> > > > >>>>>>>>>>>>> is performed multiple times, in between which state (cache) is
> > > > >>>>>>>>>>>>> cleared (correct me if I'm wrong). So what if we extend the
> > > > >>>>>>>>>>>>> functionality of simple join to support state reloading + extend
> > > > >>>>>>>>>>>>> the functionality of scanning a batch source multiple times (this
> > > > >>>>>>>>>>>>> one should be easy with the new FLIP-27 source, which unifies
> > > > >>>>>>>>>>>>> streaming/batch reading - we will need to change only
> > > > >>>>>>>>>>>>> SplitEnumerator, which will pass splits again after some TTL).
> > > > >>>>>>>>>>>>> WDYT? I must say that this looks like a long-term goal and will
> > > > >>>>>>>>>>>>> make the scope of this FLIP even larger than you said. Maybe we
> > > > >>>>>>>>>>>>> can limit ourselves to a simpler solution now (InputFormats).
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> So to sum up, my points are:
> > > > >>>>>>>>>>>>> 1) There is a way to make both concise and flexible interfaces for
> > > > >>>>>>>>>>>>> caching in lookup join.
> > > > >>>>>>>>>>>>> 2) Cache filters optimization is important both in LRU and ALL
> > > > >>>>>>>>>>>>> caches.
> > > > >>>>>>>>>>>>> 3) It is unclear when filter pushdown will be supported in Flink
> > > > >>>>>>>>>>>>> connectors, some of the connectors might not have the opportunity
> > > > >>>>>>>>>>>>> to support filter pushdown and, as far as I know, filter pushdown
> > > > >>>>>>>>>>>>> currently works only for scanning (not lookup). So cache filters +
> > > > >>>>>>>>>>>>> projections optimization should be independent from other
> > > > >>>>>>>>>>>>> features.
> > > > >>>>>>>>>>>>> 4) ALL cache implementation is a complex topic that involves
> > > > >>>>>>>>>>>>> multiple aspects of how Flink is developing. Abandoning
> > > > >>>>>>>>>>>>> InputFormat in favor of FLIP-27 Source will make the ALL cache
> > > > >>>>>>>>>>>>> implementation really complex and unclear, so maybe instead of
> > > > >>>>>>>>>>>>> that we can extend the functionality of simple join, or keep
> > > > >>>>>>>>>>>>> InputFormat in the case of lookup join ALL cache?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>> Smirnov Alexander
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1]
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > >
> >
> https://guava.dev/releases/18.0/api/docs/com/google/common/cache/CacheBuilder.html#weigher(com.google.common.cache.Weigher)
> > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Thu, May 5, 2022 at 20:34, Jark Wu <imj...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> It's great to see the active discussion! I want to share my
> > > > >>>>>>>>>>>>>> ideas:
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 1) implement the cache in framework vs. connectors base
> > > > >>>>>>>>>>>>>> I don't have a strong opinion on this. Both ways should work
> > > > >>>>>>>>>>>>>> (e.g., cache pruning, compatibility).
> > > > >>>>>>>>>>>>>> The framework way can provide more concise interfaces.
> > > > >>>>>>>>>>>>>> The connector base way can define more flexible cache
> > > > >>>>>>>>>>>>>> strategies/implementations.
> > > > >>>>>>>>>>>>>> We are still investigating a way to see if we can have both
> > > > >>>>>>>>>>>>>> advantages.
> > > > >>>>>>>>>>>>>> We should reach a consensus that the way should be a final state,
> > > > >>>>>>>>>>>>>> and we are on the path to it.
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 2) filters and projections pushdown:
> > > > >>>>>>>>>>>>>> I agree with Alex that the filter pushdown into cache can benefit
> > > > >>>>>>>>>>>>>> a lot for ALL cache.
> > > > >>>>>>>>>>>>>> However, this is not true for LRU cache. Connectors use cache to
> > > > >>>>>>>>>>>>>> reduce IO requests to databases for better throughput.
> > > > >>>>>>>>>>>>>> If a filter can prune 90% of data in the cache, we will have 90%
> > > > >>>>>>>>>>>>>> of lookup requests that can never be cached and hit the databases
> > > > >>>>>>>>>>>>>> directly. That means the cache is meaningless in this case.
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> IMO, Flink SQL has provided a standard way to do filters and
> > > > >>>>>>>>>>>>>> projects pushdown, i.e., SupportsFilterPushDown and
> > > > >>>>>>>>>>>>>> SupportsProjectionPushDown.
> > > > >>>>>>>>>>>>>> That Jdbc/hive/HBase haven't implemented the interfaces doesn't
> > > > >>>>>>>>>>>>>> mean it's hard to implement.
> > > > >>>>>>>>>>>>>> They should implement the pushdown interfaces to reduce IO and the
> > > > >>>>>>>>>>>>>> cache size.
> > > > >>>>>>>>>>>>>> That should be a final state that the scan source and lookup
> > > > >>>>>>>>>>>>>> source share the exact pushdown implementation.
> > > > >>>>>>>>>>>>>> I don't see why we need to duplicate the pushdown logic in caches,
> > > > >>>>>>>>>>>>>> which will complicate the lookup join design.
> > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 3) ALL cache abstraction
> > > > >>>>>>>>>>>>>> All cache might be the most challenging part of this FLIP. We have
> > > > >>>>>>>>>>>>>> never provided a reload-lookup public interface.
> > > > >>>>>>>>>>>>>> Currently, we put the reload logic in the "eval" method of
> > > > >>>>>>>>>>>>>> TableFunction.
> > > > >>>>>>>>>>>>>> That's hard for some sources (e.g., Hive).
> > > > >>>>>>>>>>>>>> Ideally, connector implementation should share the logic of reload
> > > > >>>>>>>>>>>>>> and scan, i.e. ScanTableSource with
> > > > >>>>>>>>>>>>>> InputFormat/SourceFunction/FLIP-27 Source.
> > > > >>>>>>>>>>>>>> However, InputFormat/SourceFunction are deprecated, and the
> > > > >>>>>>>>>>>>>> FLIP-27 source is deeply coupled with SourceOperator.
> > > > >>>>>>>>>>>>>> If we want to invoke the FLIP-27 source in LookupJoin, this may
> > > > >>>>>>>>>>>>>> make the scope of this FLIP much larger.
> > > > >>>>>>>>>>>>>> We are still investigating how to abstract the ALL cache logic and
> > > > >>>>>>>>>>>>>> reuse the existing source interfaces.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>> Jark
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Thu, 5 May 2022 at 20:22, Roman Boyko <
> > > ro.v.bo...@gmail.com>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> It's a much more complicated activity and lies outside the
> > > >>>>>>>>>>>>>>> scope of this improvement, because such pushdowns should be
> > > >>>>>>>>>>>>>>> done for all ScanTableSource implementations (not only for
> > > >>>>>>>>>>>>>>> Lookup ones).
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Thu, 5 May 2022 at 19:02, Martijn Visser <
> > > >>>>>>>>>>> martijnvis...@apache.org>
> > > >>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> One question regarding "And Alexander correctly
> > mentioned
> > > that
> > > >>>>>>>>>>> filter
> > > >>>>>>>>>>>>>>>> pushdown still is not implemented for
> jdbc/hive/hbase."
> > > -> Would
> > > >>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>> alternative solution be to actually implement these
> > filter
> > > >>>>>>>>>>> pushdowns?
> > > >>>>>>>>>>>>> I
> > > >>>>>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>> imagine that there are many more benefits to doing
> that,
> > > outside
> > > >>>>>>>>>>> of
> > > >>>>>>>>>>>>> lookup
> > > >>>>>>>>>>>>>>>> caching and metrics.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Martijn Visser
> > > >>>>>>>>>>>>>>>> https://twitter.com/MartijnVisser82
> > > >>>>>>>>>>>>>>>> https://github.com/MartijnVisser
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On Thu, 5 May 2022 at 13:58, Roman Boyko <
> > > ro.v.bo...@gmail.com>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Hi everyone!
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Thanks for driving such a valuable improvement!
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> I do think that single cache implementation would be
> a
> > > nice
> > > >>>>>>>>>>>>> opportunity
> > > >>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>> users. And it will break the "FOR SYSTEM_TIME AS OF
> > > proc_time"
> > > >>>>>>>>>>>>> semantics
> > > >>>>>>>>>>>>>>>>> anyway - doesn't matter how it will be implemented.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Putting myself in the user's shoes, I can say that:
> > > >>>>>>>>>>>>>>>>> 1) I would prefer to have the opportunity to cut off
> > the
> > > cache
> > > >>>>>>>>>>> size
> > > >>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>>>> simply filtering unnecessary data. And the handiest way to do it
> > > >>>>>>>>>>>>>>>>> is to apply it inside LookupRunners. It would be a bit harder to
> > > pass it
> > > >>>>>>>>>>>>> through the
> > > >>>>>>>>>>>>>>>>> LookupJoin node to TableFunction. And Alexander
> > correctly
> > > >>>>>>>>>>> mentioned
> > > >>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>> filter pushdown still is not implemented for
> > > jdbc/hive/hbase.
> > > >>>>>>>>>>>>>>>>> 2) The ability to set the different caching
> parameters
> > > for
> > > >>>>>>>>>>> different
> > > >>>>>>>>>>>>>>>> tables
> > > >>>>>>>>>>>>>>>>> is quite important. So I would prefer to set it
> through
> > > DDL
> > > >>>>>>>>>>> rather
> > > >>>>>>>>>>>>> than
> > > >>>>>>>>>>>>>>>>> have similar ttl, strategy and other options for all
> > > lookup
> > > >>>>>>>>>>> tables.
> > > >>>>>>>>>>>>>>>>> 3) Providing the cache into the framework really
> > > deprives us of
> > > >>>>>>>>>>>>>>>>> extensibility (users won't be able to implement their
> > own
> > > >>>>>>>>>>> cache).
> > > >>>>>>>>>>>>> But
> > > >>>>>>>>>>>>>>>> most
> > > >>>>>>>>>>>>>>>>> probably it might be solved by creating more
> different
> > > cache
> > > >>>>>>>>>>>>> strategies
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>> a wider set of configurations.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> All these points are much closer to the schema
> proposed
> > > by
> > > >>>>>>>>>>>>> Alexander.
> > > >>>>>>>>>>>>>>>>> Qingsheng Ren, please correct me if I'm not right and
> > all
> > > these
> > > >>>>>>>>>>>>>>>> facilities
> > > >>>>>>>>>>>>>>>>> might be simply implemented in your architecture?
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>>>> Roman Boyko
> > > >>>>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> On Wed, 4 May 2022 at 21:01, Martijn Visser <
> > > >>>>>>>>>>>>> martijnvis...@apache.org>
> > > >>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Hi everyone,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> I don't have much to chip in, but just wanted to
> > > express that
> > > >>>>>>>>>>> I
> > > >>>>>>>>>>>>> really
> > > >>>>>>>>>>>>>>>>>> appreciate the in-depth discussion on this topic
> and I
> > > hope
> > > >>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>> others
> > > >>>>>>>>>>>>>>>>>> will join the conversation.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Martijn
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On Tue, 3 May 2022 at 10:15, Александр Смирнов <
> > > >>>>>>>>>>>>> smirale...@gmail.com>
> > > >>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Hi Qingsheng, Leonard and Jark,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks for your detailed feedback! However, I have
> > > questions
> > > >>>>>>>>>>>>> about
> > > >>>>>>>>>>>>>>>>>>> some of your statements (maybe I didn't get
> > > something?).
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Caching actually breaks the semantic of "FOR
> > > SYSTEM_TIME
> > > >>>>>>>>>>> AS OF
> > > >>>>>>>>>>>>>>>>>> proc_time”
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I agree that the semantics of "FOR SYSTEM_TIME AS
> OF
> > > >>>>>>>>>>> proc_time"
> > > >>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>> fully implemented with caching, but as you said, users opt into
> > > >>>>>>>>>>>>>>>>>>> it consciously to achieve better performance (no one
> > > proposed
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>> enable
> > > >>>>>>>>>>>>>>>>>>> caching by default, etc.). Or by users do you mean
> > > other
> > > >>>>>>>>>>>>> developers
> > > >>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>> connectors? In this case developers explicitly
> > specify
> > > >>>>>>>>>>> whether
> > > >>>>>>>>>>>>> their
> > > >>>>>>>>>>>>>>>>>>> connector supports caching or not (in the list of
> > > supported
> > > >>>>>>>>>>>>>>>> options),
> > > >>>>>>>>>>>>>>>>>>> no one makes them do that if they don't want to. So
> > > what
> > > >>>>>>>>>>>>> exactly is
> > > >>>>>>>>>>>>>>>>>>> the difference between implementing caching in
> > modules
> > > >>>>>>>>>>>>>>>>>>> flink-table-runtime and in flink-table-common from
> > the
> > > >>>>>>>>>>>>> considered
> > > >>>>>>>>>>>>>>>>>>> point of view? How does it affect breaking/non-breaking the
> > > >>>>>>>>>>>>>>>>>>> semantics of "FOR SYSTEM_TIME AS OF proc_time"?
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> confront a situation that allows table options in
> > DDL
> > > to
> > > >>>>>>>>>>>>> control
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> behavior of the framework, which has never happened
> > > >>>>>>>>>>> previously
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>> should
> > > >>>>>>>>>>>>>>>>>>> be cautious
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> If we talk about main differences of semantics of
> DDL
> > > >>>>>>>>>>> options
> > > >>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>> config options("table.exec.xxx"), isn't it about
> > > limiting
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>> scope
> > > >>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>> the options + importance for the user business
> logic
> > > rather
> > > >>>>>>>>>>> than
> > > >>>>>>>>>>>>>>>>>>> specific location of corresponding logic in the
> > > framework? I
> > > >>>>>>>>>>>>> mean
> > > >>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>> in my design, for example, putting an option with
> > > lookup
> > > >>>>>>>>>>> cache
> > > >>>>>>>>>>>>>>>>>>> strategy in configurations would be the wrong
> > > decision,
> > > >>>>>>>>>>>>> because it
> > > >>>>>>>>>>>>>>>>>>> directly affects the user's business logic (not
> just
> > > >>>>>>>>>>> performance
> > > >>>>>>>>>>>>>>>>>>> optimization) + touches just several functions of
> ONE
> > > table
> > > >>>>>>>>>>>>> (there
> > > >>>>>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>>>>> be multiple tables with different caches). Does it
> > > really
> > > >>>>>>>>>>>>> matter for
> > > >>>>>>>>>>>>>>>>>>> the user (or someone else) where the logic is
> > located,
> > > >>>>>>>>>>> which is
> > > >>>>>>>>>>>>>>>>>>> affected by the applied option?
> > > >>>>>>>>>>>>>>>>>>> Also I can remember DDL option 'sink.parallelism',
> > > which in
> > > >>>>>>>>>>>>> some way
> > > >>>>>>>>>>>>>>>>>>> "controls the behavior of the framework" and I
> don't
> > > see any
> > > >>>>>>>>>>>>> problem
> > > >>>>>>>>>>>>>>>>>>> here.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> introduce a new interface for this all-caching
> > > scenario
> > > >>>>>>>>>>> and
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> design
> > > >>>>>>>>>>>>>>>>>>> would become more complex
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> This is a subject for a separate discussion, but
> > > actually
> > > >>>>>>>>>>> in our
> > > >>>>>>>>>>>>>>>>>>> internal version we solved this problem quite
> easily
> > -
> > > we
> > > >>>>>>>>>>> reused
> > > >>>>>>>>>>>>>>>>>>> InputFormat class (so there is no need for a new
> > API).
> > > The
> > > >>>>>>>>>>>>> point is
> > > >>>>>>>>>>>>>>>>>>> that currently all lookup connectors use
> InputFormat
> > > for
> > > >>>>>>>>>>>>> scanning
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> data in batch mode: HBase, JDBC and even Hive - it
> > uses
> > > >>>>>>>>>>> class
> > > >>>>>>>>>>>>>>>>>>> PartitionReader, that is actually just a wrapper
> > around
> > > >>>>>>>>>>>>> InputFormat.
> > > >>>>>>>>>>>>>>>>>>> The advantage of this solution is the ability to
> > reload
> > > >>>>>>>>>>> cache
> > > >>>>>>>>>>>>> data
> > > >>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>> parallel (number of threads depends on number of
> > > >>>>>>>>>>> InputSplits,
> > > >>>>>>>>>>>>> but
> > > >>>>>>>>>>>>>>>> has
> > > >>>>>>>>>>>>>>>>>>> an upper limit). As a result cache reload time
> > > significantly
> > > >>>>>>>>>>>>> reduces
> > > >>>>>>>>>>>>>>>>>>> (as well as time of input stream blocking). I know
> > that
> > > >>>>>>>>>>> usually
> > > >>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>> try
> > > >>>>>>>>>>>>>>>>>>> to avoid usage of concurrency in Flink code, but
> > maybe
> > > this
> > > >>>>>>>>>>> one
> > > >>>>>>>>>>>>> can
> > > >>>>>>>>>>>>>>>> be
> > > >>>>>>>>>>>>>>>>>>> an exception. BTW I don't say that it's an ideal
> > > solution,
> > > >>>>>>>>>>> maybe
> > > >>>>>>>>>>>>>>>> there
> > > >>>>>>>>>>>>>>>>>>> are better ones.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Providing the cache in the framework might
> introduce
> > > >>>>>>>>>>>>> compatibility
> > > >>>>>>>>>>>>>>>>>> issues
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> It's possible only in cases when the developer of
> the
> > > >>>>>>>>>>> connector
> > > >>>>>>>>>>>>>>>> won't
> > > >>>>>>>>>>>>>>>>>>> properly refactor his code and will use new cache
> > > options
> > > >>>>>>>>>>>>>>>> incorrectly
> > > >>>>>>>>>>>>>>>>>>> (i.e. explicitly provide the same options into 2
> > > different
> > > >>>>>>>>>>> code
> > > >>>>>>>>>>>>>>>>>>> places). For correct behavior all he will need to
> do
> > > is to
> > > >>>>>>>>>>>>> redirect
> > > >>>>>>>>>>>>>>>>>>> existing options to the framework's LookupConfig (+
> > > maybe
> > > >>>>>>>>>>> add an
> > > >>>>>>>>>>>>>>>> alias
> > > >>>>>>>>>>>>>>>>>>> for options, if there was different naming),
> > everything
> > > >>>>>>>>>>> will be
> > > >>>>>>>>>>>>>>>>>>> transparent for users. If the developer won't do
> > > >>>>>>>>>>> refactoring at
> > > >>>>>>>>>>>>> all,
> > > >>>>>>>>>>>>>>>>>>> nothing will be changed for the connector because
> of
> > > >>>>>>>>>>> backward
> > > >>>>>>>>>>>>>>>>>>> compatibility. Also if a developer wants to use his
> > own
> > > >>>>>>>>>>> cache
> > > >>>>>>>>>>>>> logic,
> > > >>>>>>>>>>>>>>>>>>> he just can refuse to pass some of the configs into
> > the
> > > >>>>>>>>>>>>> framework,
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>> instead make his own implementation with already
> > > existing
> > > >>>>>>>>>>>>> configs
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>> metrics (but actually I think that it's a rare
> case).
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> filters and projections should be pushed all the
> way
> > > down
> > > >>>>>>>>>>> to
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>> function, like what we do in the scan source
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> It's a great goal. But the truth is that the
> > ONLY
> > > >>>>>>>>>>> connector
> > > >>>>>>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>>>>>> supports filter pushdown is FileSystemTableSource
> > > >>>>>>>>>>>>>>>>>>> (no database connector supports it currently). Also
> > > for some
> > > >>>>>>>>>>>>>>>> databases
> > > >>>>>>>>>>>>>>>>>>> it's simply impossible to push down such complex
> > filters
> > > >>>>>>>>>>> that we
> > > >>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>> in Flink.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> only applying these optimizations to the cache
> seems
> > > not
> > > >>>>>>>>>>>>> quite
> > > >>>>>>>>>>>>>>>>> useful
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Filters can cut off an arbitrarily large amount of
> > data
> > > >>>>>>>>>>> from the
> > > >>>>>>>>>>>>>>>>>>> dimension table. For a simple example, suppose in
> > > dimension
> > > >>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>> 'users'
> > > >>>>>>>>>>>>>>>>>>> we have column 'age' with values from 20 to 40, and
> > > input
> > > >>>>>>>>>>> stream
> > > >>>>>>>>>>>>>>>>>>> 'clicks' that is ~uniformly distributed by age of
> > > users. If
> > > >>>>>>>>>>> we
> > > >>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>> filter 'age > 30',
> > > >>>>>>>>>>>>>>>>>>> there will be half as much data in the cache. This means the user can
> > > >>>>>>>>>>>>>>>>>>> increase 'lookup.cache.max-rows' by almost 2 times. It will give a huge
> > > >>>>>>>>>>>>>>>>>>> performance boost. Moreover, this optimization
> starts
> > > to
> > > >>>>>>>>>>> really
> > > >>>>>>>>>>>>>>>> shine
> > > >>>>>>>>>>>>>>>>>>> in 'ALL' cache, where tables without filters and
> > > projections
> > > >>>>>>>>>>>>> can't
> > > >>>>>>>>>>>>>>>> fit
> > > >>>>>>>>>>>>>>>>>>> in memory, but with them - can. This opens up
> > > additional
> > > >>>>>>>>>>>>>>>> possibilities
> > > >>>>>>>>>>>>>>>>>>> for users. And this doesn't sound like 'not quite useful'.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> It would be great to hear other voices regarding
> this
> > > topic!
> > > >>>>>>>>>>>>> Because
> > > >>>>>>>>>>>>>>>>>>> we have quite a lot of controversial points, and I
> > > think
> > > >>>>>>>>>>> with
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> help
> > > >>>>>>>>>>>>>>>>>>> of others it will be easier for us to come to a
> > > consensus.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> пт, 29 апр. 2022 г. в 22:33, Qingsheng Ren <
> > > >>>>>>>>>>> renqs...@gmail.com
> > > >>>>>>>>>>>>>> :
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Hi Alexander and Arvid,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Thanks for the discussion and sorry for my late
> > > response!
> > > >>>>>>>>>>> We
> > > >>>>>>>>>>>>> had
> > > >>>>>>>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>>>> internal discussion together with Jark and Leonard
> > and
> > > I’d
> > > >>>>>>>>>>> like
> > > >>>>>>>>>>>>> to
> > > >>>>>>>>>>>>>>>>>>> summarize our ideas. Instead of implementing the
> > cache
> > > >>>>>>>>>>> logic in
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>> runtime layer or wrapping around the user-provided
> > > table
> > > >>>>>>>>>>>>> function,
> > > >>>>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>>>>> prefer to introduce some new APIs extending
> > > TableFunction
> > > >>>>>>>>>>> with
> > > >>>>>>>>>>>>> these
> > > >>>>>>>>>>>>>>>>>>> concerns:
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> 1. Caching actually breaks the semantic of "FOR
> > > >>>>>>>>>>> SYSTEM_TIME
> > > >>>>>>>>>>>>> AS OF
> > > >>>>>>>>>>>>>>>>>>> proc_time”, because it couldn’t truly reflect the
> > > content
> > > >>>>>>>>>>> of the
> > > >>>>>>>>>>>>>>>> lookup
> > > >>>>>>>>>>>>>>>>>>> table at the moment of querying. If users choose to
> > > enable
> > > >>>>>>>>>>>>> caching
> > > >>>>>>>>>>>>>>>> on
> > > >>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> lookup table, they implicitly indicate that this
> > > breakage is
> > > >>>>>>>>>>>>>>>> acceptable
> > > >>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>> exchange for the performance. So we prefer not to
> > > provide
> > > >>>>>>>>>>>>> caching on
> > > >>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> table runtime level.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> 2. If we make the cache implementation in the
> > > framework
> > > >>>>>>>>>>>>> (whether
> > > >>>>>>>>>>>>>>>> in a
> > > >>>>>>>>>>>>>>>>>>> runner or a wrapper around TableFunction), we have
> to
> > > >>>>>>>>>>> confront a
> > > >>>>>>>>>>>>>>>>>> situation
> > > >>>>>>>>>>>>>>>>>>> that allows table options in DDL to control the
> > > behavior of
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> framework,
> > > >>>>>>>>>>>>>>>>>>> which has never happened previously and should be
> > > cautious.
> > > >>>>>>>>>>>>> Under
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> current design the behavior of the framework should
> > > only be
> > > >>>>>>>>>>>>>>>> specified
> > > >>>>>>>>>>>>>>>>> by
> > > >>>>>>>>>>>>>>>>>>> configurations (“table.exec.xxx”), and it’s hard to
> > > apply
> > > >>>>>>>>>>> these
> > > >>>>>>>>>>>>>>>> general
> > > >>>>>>>>>>>>>>>>>>> configs to a specific table.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> 3. We have use cases where the lookup source loads and refreshes all records
> > > >>>>>>>>>>>>>>>>>>> periodically into the memory to achieve high lookup
> > > >>>>>>>>>>> performance
> > > >>>>>>>>>>>>>>>> (like
> > > >>>>>>>>>>>>>>>>>> Hive
> > > >>>>>>>>>>>>>>>>>>> connector in the community, and also widely used by
> > our
> > > >>>>>>>>>>> internal
> > > >>>>>>>>>>>>>>>>>>> connectors). Wrapping the cache around the user’s
> > > >>>>>>>>>>> TableFunction
> > > >>>>>>>>>>>>>>>> works
> > > >>>>>>>>>>>>>>>>>> fine
> > > >>>>>>>>>>>>>>>>>>> for LRU caches, but I think we have to introduce a
> > new
> > > >>>>>>>>>>>>> interface for
> > > >>>>>>>>>>>>>>>>> this
> > > >>>>>>>>>>>>>>>>>>> all-caching scenario and the design would become
> more
> > > >>>>>>>>>>> complex.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> 4. Providing the cache in the framework might
> > > introduce
> > > >>>>>>>>>>>>>>>> compatibility
> > > >>>>>>>>>>>>>>>>>>> issues to existing lookup sources like there might
> > > exist two
> > > >>>>>>>>>>>>> caches
> > > >>>>>>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>>>>>> totally different strategies if the user
> incorrectly
> > > >>>>>>>>>>> configures
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> table
> > > >>>>>>>>>>>>>>>>>>> (one in the framework and another implemented by
> the
> > > lookup
> > > >>>>>>>>>>>>> source).
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> As for the optimization mentioned by Alexander, I
> > > think
> > > >>>>>>>>>>>>> filters
> > > >>>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>>>> projections should be pushed all the way down to
> the
> > > table
> > > >>>>>>>>>>>>> function,
> > > >>>>>>>>>>>>>>>>> like
> > > >>>>>>>>>>>>>>>>>>> what we do in the scan source, instead of the
> runner
> > > with
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>> cache.
> > > >>>>>>>>>>>>>>>>> The
> > > >>>>>>>>>>>>>>>>>>> goal of using cache is to reduce the network I/O
> and
> > > >>>>>>>>>>> pressure
> > > >>>>>>>>>>>>> on the
> > > >>>>>>>>>>>>>>>>>>> external system, and only applying these
> > optimizations
> > > to
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>> cache
> > > >>>>>>>>>>>>>>>>> seems
> > > >>>>>>>>>>>>>>>>>>> not quite useful.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> I made some updates to the FLIP[1] to reflect our
> > > ideas.
> > > >>>>>>>>>>> We
> > > >>>>>>>>>>>>>>>> prefer to
> > > >>>>>>>>>>>>>>>>>>> keep the cache implementation as a part of
> > > TableFunction,
> > > >>>>>>>>>>> and we
> > > >>>>>>>>>>>>>>>> could
> > > >>>>>>>>>>>>>>>>>>> provide some helper classes (CachingTableFunction,
> > > >>>>>>>>>>>>>>>>>> AllCachingTableFunction,
> > > >>>>>>>>>>>>>>>>>>> CachingAsyncTableFunction) to developers and
> regulate
> > > >>>>>>>>>>> metrics
> > > >>>>>>>>>>>>> of the
> > > >>>>>>>>>>>>>>>>>> cache.
> > > >>>>>>>>>>>>>>>>>>> Also, I made a POC[2] for your reference.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Looking forward to your ideas!
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> [1]
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > >>>>>>>>>>>>>>>>>>>> [2]
> > https://github.com/PatrickRen/flink/tree/FLIP-221
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Qingsheng
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> On Tue, Apr 26, 2022 at 4:45 PM Александр Смирнов
> <
> > > >>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> > > >>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Thanks for the response, Arvid!
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I have a few comments on your message.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> but could also live with an easier solution as
> the
> > > >>>>>>>>>>> first
> > > >>>>>>>>>>>>> step:
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I think that these 2 ways are mutually exclusive
> > > >>>>>>>>>>> (originally
> > > >>>>>>>>>>>>>>>>> proposed
> > > >>>>>>>>>>>>>>>>>>>>> by Qingsheng and mine), because conceptually they
> > > follow
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>> same
> > > >>>>>>>>>>>>>>>>>>>>> goal, but implementation details are different. If we go one way,
> > > >>>>>>>>>>>>>>>>>>>>> moving to another way in the future will mean
> > > deleting
> > > >>>>>>>>>>>>> existing
> > > >>>>>>>>>>>>>>>> code
> > > >>>>>>>>>>>>>>>>>>>>> and once again changing the API for connectors.
> So
> > I
> > > >>>>>>>>>>> think we
> > > >>>>>>>>>>>>>>>> should
> > > >>>>>>>>>>>>>>>>>>>>> reach a consensus with the community about that
> and
> > > then
> > > >>>>>>>>>>> work
> > > >>>>>>>>>>>>>>>>> together
> > > >>>>>>>>>>>>>>>>>>>>> on this FLIP, i.e. divide the work into tasks for different
> > > >>>>>>>>>>>>> parts
> > > >>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>> flip (for example, LRU cache unification /
> > > introducing
> > > >>>>>>>>>>>>> proposed
> > > >>>>>>>>>>>>>>>> set
> > > >>>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>>> metrics / further work…). WDYT, Qingsheng?
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> as the source will only receive the requests
> after
> > > >>>>>>>>>>> filter
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Actually if filters are applied to fields of the
> > > lookup
> > > >>>>>>>>>>>>> table, we
> > > >>>>>>>>>>>>>>>>>>>>> must first do requests, and only after that we
> > can
> > > >>>>>>>>>>> filter
> > > >>>>>>>>>>>>>>>>> responses,
> > > >>>>>>>>>>>>>>>>>>>>> because lookup connectors don't have filter
> > > pushdown. So
> > > >>>>>>>>>>> if
> > > >>>>>>>>>>>>>>>>> filtering
> > > >>>>>>>>>>>>>>>>>>>>> is done before caching, there will be far fewer rows in the cache.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is
> not
> > > >>>>>>>>>>> shared.
> > > >>>>>>>>>>>>> I
> > > >>>>>>>>>>>>>>>> don't
> > > >>>>>>>>>>>>>>>>>>> know the
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Sorry for that, I’m a bit new to such kinds of
> > > >>>>>>>>>>> conversations
> > > >>>>>>>>>>>>> :)
> > > >>>>>>>>>>>>>>>>>>>>> I have no write access to the confluence, so I
> > made a
> > > >>>>>>>>>>> Jira
> > > >>>>>>>>>>>>> issue,
> > > >>>>>>>>>>>>>>>>>>>>> where I described the proposed changes in more detail -
> > > >>>>>>>>>>>>>>>>>>>>>
> https://issues.apache.org/jira/browse/FLINK-27411.
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> I'll be happy to get more feedback!
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> Best,
> > > >>>>>>>>>>>>>>>>>>>>> Smirnov Alexander
> > > >>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>> пн, 25 апр. 2022 г. в 19:49, Arvid Heise <
> > > >>>>>>>>>>> ar...@apache.org>:
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng,
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Thanks for driving this; the inconsistency was
> not
> > > >>>>>>>>>>>>> satisfying
> > > >>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>> me.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> I second Alexander's idea though but could also
> > live
> > > >>>>>>>>>>> with
> > > >>>>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>> easier
> > > >>>>>>>>>>>>>>>>>>>>>> solution as the first step: Instead of making
> > > caching
> > > >>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>>>> implementation
> > > >>>>>>>>>>>>>>>>>>>>>> detail of TableFunction X, rather devise a
> caching
> > > >>>>>>>>>>> layer
> > > >>>>>>>>>>>>>>>> around X.
> > > >>>>>>>>>>>>>>>>>> So
> > > >>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>> proposal would be a CachingTableFunction that
> > > >>>>>>>>>>> delegates to
> > > >>>>>>>>>>>>> X in
> > > >>>>>>>>>>>>>>>>> case
> > > >>>>>>>>>>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>>>>>>>> misses and else manages the cache. Lifting it
> into
> > > the
> > > >>>>>>>>>>>>> operator
> > > >>>>>>>>>>>>>>>>>> model
> > > >>>>>>>>>>>>>>>>>>> as
> > > >>>>>>>>>>>>>>>>>>>>>> proposed would be even better but is probably
> > > >>>>>>>>>>> unnecessary
> > > >>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>> first step
> > > >>>>>>>>>>>>>>>>>>>>>> for a lookup source (as the source will only
> > receive
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>> requests
> > > >>>>>>>>>>>>>>>>>>> after
> > > >>>>>>>>>>>>>>>>>>>>>> filter; applying projection may be more
> > interesting
> > > to
> > > >>>>>>>>>>> save
> > > >>>>>>>>>>>>>>>>> memory).
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> Another advantage is that all the changes of
> this
> > > FLIP
> > > >>>>>>>>>>>>> would be
> > > >>>>>>>>>>>>>>>>>>> limited to
> > > >>>>>>>>>>>>>>>>>>>>>> options, no need for new public interfaces.
> > > Everything
> > > >>>>>>>>>>> else
> > > >>>>>>>>>>>>>>>>> remains
> > > >>>>>>>>>>>>>>>>>> an
> > > >>>>>>>>>>>>>>>>>>>>>> implementation of Table runtime. That means we
> can
> > > >>>>>>>>>>> easily
> > > >>>>>>>>>>>>>>>>>> incorporate
> > > >>>>>>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>>>>>> optimization potential that Alexander pointed
> out
> > > >>>>>>>>>>> later.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> @Alexander unfortunately, your architecture is
> not
> > > >>>>>>>>>>> shared.
> > > >>>>>>>>>>>>> I
> > > >>>>>>>>>>>>>>>> don't
> > > >>>>>>>>>>>>>>>>>>> know the
> > > >>>>>>>>>>>>>>>>>>>>>> solution to share images to be honest.
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 22, 2022 at 5:04 PM Александр
> Смирнов
> > <
> > > >>>>>>>>>>>>>>>>>>> smirale...@gmail.com>
> > > >>>>>>>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> Hi Qingsheng! My name is Alexander, I'm not a
> > > >>>>>>>>>>> committer
> > > >>>>>>>>>>>>> yet,
> > > >>>>>>>>>>>>>>>> but
> > > >>>>>>>>>>>>>>>>>> I'd
> > > >>>>>>>>>>>>>>>>>>>>>>> really like to become one. And this FLIP really
> > > >>>>>>>>>>>>> interested
> > > >>>>>>>>>>>>>>>> me.
> > > >>>>>>>>>>>>>>>>>>>>>>> Actually I have worked on a similar feature in
> my
> > > >>>>>>>>>>>>> company’s
> > > >>>>>>>>>>>>>>>>> Flink
> > > >>>>>>>>>>>>>>>>>>>>>>> fork, and we would like to share our thoughts
> on
> > > >>>>>>>>>>> this and
> > > >>>>>>>>>>>>>>>> make
> > > >>>>>>>>>>>>>>>>>> code
> > > >>>>>>>>>>>>>>>>>>>>>>> open source.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> I think there is a better alternative than
> > > >>>>>>>>>>> introducing an
> > > >>>>>>>>>>>>>>>>> abstract
> > > >>>>>>>>>>>>>>>>>>>>>>> class for TableFunction (CachingTableFunction).
> > As
> > > >>>>>>>>>>> you
> > > >>>>>>>>>>>>> know,
> > > >>>>>>>>>>>>>>>>>>>>>>> TableFunction exists in the flink-table-common
> > > >>>>>>>>>>> module,
> > > >>>>>>>>>>>>> which
> > > >>>>>>>>>>>>>>>>>>> provides
> > > >>>>>>>>>>>>>>>>>>>>>>> only an API for working with tables – it’s very
> > > >>>>>>>>>>>>> convenient
> > > >>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>> importing
> > > >>>>>>>>>>>>>>>>>>>>>>> in connectors. In turn, CachingTableFunction
> > > contains
> > > >>>>>>>>>>>>> logic
> > > >>>>>>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>>>>>>>>> runtime execution, so this class and
> everything
> > > >>>>>>>>>>>>> connected
> > > >>>>>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>>>>>>>>> should be located in another module, probably
> in
> > > >>>>>>>>>>>>>>>>>>> flink-table-runtime.
> > > >>>>>>>>>>>>>>>>>>>>>>> But this will require connectors to depend on
> > > another
> > > >>>>>>>>>>>>> module,
> > > >>>>>>>>>>>>>>>>>> which
> > > >>>>>>>>>>>>>>>>>>>>>>> contains a lot of runtime logic, which doesn’t
> > > sound
> > > >>>>>>>>>>>>> good.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> I suggest adding a new method ‘getLookupConfig’
> > to
> > > >>>>>>>>>>>>>>>>>> LookupTableSource
> > > >>>>>>>>>>>>>>>>>>>>>>> or LookupRuntimeProvider to allow connectors to
> > > only
> > > >>>>>>>>>>> pass
> > > >>>>>>>>>>>>>>>>>>>>>>> configurations to the planner, therefore they
> > won’t
> > > >>>>>>>>>>>>> depend on
> > > >>>>>>>>>>>>>>>>>>> runtime
> > > >>>>>>>>>>>>>>>>>>>>>>> realization. Based on these configs planner
> will
> > > >>>>>>>>>>>>> construct a
> > > >>>>>>>>>>>>>>>>>> lookup
> > > >>>>>>>>>>>>>>>>>>>>>>> join operator with corresponding runtime logic
> > > >>>>>>>>>>>>>>>> (ProcessFunctions
> > > >>>>>>>>>>>>>>>>>> in
> > > >>>>>>>>>>>>>>>>>>>>>>> module flink-table-runtime). Architecture looks
> > > like
> > > >>>>>>>>>>> in
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>> pinned
> > > >>>>>>>>>>>>>>>>>>>>>>> image (the LookupConfig class there is actually your CacheConfig).
> > > >>>>>>>>>>>>>>>>>>>>>>>
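A minimal Java sketch of the config-only contract proposed above. It is not Flink's actual API: `LookupConfig`, `ConfigurableLookupSource`, its fields, and `chooseRunner` are hypothetical stand-ins, and the runner names are only the ones mentioned in this thread. The point it illustrates is that the connector exposes plain configuration, while the choice of runtime class stays on the planner side.

```java
import java.time.Duration;
import java.util.Optional;

class LookupConfigSketch {

    /** Hypothetical immutable config that a connector hands to the planner. */
    static final class LookupConfig {
        final long maxCachedRows;
        final Duration expireAfterWrite;

        LookupConfig(long maxCachedRows, Duration expireAfterWrite) {
            this.maxCachedRows = maxCachedRows;
            this.expireAfterWrite = expireAfterWrite;
        }
    }

    /** Hypothetical stand-in for the connector-facing interface (API module only). */
    interface ConfigurableLookupSource {
        // An empty Optional means the connector does not ask for caching.
        Optional<LookupConfig> getLookupConfig();
    }

    /** Hypothetical planner-side decision: pick the runtime class by name. */
    static String chooseRunner(ConfigurableLookupSource source) {
        return source.getLookupConfig().isPresent()
                ? "LookupJoinCachingRunner"
                : "LookupJoinRunner";
    }

    public static void main(String[] args) {
        ConfigurableLookupSource cached =
                () -> Optional.of(new LookupConfig(10_000L, Duration.ofMinutes(10)));
        ConfigurableLookupSource plain = () -> Optional.empty();

        System.out.println(chooseRunner(cached));
        System.out.println(chooseRunner(plain));
    }
}
```

In this sketch the connector never references a runtime class, so it would only need the API module on its classpath.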
> > > >>>>>>>>>>>>>>>>>>>>>>> The classes in flink-table-planner that will be
> > > >>>>>>>>>>>>>>>>>>>>>>> responsible for this are CommonPhysicalLookupJoin
> > > >>>>>>>>>>>>>>>>>>>>>>> and its subclasses.
> > > >>>>>>>>>>>>>>>>>>>>>>> The current classes for lookup join in
> > > >>>>>>>>>>>>>>>>>>>>>>> flink-table-runtime are LookupJoinRunner,
> > > >>>>>>>>>>>>>>>>>>>>>>> AsyncLookupJoinRunner, LookupJoinRunnerWithCalc,
> > > >>>>>>>>>>>>>>>>>>>>>>> and AsyncLookupJoinRunnerWithCalc.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> I suggest adding classes LookupJoinCachingRunner,
> > > >>>>>>>>>>>>>>>>>>>>>>> LookupJoinCachingRunnerWithCalc, etc.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> And here comes another, more powerful advantage
> > > >>>>>>>>>>>>>>>>>>>>>>> of such a solution. If we have the caching logic
> > > >>>>>>>>>>>>>>>>>>>>>>> on a lower level, we can apply some optimizations
> > > >>>>>>>>>>>>>>>>>>>>>>> to it. LookupJoinRunnerWithCalc was named like
> > > >>>>>>>>>>>>>>>>>>>>>>> this because it uses the ‘calc’ function, which
> > > >>>>>>>>>>>>>>>>>>>>>>> mostly consists of filters and projections.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> For example, when joining table A with lookup
> > > >>>>>>>>>>>>>>>>>>>>>>> table B under the condition ‘JOIN … ON
> > > >>>>>>>>>>>>>>>>>>>>>>> A.id = B.id AND A.age = B.age + 10 WHERE
> > > >>>>>>>>>>>>>>>>>>>>>>> B.salary > 1000’, the ‘calc’ function will
> > > >>>>>>>>>>>>>>>>>>>>>>> contain the filters A.age = B.age + 10 and
> > > >>>>>>>>>>>>>>>>>>>>>>> B.salary > 1000.
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> If we apply this function before storing records
> > > >>>>>>>>>>>>>>>>>>>>>>> in the cache, the size of the cache will be
> > > >>>>>>>>>>>>>>>>>>>>>>> significantly reduced: filters avoid storing
> > > >>>>>>>>>>>>>>>>>>>>>>> useless records in the cache, and projections
> > > >>>>>>>>>>>>>>>>>>>>>>> reduce the records’ size. So the user can
> > > >>>>>>>>>>>>>>>>>>>>>>> increase the initial max number of records in the
> > > >>>>>>>>>>>>>>>>>>>>>>> cache.
> > > >>>>>>>>>>>>>>>>>>>>>>>
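The idea of running the ‘calc’ step before storing records can be sketched in Java as follows. This is an illustration under stated assumptions, not the FLIP's implementation: `RowB`, `SlimRowB`, and `CalcBeforeCacheSketch` are hypothetical names, and a plain `HashMap` stands in for a real cache. Only the filter that depends on B alone (B.salary > 1000) is applied before caching; a condition such as A.age = B.age + 10 also references the probe row from A, so in this sketch it would still have to be evaluated after the cache lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

class CalcBeforeCacheSketch {

    /** Hypothetical full row of lookup table B. */
    static final class RowB {
        final int id; final int age; final int salary; final String notes;
        RowB(int id, int age, int salary, String notes) {
            this.id = id; this.age = age; this.salary = salary; this.notes = notes;
        }
    }

    /** Projected row: only the columns the join still needs. */
    static final class SlimRowB {
        final int id; final int age;
        SlimRowB(int id, int age) { this.id = id; this.age = age; }
    }

    private final Map<Integer, List<SlimRowB>> cache = new HashMap<>();
    private final Function<Integer, List<RowB>> externalLookup;
    private final Predicate<RowB> filter;

    CalcBeforeCacheSketch(Function<Integer, List<RowB>> externalLookup,
                          Predicate<RowB> filter) {
        this.externalLookup = externalLookup;
        this.filter = filter;
    }

    /** On a cache miss, filter and project the fetched rows, then store the result. */
    List<SlimRowB> lookup(int id) {
        return cache.computeIfAbsent(id, key -> {
            List<SlimRowB> kept = new ArrayList<>();
            for (RowB row : externalLookup.apply(key)) {
                if (filter.test(row)) {                          // filter: drop useless rows
                    kept.add(new SlimRowB(row.id, row.age));     // projection: shrink row
                }
            }
            return kept;
        });
    }

    public static void main(String[] args) {
        List<RowB> tableB = new ArrayList<>();
        tableB.add(new RowB(1, 30, 2000, "kept"));
        tableB.add(new RowB(1, 40, 500, "filtered out before caching"));

        CalcBeforeCacheSketch join =
                new CalcBeforeCacheSketch(id -> tableB, row -> row.salary > 1000);
        System.out.println(join.lookup(1).size());
    }
}
```

Because the cached value holds only the surviving, projected rows, the same entry budget covers more lookup keys than caching the raw rows would.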
> > > >>>>>>>>>>>>>>>>>>>>>>> What do you think about it?
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>> On 2022/04/19 02:47:11 Qingsheng Ren wrote:
> > > >>>>>>>>>>>>>>>>>>>>>>>> Hi devs,
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Yuan and I would like to start a discussion
> > > >>>>>>>>>>>>>>>>>>>>>>>> about FLIP-221[1], which introduces an
> > > >>>>>>>>>>>>>>>>>>>>>>>> abstraction of lookup table cache and its
> > > >>>>>>>>>>>>>>>>>>>>>>>> standard metrics.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Currently each lookup table source has to
> > > >>>>>>>>>>>>>>>>>>>>>>>> implement its own cache to store lookup results,
> > > >>>>>>>>>>>>>>>>>>>>>>>> and there isn’t a standard set of metrics for
> > > >>>>>>>>>>>>>>>>>>>>>>>> users and developers to tune their jobs with
> > > >>>>>>>>>>>>>>>>>>>>>>>> lookup joins, which is quite a common use case
> > > >>>>>>>>>>>>>>>>>>>>>>>> in Flink table / SQL.
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Therefore we propose some new APIs including
> > > >>>>>>>>>>>>>>>>>>>>>>>> cache, metrics, wrapper classes of TableFunction
> > > >>>>>>>>>>>>>>>>>>>>>>>> and new table options. Please take a look at the
> > > >>>>>>>>>>>>>>>>>>>>>>>> FLIP page [1] to get more details. Any
> > > >>>>>>>>>>>>>>>>>>>>>>>> suggestions and comments would be appreciated!
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-221+Abstraction+for+lookup+source+cache+and+metric
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>> Qingsheng
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>>>>>>> Best Regards,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Qingsheng Ren
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Real-time Computing Team
> > > >>>>>>>>>>>>>>>>>>>> Alibaba Cloud
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Email: renqs...@gmail.com
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>>> Best regards,
> > > >>>>>>>>>>>>>>> Roman Boyko
> > > >>>>>>>>>>>>>>> e.: ro.v.bo...@gmail.com
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> --
> > > >>>>>>>> Best Regards,
> > > >>>>>>>>
> > > >>>>>>>> Qingsheng Ren
> > > >>>>>>>>
> > > >>>>>>>> Real-time Computing Team
> > > >>>>>>>> Alibaba Cloud
> > > >>>>>>>>
> > > >>>>>>>> Email: renqs...@gmail.com
> > > >>>>>>
> > >
> > >
> >
>
