Just to clarify: when I write foo() like below, I assume that foo() must have
a way to release its own cache, so it must have access to the table env.

void foo(Table t) {
  ...
  t.cache(); // create cache for t
  ...
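  // note: assumes foo() can somehow reach the TableEnvironment `env`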
  env.getCacheService().releaseCacheFor(t); // release cache for t
}

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 8, 2019 at 9:04 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Piotr,
>
> I don't think it is feasible to ask every third-party library to have a
> method signature with CacheService as an argument.
>
> And even that signature does not really solve the problem. Imagine
> function foo() looks like the following:
>
> void foo(Table t) {
>   ...
>   t.cache(); // create cache for t
>   ...
>   env.getCacheService().releaseCacheFor(t); // release cache for t
> }
>
> From function foo()'s perspective, it created a cache and released it.
> However, if someone invokes foo like this:
> {
>   Table src = ...
>   Table t = src.select(...).cache()
>   foo(t)
>   // t is uncached by foo() already.
> }
>
> So the "side effect" still exists.
>
> I think the only safe way to ensure there is no side effect while sharing
> the cache is to use ref counting.
>
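> For illustration, a minimal usage sketch of the ref counting behind option
> 3, for some table t (the CacheHandle names follow my earlier proposal; this
> is a sketch, not a final API):
>
> {
>   CacheHandle h1 = t.cache();  // ref count of t's cache: 1
>   CacheHandle h2 = t.cache();  // ref count: 2, both handles share one cache
>   h1.release();                // ref count: 1, cache still available
>   h2.release();                // ref count: 0, cache may now be deleted
> }
>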
> BTW, the discussion we are having here is exactly the reason that I prefer
> option 3. From a technical perspective, option 3 addresses all the concerns.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Jan 8, 2019 at 8:41 PM Piotr Nowojski <pi...@da-platform.com>
> wrote:
>
>> Hi,
>>
>> I think that introducing ref counting could be confusing and error prone,
>> since Flink-table’s users are not used to closing/releasing resources. I
>> was objecting more to placing
>> `uncache()`/`dropCache()`/`releaseCache()` (releaseCache sounds best to me)
>> as a method on “Table”. It might not be obvious that it will drop the
>> cache for all of the usages of the given table. For example:
>>
>> public void foo(Table t) {
>>  // …
>>  t.releaseCache();
>> }
>>
>> public void bar(Table t) {
>>   // ...
>> }
>>
>> Table a = …
>> val cachedA = a.cache()
>>
>> foo(cachedA)
>> bar(cachedA)
>>
>>
>> My problem with the above example is that the `t.releaseCache()` call does
>> not do the best possible job of communicating to the user that it will
>> have side effects on other places, like the `bar(cachedA)` call. Something
>> like this might be better (not perfect, but a bit better):
>>
>> public void foo(Table t, CacheService cacheService) {
>>  // …
>>  cacheService.releaseCacheFor(t);
>> }
>>
>> Table a = …
>> val cachedA = a.cache()
>>
>> foo(cachedA, env.getCacheService())
>> bar(cachedA)
>>
>>
>> Also, from another perspective, placing the `releaseCache()` method in
>> Table might not be the best separation of concerns - the `releaseCache()`
>> method seems significantly different compared to the other existing methods.
>>
>> Piotrek
>>
>> > On 8 Jan 2019, at 12:28, Becket Qin <becket....@gmail.com> wrote:
>> >
>> > Hi Piotr,
>> >
>> > You are right. There might be two intuitive meanings when users call
>> > 'a.uncache()', namely:
>> > 1. Release the resource.
>> > 2. Do not use the cache for the next operation.
>> >
>> > Case (1) would likely be the dominant use case. So I would suggest we
>> > dedicate the uncache() method to case (1), i.e. for resource release,
>> > but not for ignoring the cache.
>> >
>> > For case 2, i.e. explicitly ignoring the cache (which is rare), users
>> > may use something like 'hint("ignoreCache")'. I think this is better,
>> > as it is a little weird for users to call `a.uncache()` when they may
>> > not even know if the table is cached at all.
>> >
>> > Assuming we let `uncache()` only release the resource, one possibility
>> > is using a ref count to mitigate the side effect: the ref count is
>> > incremented on `cache()` and decremented on `uncache()`. In that case
>> > `uncache()` does not physically release the resource immediately, but
>> > just means the cache could be released.
>> > That being said, I am not sure if this is really a better solution, as
>> > it seems a little counterintuitive. Maybe calling it releaseCache()
>> > helps a little bit?
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> >
>> >
>> >
>> > On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski <pi...@da-platform.com>
>> wrote:
>> >
>> >> Hi Becket,
>> >>
>> >> With `uncache` there are probably two features that we can think about:
>> >>
>> >> a)
>> >>
>> >> Physically dropping the cached table from the storage, freeing up the
>> >> resources
>> >>
>> >> b)
>> >>
>> >> Hinting the optimizer to not cache the reads for the next query/table
>> >>
>> >> a) has the issue I wrote about before: it seems to be an operation
>> >> inherently “flawed” by having side effects.
>> >>
>> >> I’m not sure how best to express it. We could make it work:
>> >>
>> >> 1. via a method on a Table as you proposed:
>> >>
>> >> void Table#dropCache()
>> >> void Table#uncache()
>> >>
>> >> 2. Operation on the environment
>> >>
>> >> env.dropCacheFor(table) // or some other argument that allows user to
>> >> identify the desired cache
>> >>
>> >> 3. Extending (from your original design doc) `setTableService` method
>> to
>> >> return some control handle like:
>> >>
>> >> TableServiceControl setTableService(TableFactory tf,
>> >>                     TableProperties properties,
>> >>                     TempTableCleanUpCallback cleanUpCallback);
>> >>
>> >> (TableServiceControl? TableService? TableServiceHandle? CacheService?)
>> >>
>> >> And having the drop cache method there:
>> >>
>> >> TableServiceControl#dropCache(table)
>> >>
>> >> Out of those options, option 1 has the disadvantage of not making the
>> >> user aware that this is a global operation with side effects.
>> >> Like the old example of:
>> >> Like the old example of:
>> >>
>> >> public void foo(Table t) {
>> >>  // …
>> >>  t.dropCache();
>> >> }
>> >>
>> >> It might not be immediately obvious that `t.dropCache()` is some kind
>> of
>> >> global operation, with side effects visible outside of the `foo`
>> function.
>> >>
>> >> On the other hand, both option 2 and 3, might have greater chance of
>> >> catching user’s attention:
>> >>
>> >> public void foo(Table t, CacheService cacheService) {
>> >>  // …
>> >>  cacheService.dropCache(t);
>> >> }
>> >>
>> >> b) could be achieved quite easily:
>> >>
>> >> Table a = …
>> >> val notCached1 = a.doNotCache()
>> >> val cachedA = a.cache()
>> >> val notCached2 = cachedA.doNotCache() // equivalent of notCached1
>> >>
>> >> `doNotCache()` would behave similarly to `cache()` - it returns a copy
>> >> of the table with the “cache” hint removed and/or a “never cache” hint
>> >> added.
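>> >>
>> >> One way the copy-with-hint semantics could look internally (purely a
>> >> sketch, all names hypothetical):
>> >>
>> >> Table doNotCache() {
>> >>   // copy the hint set; the original table object stays untouched
>> >>   Set<Hint> newHints = new HashSet<>(this.hints);
>> >>   newHints.remove(Hint.CACHE);
>> >>   newHints.add(Hint.NEVER_CACHE);
>> >>   return new Table(this.logicalPlan, newHints);
>> >> }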
>> >>
>> >> Piotrek
>> >>
>> >>
>> >>> On 8 Jan 2019, at 03:17, Becket Qin <becket....@gmail.com> wrote:
>> >>>
>> >>> Hi Piotr,
>> >>>
>> >>> Thanks for the proposal and detailed explanation. I like the idea of
>> >>> returning a new hinted Table without modifying the original table. This
>> >>> also leaves room for users to benefit from future implicit caching.
>> >>>
>> >>> Just to make sure I get the full picture. In your proposal, there will
>> >> also
>> >>> be a 'void Table#uncache()' method to release the cache, right?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com
>> >
>> >>> wrote:
>> >>>
>> >>>> Hi Becket!
>> >>>>
>> >>>> After further thinking I tend to agree that my previous proposal
>> >>>> (*Option 2*) indeed might not be ideal if we were to introduce
>> >>>> automatic caching in the future.
>> >>>> However I would like to propose a slightly modified version of it:
>> >>>>
>> >>>> *Option 4*
>> >>>>
>> >>>> Adding `cache()` method with following signature:
>> >>>>
>> >>>> Table Table#cache();
>> >>>>
>> >>>> Without side effects: the `cache()` call does not modify/change the
>> >>>> original Table in any way.
>> >>>> It would return a copy of the original table, with an added hint for
>> >>>> the optimizer to cache the table, so that future accesses to the
>> >>>> returned table might be cached or not.
>> >>>>
>> >>>> Assuming that we are talking about a setup, where we do not have
>> >> automatic
>> >>>> caching enabled (possible future extension).
>> >>>>
>> >>>> Example #1:
>> >>>>
>> >>>> ```
>> >>>> Table a = …
>> >>>> a.foo() // not cached
>> >>>>
>> >>>> val cachedA = a.cache();
>> >>>>
>> >>>> cachedA.bar() // maybe cached
>> >>>> a.foo() // same as before - effectively not cached
>> >>>> ```
>> >>>>
>> >>>> Both the first and the second `a.foo()` operations would behave in
>> >>>> exactly the same way. Again, the `a.cache()` call doesn’t affect `a`
>> >>>> itself. If `a` was not hinted for caching before `a.cache();`, then
>> >>>> neither `a.foo()` call would use the cache.
>> >>>>
>> >>>> The returned `cachedA` would carry the “cache” hint, so
>> >>>> `cachedA.bar()` would probably go through the cache (unless the
>> >>>> optimiser decides otherwise).
>> >>>>
>> >>>> Example #2
>> >>>>
>> >>>> ```
>> >>>> Table a = …
>> >>>>
>> >>>> a.foo() // not cached
>> >>>>
>> >>>> val b = a.cache();
>> >>>>
>> >>>> a.foo() // same as before - effectively not cached
>> >>>> b.foo() // maybe cached
>> >>>>
>> >>>> val c = b.cache();
>> >>>>
>> >>>> a.foo() // same as before - effectively not cached
>> >>>> b.foo() // same as before - effectively maybe cached
>> >>>> c.foo() // maybe cached
>> >>>> ```
>> >>>>
>> >>>> Now, assuming that we have some future “automatic caching
>> optimisation”:
>> >>>>
>> >>>> Example #3
>> >>>>
>> >>>> ```
>> >>>> env.enableAutomaticCaching()
>> >>>> Table a = …
>> >>>>
>> >>>> a.foo() // might be cached, depending on whether `a` was selected for
>> >>>> automatic caching
>> >>>>
>> >>>> val b = a.cache();
>> >>>>
>> >>>> a.foo() // same as before - might be cached, if `a` was selected for
>> >>>> automatic caching
>> >>>> b.foo() // maybe cached
>> >>>> ```
>> >>>>
>> >>>>
>> >>>> More or less this is the same behaviour as:
>> >>>>
>> >>>> Table a = ...
>> >>>> val b = a.filter(x > 20)
>> >>>>
>> >>>> calling `filter` hasn’t changed or altered `a` in any way. If `a` was
>> >>>> previously filtered:
>> >>>>
>> >>>> Table src = …
>> >>>> val a = src.filter(x > 20)
>> >>>> val b = a.filter(x > 20)
>> >>>>
>> >>>> then yes, `a` and `b` will be the same. But the point is that neither
>> >>>> `filter` nor `cache` changes the original `a` table.
>> >>>>
>> >>>> One thing is that, indeed, the physical drop-cache operation will
>> >>>> have side effects and will in a way mutate the cached table
>> >>>> references. But I think this is unavoidable in any solution - it is
>> >>>> the same issue as calling `.close()`, or calling a destructor in C++.
>> >>>>
>> >>>> Piotrek
>> >>>>
>> >>>>> On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
>> >>>>>
>> >>>>> Happy New Year, everybody!
>> >>>>>
>> >>>>> I would like to resume this discussion thread. At this point, we have
>> >>>>> agreed on the first-step goal of interactive programming. The open
>> >>>>> discussion is the exact API. More specifically, what should the
>> >>>>> *cache()* method return and what is its semantic. There are three
>> >>>>> options:
>> >>>>>
>> >>>>> *Option 1*
>> >>>>> *void cache()* OR *Table cache()* which returns the original table
>> for
>> >>>>> chained calls.
>> >>>>> *void uncache() *releases the cache.
>> >>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>> >>>>>
>> >>>>> - Semantic: a.cache() hints that table 'a' should be cached.
>> Optimizer
>> >>>>> decides whether the cache will be used or not.
>> >>>>> - pros: simple and no confusion between CachedTable and original
>> table
>> >>>>> - cons: A table may be cached / uncached in a method invocation,
>> while
>> >>>> the
>> >>>>> caller does not know about this.
>> >>>>>
>> >>>>> *Option 2*
>> >>>>> *CachedTable cache()*
>> >>>>> *CachedTable *extends *Table *with an additional *uncache()* method
>> >>>>>
>> >>>>> - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will
>> >> always
>> >>>>> use cache. *a.bar() *will always use original DAG.
>> >>>>> - pros: No potential side effects in method invocation.
>> >>>>> - cons: Optimizer has no chance to kick in. Future optimization will
>> >>>>> become a behavior change and will require users to change their code.
>> >>>>>
>> >>>>> *Option 3*
>> >>>>> *CacheHandle cache()*
>> >>>>> *CacheHandle.release() *to release a cache handle on the table. If
>> all
>> >>>>> cache handles are released, the cache could be removed.
>> >>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo().
>> >>>>>
>> >>>>> - Semantic: *a.cache() *hints that 'a' should be cached. Optimizer
>> >>>>> decides whether the cache will be used or not. The cache is released
>> >>>>> either when no handle refers to it or when the user program exits.
>> >>>>> - pros: No potential side effect in method invocation. No confusion
>> >>>>> between the cached table vs. the original table.
>> >>>>> - cons: An additional CacheHandle exposed to the users.
>> >>>>>
>> >>>>>
>> >>>>> Personally I prefer option 3 for the following reasons:
>> >>>>> 1. It is simple. The vast majority of users would just call
>> >>>>> *a.cache()* followed by *a.foo(),* *a.bar(), etc. *
>> >>>>> 2. There is no semantic ambiguity and no semantic change if we decide
>> >>>>> to add implicit caching in the future.
>> >>>>> 3. There is no side effect in the method calls.
>> >>>>> 4. Admittedly we need to expose one more CacheHandle class to the
>> >>>>> users. But it is not that difficult to understand given similar
>> >>>>> well-known concepts like ref counting (we can name it CacheReference
>> >>>>> if that is easier to understand). So I think it is fine.
>> >>>>>
>> >>>>>
>> >>>>> Thanks,
>> >>>>>
>> >>>>> Jiangjie (Becket) Qin
>> >>>>>
>> >>>>>
>> >>>>> On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com>
>> >>>> wrote:
>> >>>>>
>> >>>>>> Hi Piotrek,
>> >>>>>>
>> >>>>>> 1. Regarding optimization.
>> >>>>>> Sure, there are many cases where the decision is hard to make. But
>> >>>>>> that does not make it any easier for the users to make those
>> >>>>>> decisions. I imagine 99% of the users would just naively use the
>> >>>>>> cache. I am not saying we can optimize in all the cases. But as long
>> >>>>>> as we agree that at least in certain cases (I would argue most
>> >>>>>> cases) the optimizer can do a little better than an average user who
>> >>>>>> likely knows little about Flink internals, we should not push the
>> >>>>>> burden of optimization to users.
>> >>>>>>
>> >>>>>> BTW, it seems some of your concerns are related to the
>> >>>>>> implementation. I did not mention the implementation of the caching
>> >>>>>> service because that should not affect the API semantic. Not sure if
>> >>>>>> this helps, but imagine the default implementation has one
>> >>>>>> StorageNode service colocated with each TM. It could be running
>> >>>>>> within the TM process or in a standalone process, depending on
>> >>>>>> configuration.
>> >>>>>>
>> >>>>>> The StorageNode uses a memory + spill-to-disk mechanism. The cached
>> >>>>>> data will just be written to the local StorageNode service. If the
>> >>>>>> StorageNode is running within the TM process, the in-memory cache
>> >>>>>> could just hold objects, so we save some serde cost. A later job
>> >>>>>> referring to the cached Table will be scheduled in a locality-aware
>> >>>>>> manner, i.e. run in the TM whose peer StorageNode hosts the data.
>> >>>>>>
>> >>>>>>
>> >>>>>> 2. Semantic
>> >>>>>> I am not sure why introducing a new hintCache() or
>> >>>>>> env.enableAutomaticCaching() method would avoid the consequence of
>> >>>> semantic
>> >>>>>> change.
>> >>>>>>
>> >>>>>> If the auto optimization is not enabled by default, users still
>> need
>> >> to
>> >>>>>> make code change to all existing programs in order to get the
>> benefit.
>> >>>>>> If the auto optimization is enabled by default, advanced users who
>> >> know
>> >>>>>> that they really want to use cache will suddenly lose the
>> opportunity
>> >>>> to do
>> >>>>>> so, unless they change the code to disable auto optimization.
>> >>>>>>
>> >>>>>>
>> >>>>>> 3. side effect
>> >>>>>> The CacheHandle is not only about where to put uncache(). It also
>> >>>>>> solves the implicit performance impact, by moving uncache() to the
>> >>>>>> CacheHandle.
>> >>>>>>
>> >>>>>> - If users want to leverage the cache, they can call a.cache().
>> >>>>>> After that, unless the user explicitly releases that CacheHandle,
>> >>>>>> a.foo() will always leverage the cache if needed (the optimizer may
>> >>>>>> choose to ignore the cache if that helps accelerate the process). A
>> >>>>>> function call will not be able to release the cache because it does
>> >>>>>> not have that CacheHandle.
>> >>>>>> - If some advanced users do not want to use the cache at all, they
>> >>>>>> will call a.hint(ignoreCache).foo(). This will for sure ignore the
>> >>>>>> cache and use the original DAG for processing.
>> >>>>>>
>> >>>>>>
>> >>>>>>> In vast majority of the cases, users wouldn't really care whether
>> the
>> >>>>>>> cache is used or not.
>> >>>>>>> I wouldn’t agree with that, because “caching” (if not purely in
>> >> memory
>> >>>>>>> caching) would add additional IO costs. It’s similar as saying
>> that
>> >>>> users
>> >>>>>>> would not see a difference between Spark/Flink and MapReduce
>> >> (MapReduce
>> >>>>>>> writes data to disks after every map/reduce stage).
>> >>>>>>
>> >>>>>> What I wanted to say is that in most cases, after users call
>> cache(),
>> >>>> they
>> >>>>>> don't really care about whether auto optimization has decided to
>> >> ignore
>> >>>> the
>> >>>>>> cache or not, as long as the program runs faster.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>>
>> >>>>>> Jiangjie (Becket) Qin
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski <
>> >>>> pi...@data-artisans.com>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi,
>> >>>>>>>
>> >>>>>>> Thanks for the quick answer :)
>> >>>>>>>
>> >>>>>>> Re 1.
>> >>>>>>>
>> >>>>>>> I generally agree with you, however, a couple of points:
>> >>>>>>>
>> >>>>>>> a) the problem with using automatic caching is bigger, because you
>> >>>>>>> will have to decide how to compare IO vs CPU costs, and if you pick
>> >>>>>>> wrong, the additional IO costs might be enormous or can even crash
>> >>>>>>> your system. This is a more difficult problem compared to, let’s
>> >>>>>>> say, join reordering, where the only issue is to have good
>> >>>>>>> statistics that can capture correlations between columns (when you
>> >>>>>>> reorder joins, the number of IO operations does not change)
>> >>>>>>> b) your example is completely independent of caching.
>> >>>>>>>
>> >>>>>>> Query like this:
>> >>>>>>>
>> >>>>>>> src1.filter('f1 > 10).join(src2.filter('f2 < 30), 'f1 ===
>> >>>>>>> 'f2).as('f3, …).filter('f3 > 30)
>> >>>>>>>
>> >>>>>>> Should/could be optimised to an empty result immediately (the
>> >>>>>>> predicates 'f1 > 10 and 'f2 < 30 combined with 'f1 === 'f2 imply
>> >>>>>>> 10 < 'f3 < 30, so 'f3 > 30 can never match), without the need for
>> >>>>>>> any cache/materialisation, and that should work even without any
>> >>>>>>> statistics provided by the connector.
>> >>>>>>>
>> >>>>>>> For me, a prerequisite to any serious cost-based optimisations
>> >>>>>>> would be some reasonable benchmark coverage of the code (TPC-H?).
>> >>>>>>> Otherwise that would be the equivalent of adding untested code,
>> >>>>>>> since we wouldn’t be able to verify our assumptions, like how the
>> >>>>>>> writing of 10 000 records to a cache/RocksDB/Kafka/CSV file
>> >>>>>>> compares to joining/filtering/processing of, let’s say, 1 000 000
>> >>>>>>> rows.
>> >>>>>>>
>> >>>>>>> Re 2.
>> >>>>>>>
>> >>>>>>> I wasn’t proposing to change the semantic later. I was proposing
>> that
>> >>>> we
>> >>>>>>> start now:
>> >>>>>>>
>> >>>>>>> CachedTable cachedA = a.cache()
>> >>>>>>> cachedA.foo() // Cache is used
>> >>>>>>> a.bar() // Original DAG is used
>> >>>>>>>
>> >>>>>>> And then later we can think about adding for example
>> >>>>>>>
>> >>>>>>> CachedTable cachedA = a.hintCache()
>> >>>>>>> cachedA.foo() // Cache might be used
>> >>>>>>> a.bar() // Original DAG is used
>> >>>>>>>
>> >>>>>>> Or
>> >>>>>>>
>> >>>>>>> env.enableAutomaticCaching()
>> >>>>>>> a.foo() // Cache might be used
>> >>>>>>> a.bar() // Cache might be used
>> >>>>>>>
>> >>>>>>> Or (I would still not like this option):
>> >>>>>>>
>> >>>>>>> a.hintCache()
>> >>>>>>> a.foo() // Cache might be used
>> >>>>>>> a.bar() // Cache might be used
>> >>>>>>>
>> >>>>>>> Or whatever else comes to our mind. Even if we add some automatic
>> >>>>>>> caching in the future, keeping explicit (`CachedTable cache()`)
>> >>>>>>> caching will still be useful, at least in some cases.
>> >>>>>>>
>> >>>>>>> Re 3.
>> >>>>>>>
>> >>>>>>>> 2. The source tables are immutable during one run of batch
>> >> processing
>> >>>>>>> logic.
>> >>>>>>>> 3. The cache is immutable during one run of batch processing
>> logic.
>> >>>>>>>
>> >>>>>>>> I think assumption 2 and 3 are by definition what batch
>> processing
>> >>>>>>> means,
>> >>>>>>>> i.e the data must be complete before it is processed and should
>> not
>> >>>>>>> change
>> >>>>>>>> when the processing is running.
>> >>>>>>>
>> >>>>>>> I agree that this is how batch systems SHOULD work. However, I know
>> >>>>>>> from my previous experience that it’s not always the case.
>> >>>>>>> Sometimes users are just working on some non-transactional storage,
>> >>>>>>> which can be (either constantly or occasionally) modified by some
>> >>>>>>> other processes for whatever reason (fixing the data, updating,
>> >>>>>>> adding new data etc).
>> >>>>>>>
>> >>>>>>> But even if we ignore this point (data immutability), the
>> >>>>>>> performance side effect issue of your proposal remains. If a user
>> >>>>>>> calls `void a.cache()` deep inside some private method, it will
>> >>>>>>> have implicit side effects on other parts of their program that
>> >>>>>>> might not be obvious.
>> >>>>>>>
>> >>>>>>> Re `CacheHandle`.
>> >>>>>>>
>> >>>>>>> If I understand it correctly, it only addresses the issue of where
>> >>>>>>> to place the `uncache`/`dropCache` method.
>> >>>>>>>
>> >>>>>>> Btw,
>> >>>>>>>
>> >>>>>>>> In vast majority of the cases, users wouldn't really care whether
>> >> the
>> >>>>>>> cache is used or not.
>> >>>>>>>
>> >>>>>>> I wouldn’t agree with that, because “caching” (if not purely in
>> >> memory
>> >>>>>>> caching) would add additional IO costs. It’s similar as saying
>> that
>> >>>> users
>> >>>>>>> would not see a difference between Spark/Flink and MapReduce
>> >> (MapReduce
>> >>>>>>> writes data to disks after every map/reduce stage).
>> >>>>>>>
>> >>>>>>> Piotrek
>> >>>>>>>
>> >>>>>>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com>
>> wrote:
>> >>>>>>>>
>> >>>>>>>> Hi Piotrek,
>> >>>>>>>>
>> >>>>>>>> Not sure if you noticed, in my last email, I was proposing
>> >>>> `CacheHandle
>> >>>>>>>> cache()` to avoid the potential side effect due to function
>> calls.
>> >>>>>>>>
>> >>>>>>>> Let's look at the disagreement in your reply one by one.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 1. Optimization chances
>> >>>>>>>>
>> >>>>>>>> Optimization is never trivial work. This is exactly why we should
>> >>>>>>>> not let users do it manually. Databases have done a huge amount of
>> >>>>>>>> work in this area. At Alibaba, we rely heavily on many
>> >>>>>>>> optimization rules to boost SQL query performance.
>> >>>>>>>>
>> >>>>>>>> In your example, if I fill in the filter conditions in a certain
>> >>>>>>>> way, the optimization becomes obvious.
>> >>>>>>>>
>> >>>>>>>> Table src1 = … // read from connector 1
>> >>>>>>>> Table src2 = … // read from connector 2
>> >>>>>>>>
>> >>>>>>>> Table a = src1.filter('f1 > 10).join(src2.filter('f2 < 30), 'f1
>> >>>>>>>> === 'f2).as('f3, ...)
>> >>>>>>>> a.cache() // write cache to connector 3, when writing the
>> records,
>> >>>>>>> remember
>> >>>>>>>> min and max of `f1
>> >>>>>>>>
>> >>>>>>>> a.filter('f3 > 30) // There is no need to read from any connector
>> >>>>>>> because
>> >>>>>>>> `a` does not contain any record whose 'f3 is greater than 30.
>> >>>>>>>> env.execute()
>> >>>>>>>> a.select(…)
>> >>>>>>>>
>> >>>>>>>> BTW, it seems to me that adding some basic statistics is fairly
>> >>>>>>>> straightforward and the cost is pretty marginal, if not
>> >>>>>>>> negligible. In fact it is not only needed for optimization, but
>> >>>>>>>> also for cases such as ML, where some algorithms may need to
>> >>>>>>>> decide their parameters based on the statistics of the data.
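>> >>>>>>>>
>> >>>>>>>> To make the min/max idea concrete, a rough sketch of the pruning
>> >>>>>>>> check (hypothetical types, for illustration only):
>> >>>>>>>>
>> >>>>>>>> // For a predicate like 'f3 > 30: if the cached max of 'f3 is <= 30,
>> >>>>>>>> // no cached record can match and the scan can be skipped entirely.
>> >>>>>>>> boolean scanIsProvablyEmpty(double lowerBoundExcl, ColumnStats f3Stats) {
>> >>>>>>>>   return f3Stats.max() <= lowerBoundExcl;
>> >>>>>>>> }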
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 2. Same API, one semantic now, another semantic later.
>> >>>>>>>>
>> >>>>>>>> I am trying to understand what the semantic of the `CachedTable
>> >>>>>>>> cache()` you are proposing is. IMO, we should avoid designing an
>> >>>>>>>> API whose semantic will be changed later. If we have a
>> >>>>>>>> "CachedTable cache()" method, then the semantic should be very
>> >>>>>>>> clearly defined upfront and not change later. It should never be
>> >>>>>>>> "right now let's go with semantic 1, later we can silently change
>> >>>>>>>> it to semantic 2 or 3". Such a change could have bad consequences.
>> >>>>>>>> For example, let's say we decide to go with semantic 1:
>> >>>>>>>>
>> >>>>>>>> CachedTable cachedA = a.cache()
>> >>>>>>>> cachedA.foo() // Cache is used
>> >>>>>>>> a.bar() // Original DAG is used.
>> >>>>>>>>
>> >>>>>>>> Now the majority of users would be using cachedA.foo() in their
>> >>>>>>>> code. And some advanced users will use a.bar() to explicitly skip
>> >>>>>>>> the cache. Later on, we add smart optimization and change the
>> >>>>>>>> semantic to semantic 2:
>> >>>>>>>>
>> >>>>>>>> CachedTable cachedA = a.cache()
>> >>>>>>>> cachedA.foo() // Cache is used
>> >>>>>>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip
>> cache
>> >> if
>> >>>>>>> it is
>> >>>>>>>> faster.
>> >>>>>>>>
>> >>>>>>>> Now most of the users who were writing cachedA.foo() will not
>> >> benefit
>> >>>>>>> from
>> >>>>>>>> this optimization at all, unless they change their code to use
>> >> a.foo()
>> >>>>>>>> instead. And those advanced users suddenly lose the option to
>> >>>> explicitly
>> >>>>>>>> ignore cache unless they change their code (assuming we care
>> enough
>> >> to
>> >>>>>>>> provide something like hint(useCache)). If we don't define the
>> >>>> semantic
>> >>>>>>>> carefully, our users will have to change their code again and
>> again
>> >>>>>>> while
>> >>>>>>>> they shouldn't have to.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 3. side effect.
>> >>>>>>>>
>> >>>>>>>> Before we talk about side effect, we have to agree on the
>> >> assumptions.
>> >>>>>>> The
>> >>>>>>>> assumptions I have are following:
>> >>>>>>>> 1. We are talking about batch processing.
>> >>>>>>>> 2. The source tables are immutable during one run of batch
>> >> processing
>> >>>>>>> logic.
>> >>>>>>>> 3. The cache is immutable during one run of batch processing
>> logic.
>> >>>>>>>>
>> >>>>>>>> I think assumption 2 and 3 are by definition what batch
>> processing
>> >>>>>>> means,
>> >>>>>>>> i.e the data must be complete before it is processed and should
>> not
>> >>>>>>> change
>> >>>>>>>> when the processing is running.
>> >>>>>>>>
>> >>>>>>>> As far as I am aware, no batch processing system breaks those
>> >>>>>>>> assumptions. Even for relational database tables, where queries
>> >>>>>>>> can run with concurrent modifications, the necessary locking is
>> >>>>>>>> still required to ensure the integrity of the query result.
>> >>>>>>>>
>> >>>>>>>> Please let me know if you disagree with the above assumptions. If
>> >> you
>> >>>>>>> agree
>> >>>>>>>> with these assumptions, with the `CacheHandle cache()` API in my
>> >> last
>> >>>>>>>> email, do you still see side effects?
>> >>>>>>>>
>> >>>>>>>> Thanks,
>> >>>>>>>>
>> >>>>>>>> Jiangjie (Becket) Qin
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski <
>> >>>> pi...@data-artisans.com
>> >>>>>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Becket,
>> >>>>>>>>>
>> >>>>>>>>>> Regarding the chance of optimization, it might not be that
>> rare.
>> >>>> Some
>> >>>>>>>>> very
>> >>>>>>>>>> simple statistics could already help in many cases. For
>> example,
>> >>>>>>> simply
>> >>>>>>>>>> maintaining max and min of each fields can already eliminate
>> some
>> >>>>>>>>>> unnecessary table scan (potentially scanning the cached table)
>> if
>> >>>> the
>> >>>>>>>>>> result is doomed to be empty. A histogram would give even
>> further
>> >>>>>>>>>> information. The optimizer could be very careful and only
>> ignores
>> >>>>>>> cache
>> >>>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a
>> >> filter
>> >>>> on
>> >>>>>>>>> the
>> >>>>>>>>>> cache will absolutely return nothing.
>> >>>>>>>>>
>> >>>>>>>>> I do not see how this might be easy to achieve. It would require
>> >> tons
>> >>>>>>> of
>> >>>>>>>>> effort to make it work and in the end you would still have a
>> >> problem
>> >>>> of
>> >>>>>>>>> comparing/trading CPU cycles vs IO. For example:
>> >>>>>>>>>
>> >>>>>>>>> Table src1 = … // read from connector 1
>> >>>>>>>>> Table src2 = … // read from connector 2
>> >>>>>>>>>
>> >>>>>>>>> Table a = src1.filter(…).join(src2.filter(…), …)
>> >>>>>>>>> a.cache() // write cache to connector 3
>> >>>>>>>>>
>> >>>>>>>>> a.filter(…)
>> >>>>>>>>> env.execute()
>> >>>>>>>>> a.select(…)
>> >>>>>>>>>
>> >>>>>>>>> Decision whether it’s better to:
>> >>>>>>>>> A) read from connector1/connector2, filter/map and join them
>> twice
>> >>>>>>>>> B) read from connector1/connector2, filter/map and join them
>> once,
>> >>>> pay
>> >>>>>>> the
>> >>>>>>>>> price of writing to connector 3 and then reading from it
>> >>>>>>>>>
>> >>>>>>>>> Is very far from trivial. `a` can end up much larger than `src1`
>> >>>>>>>>> and `src2`, writes to connector 3 might be extremely slow, reads
>> >>>>>>>>> from connector 3 can be slower compared to reads from connectors
>> >>>>>>>>> 1 & 2, … . You really need to have extremely good statistics to
>> >>>>>>>>> correctly assess the size of the output, and it would still fail
>> >>>>>>>>> many times (correlations etc). And keep in mind that at the
>> >>>>>>>>> moment we do not have ANY statistics at all. More than that, it
>> >>>>>>>>> would require significantly more testing and setting up some
>> >>>>>>>>> benchmarks to make sure that we do not break it with regressions.
>> >>>>>>>>>
>> >>>>>>>>> That’s why I’m strongly opposed to this idea - at least let’s
>> >>>>>>>>> not start with this. If we first start with completely
>> >>>>>>>>> manual/explicit caching, without any magic, it would be a
>> >>>>>>>>> significant improvement for the users at a fraction of the
>> >>>>>>>>> development cost. After implementing that, when we already have
>> >>>>>>>>> all of the working pieces, we can start working on some
>> >>>>>>>>> optimisation rules. As I wrote before, if we start with
>> >>>>>>>>>
>> >>>>>>>>> `CachedTable cache()`
>> >>>>>>>>>
>> >>>>>>>>> We can later work on follow-up stories to make it automatic.
>> >>>>>>>>> Even though I don’t like this implicit/side-effect approach with
>> >>>>>>>>> a `void` method, having an explicit `CachedTable cache()`
>> >>>>>>>>> wouldn’t even prevent us from later adding a `void hintCache()`
>> >>>>>>>>> method, with the exact semantic that you want.
>> >>>>>>>>>
>> >>>>>>>>> On top of that, I raise again that having an implicit `void
>> >>>>>>>>> cache()/hintCache()` has other side effects and problems with
>> >>>>>>>>> non-immutable data, and is annoying when used secretly inside
>> >>>>>>>>> methods.
>> >>>>>>>>>
>> >>>>>>>>> An explicit `CachedTable cache()` just looks like a much less
>> >>>>>>>>> controversial MVP, and if we decide to go further with this
>> >>>>>>>>> topic, it’s not wasted effort, but lies on a straight path to
>> >>>>>>>>> more advanced/complicated solutions in the future. Are there any
>> >>>>>>>>> drawbacks of starting with `CachedTable cache()` that I’m
>> >>>>>>>>> missing?
>> >>>>>>>>>
>> >>>>>>>>> Piotrek
>> >>>>>>>>>
>> >>>>>>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Becket,
>> >>>>>>>>>>
>> >>>>>>>>>> Introducing CacheHandle seems too complicated. That means users
>> >>>>>>>>>> have to maintain the handle properly.
>> >>>>>>>>>>
>> >>>>>>>>>> And since cache is just a hint for the optimizer, why not just
>> >>>>>>>>>> return the Table itself from the cache method? This hint info
>> >>>>>>>>>> should be kept in the Table, I believe.
>> >>>>>>>>>>
>> >>>>>>>>>> So how about adding cache and uncache methods to Table, both
>> >>>>>>>>>> returning Table? Because what cache and uncache do is just add
>> >>>>>>>>>> some hint info to the Table.
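>> >>>>>>>>>>
>> >>>>>>>>>> Concretely, the signatures could look like this (just a sketch):
>> >>>>>>>>>>
>> >>>>>>>>>> interface Table {
>> >>>>>>>>>>   Table cache();    // returns a Table carrying a "cache" hint
>> >>>>>>>>>>   Table uncache();  // returns a Table with the hint removed
>> >>>>>>>>>>   // ... other existing methods
>> >>>>>>>>>> }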
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> On Wed, Dec 12, 2018 at 11:25 AM, Becket Qin <becket....@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>> Hi Till and Piotrek,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks for the clarification. That clears up quite a bit of
>> >>>>>>>>>>> confusion. My understanding of how cache works is the same as
>> >>>>>>>>>>> what Till describes, i.e. cache() is a hint to Flink, but it is
>> >>>>>>>>>>> not guaranteed that the cache always exists; it might be
>> >>>>>>>>>>> recomputed from its lineage.
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Is this the core of our disagreement here? That you would like
>> >>>>>>>>>>>> this “cache()” to be mostly a hint for the optimiser?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Semantics-wise, yes. That's also why I think materialize() has
>> >>>>>>>>>>> a much larger scope than cache(), and thus it should be a
>> >>>>>>>>>>> different method.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Regarding the chance of optimization, it might not be that
>> rare.
>> >>>> Some
>> >>>>>>>>> very
>> >>>>>>>>>>> simple statistics could already help in many cases. For
>> example,
>> >>>>>>> simply
>> >>>>>>>>>>> maintaining max and min of each fields can already eliminate
>> some
>> >>>>>>>>>>> unnecessary table scan (potentially scanning the cached
>> table) if
>> >>>> the
>> >>>>>>>>>>> result is doomed to be empty. A histogram would give even
>> further
>> >>>>>>>>>>> information. The optimizer could be very careful and only
>> ignores
>> >>>>>>> cache
>> >>>>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a
>> >> filter
>> >>>>>>> on
>> >>>>>>>>> the
>> >>>>>>>>>>> cache will absolutely return nothing.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Given the above clarification on cache, I would like to
>> revisit
>> >> the
>> >>>>>>>>>>> original "void cache()" proposal and see if we can improve on
>> top
>> >>>> of
>> >>>>>>>>> that.
>> >>>>>>>>>>>
>> >>>>>>>>>>> What do you think about the following modified interface?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Table {
>> >>>>>>>>>>> /**
>> >>>>>>>>>>>  * This call hints Flink to maintain a cache of this table and
>> >>>>>>>>>>>  * leverage it for performance optimization if needed.
>> >>>>>>>>>>>  * Note that Flink may still decide not to use the cache if
>> >>>>>>>>>>>  * doing so is cheaper.
>> >>>>>>>>>>>  *
>> >>>>>>>>>>>  * A CacheHandle is returned to allow the user to release the
>> >>>>>>>>>>>  * cache actively. The cache will be deleted if there are no
>> >>>>>>>>>>>  * unreleased cache handles to it. When the TableEnvironment is
>> >>>>>>>>>>>  * closed, the cache will also be deleted and all the cache
>> >>>>>>>>>>>  * handles will be released.
>> >>>>>>>>>>>  *
>> >>>>>>>>>>>  * @return a CacheHandle referring to the cache of this table.
>> >>>>>>>>>>>  */
>> >>>>>>>>>>> CacheHandle cache();
>> >>>>>>>>>>> }
>> >>>>>>>>>>>
>> >>>>>>>>>>> CacheHandle {
>> >>>>>>>>>>> /**
>> >>>>>>>>>>>  * Close the cache handle. This method does not necessarily
>> >>>>>>>>>>>  * delete the cache. Instead, it simply decrements the reference
>> >>>>>>>>>>>  * count of the cache. When there is no handle referring to a
>> >>>>>>>>>>>  * cache, the cache will be deleted.
>> >>>>>>>>>>>  *
>> >>>>>>>>>>>  * @return the number of open handles to the cache after this
>> >>>>>>>>>>>  * handle has been released.
>> >>>>>>>>>>>  */
>> >>>>>>>>>>> int release()
>> >>>>>>>>>>> }
>> >>>>>>>>>>>
>> >>>>>>>>>>> The rationale behind this interface is the following:
>> >>>>>>>>>>> In the vast majority of cases, users wouldn't really care
>> >>>>>>>>>>> whether the cache is used or not. So I think the most intuitive
>> >>>>>>>>>>> way is letting cache() return nothing, so nobody needs to worry
>> >>>>>>>>>>> about the difference between operations on CachedTables and
>> >>>>>>>>>>> those on the "original" tables. This will make maybe 99.9% of
>> >>>>>>>>>>> the users happy. There were two concerns raised for this
>> >>>>>>>>>>> approach:
>> >>>>>>>>>>> 1. In some rare cases, users may want to ignore the cache.
>> >>>>>>>>>>> 2. A table might be cached/uncached in a third-party function
>> >>>>>>>>>>> while the caller does not know.
>> >>>>>>>>>>>
>> >>>>>>>>>>> For the first issue, users can use hint("ignoreCache") to
>> >>>>>>>>>>> explicitly ignore the cache.
>> >>>>>>>>>>> For the second issue, the above proposal lets cache() return a
>> >>>>>>>>>>> CacheHandle whose only method is release(). Different
>> >>>>>>>>>>> CacheHandles will refer to the same cache; if a cache no longer
>> >>>>>>>>>>> has any cache handle, it will be deleted. This addresses the
>> >>>>>>>>>>> following case:
>> >>>>>>>>>>> {
>> >>>>>>>>>>> val handle1 = a.cache()
>> >>>>>>>>>>> process(a)
>> >>>>>>>>>>> a.select(...) // cache is still available, handle1 has not
>> been
>> >>>>>>>>> released.
>> >>>>>>>>>>> }
>> >>>>>>>>>>>
>> >>>>>>>>>>> void process(Table t) {
>> >>>>>>>>>>> val handle2 = t.cache() // new handle to cache
>> >>>>>>>>>>> t.select(...) // optimizer decides cache usage
>> >>>>>>>>>>> t.hint("ignoreCache").select(...) // cache is ignored
>> >>>>>>>>>>> handle2.release() // release the handle, but the cache may
>> still
>> >> be
>> >>>>>>>>>>> available if there are other handles
>> >>>>>>>>>>> ...
>> >>>>>>>>>>> }
>> >>>>>>>>>>>
>> >>>>>>>>>>> Does the above modified approach look reasonable to you?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Cheers,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Jiangjie (Becket) Qin
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <
>> >>>> trohrm...@apache.org>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Hi Becket,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I was aiming at semantics similar to 1. I actually thought
>> that
>> >>>>>>>>> `cache()`
>> >>>>>>>>>>>> would tell the system to materialize the intermediate result
>> so
>> >>>> that
>> >>>>>>>>>>>> subsequent queries don't need to reprocess it. This means
>> that
>> >> the
>> >>>>>>>>> usage
>> >>>>>>>>>>> of
>> >>>>>>>>>>>> the cached table in this example
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> {
>> >>>>>>>>>>>> val cachedTable = a.cache()
>> >>>>>>>>>>>> val b1 = cachedTable.select(…)
>> >>>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>> >>>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>> >>>>>>>>>>>> val c1 = a.select(…)
>> >>>>>>>>>>>> val c2 = a.foo().select(…)
>> >>>>>>>>>>>> val c3 = a.bar().select(...)
>> >>>>>>>>>>>> }
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> strongly depends on interleaved calls which trigger the
>> >> execution
>> >>>> of
>> >>>>>>>>> sub
>> >>>>>>>>>>>> queries. So for example, if there is only a single
>> env.execute
>> >>>> call
>> >>>>>>> at
>> >>>>>>>>>>> the
>> >>>>>>>>>>>> end of  block, then b1, b2, b3, c1, c2 and c3 would all be
>> >>>> computed
>> >>>>>>> by
>> >>>>>>>>>>>> reading directly from the sources (given that there is only a
>> >>>> single
>> >>>>>>>>>>>> JobGraph). It just happens that the result of `a` will be
>> cached
>> >>>>>>> such
>> >>>>>>>>>>> that
>> >>>>>>>>>>>> we skip the processing of `a` when there are subsequent
>> queries
>> >>>>>>> reading
>> >>>>>>>>>>>> from `cachedTable`. If for some reason the system cannot
>> >>>> materialize
>> >>>>>>>>> the
>> >>>>>>>>>>>> table (e.g. running out of disk space, ttl expired), then it
>> >> could
>> >>>>>>> also
>> >>>>>>>>>>>> happen that we need to reprocess `a`. In that sense
>> >> `cachedTable`
>> >>>>>>>>> simply
>> >>>>>>>>>>> is
>> >>>>>>>>>>>> an identifier for the materialized result of `a` with the
>> >> lineage
>> >>>>>>> how
>> >>>>>>>>> to
>> >>>>>>>>>>>> reprocess it.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Cheers,
>> >>>>>>>>>>>> Till
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <
>> >>>>>>>>> pi...@data-artisans.com
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi Becket,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> {
>> >>>>>>>>>>>>>> val cachedTable = a.cache()
>> >>>>>>>>>>>>>> val b = cachedTable.select(...)
>> >>>>>>>>>>>>>> val c = a.select(...)
>> >>>>>>>>>>>>>> }
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses
>> >>>>>>> original
>> >>>>>>>>>>> DAG
>> >>>>>>>>>>>>> as
>> >>>>>>>>>>>>>> user demanded so. In this case, the optimizer has no
>> chance to
>> >>>>>>>>>>>> optimize.
>> >>>>>>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c
>> leaves
>> >> the
>> >>>>>>>>>>>>> optimizer
>> >>>>>>>>>>>>>> to choose whether the cache or DAG should be used. In this
>> >> case,
>> >>>>>>> user
>> >>>>>>>>>>>>> lose
>> >>>>>>>>>>>>>> the option to NOT use cache.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> As you can see, neither of the options seem perfect.
>> However,
>> >> I
>> >>>>>>> guess
>> >>>>>>>>>>>> you
>> >>>>>>>>>>>>>> and Till are proposing the third option:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache
>> or
>> >>>> DAG
>> >>>>>>>>>>>> should
>> >>>>>>>>>>>>> be
>> >>>>>>>>>>>>>> used. c always use the DAG.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I am pretty sure that I, Till, Fabian and others were all
>> >>>>>>>>>>>>> proposing and advocating in favour of semantic “1”. No
>> >>>>>>>>>>>>> cost-based optimiser decisions at all.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> {
>> >>>>>>>>>>>>> val cachedTable = a.cache()
>> >>>>>>>>>>>>> val b1 = cachedTable.select(…)
>> >>>>>>>>>>>>> val b2 = cachedTable.foo().select(…)
>> >>>>>>>>>>>>> val b3 = cachedTable.bar().select(...)
>> >>>>>>>>>>>>> val c1 = a.select(…)
>> >>>>>>>>>>>>> val c2 = a.foo().select(…)
>> >>>>>>>>>>>>> val c3 = a.bar().select(...)
>> >>>>>>>>>>>>> }
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> All b1, b2 and b3 are reading from cache, while c1, c2 and
>> c3
>> >> are
>> >>>>>>>>>>>>> re-executing whole plan for “a”.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> In the future we could discuss going one step further,
>> >>>> introducing
>> >>>>>>>>> some
>> >>>>>>>>>>>>> global optimisation (that can be manually enabled/disabled):
>> >>>>>>>>>>> deduplicate
>> >>>>>>>>>>>>> plan nodes/deduplicate sub queries/re-use sub queries
>> >> results/or
>> >>>>>>>>>>> whatever
>> >>>>>>>>>>>>> we could call it. It could do two things:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> 1. Automatically try to deduplicate fragments of the plan
>> and
>> >>>> share
>> >>>>>>>>> the
>> >>>>>>>>>>>>> result using CachedTable - in other words automatically
>> insert
>> >>>>>>>>>>>> `CachedTable
>> >>>>>>>>>>>>> cache()` calls.
>> >>>>>>>>>>>>> 2. Automatically make decision to bypass explicit
>> `CachedTable`
>> >>>>>>> access
>> >>>>>>>>>>>>> (this would be the equivalent of what you described as
>> >> “semantic
>> >>>>>>> 3”).
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> However, as I wrote previously, I have big doubts whether
>> >>>>>>>>>>>>> such cost-based optimisation would work (this applies also to
>> >>>>>>>>>>>>> “Semantic 2”). I would expect it to do more harm than good in
>> >>>>>>>>>>>>> so many cases that it wouldn’t make sense. Even assuming that
>> >>>>>>>>>>>>> we calculate statistics perfectly (this isn’t going to
>> >>>>>>>>>>>>> happen), it’s virtually impossible to correctly estimate the
>> >>>>>>>>>>>>> exchange rate of CPU cycles vs IO operations, as it changes
>> >>>>>>>>>>>>> so much from deployment to deployment.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Is this the core of our disagreement here? That you would
>> >>>>>>>>>>>>> like this “cache()” to be mostly a hint for the optimiser?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Piotrek
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com
>> >
>> >>>>>>> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Another potential concern for semantic 3 is that, in the
>> >>>>>>>>>>>>>> future, we may add automatic caching to Flink, e.g. caching
>> >>>>>>>>>>>>>> the intermediate results at the shuffle boundary. If our
>> >>>>>>>>>>>>>> semantic is that a reference to the original table means
>> >>>>>>>>>>>>>> skipping the cache, those users may not be able to benefit
>> >>>>>>>>>>>>>> from the implicit cache.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <
>> >>>> becket....@gmail.com
>> >>>>>>>>
>> >>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Hi Piotrek,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Thanks for the reply. Thinking about it again, I might have
>> >>>>>>>>>>>>>>> misunderstood your proposal in earlier emails. Returning a
>> >>>>>>>>>>>>>>> CachedTable might not be a bad idea.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> I was more concerned about the semantic and its
>> >>>>>>>>>>>>>>> intuitiveness when a CachedTable is returned, i.e., if
>> >>>>>>>>>>>>>>> cache() returns a CachedTable, what are the semantics of
>> >>>>>>>>>>>>>>> the following code:
>> >>>>>>>>>>>>>>> {
>> >>>>>>>>>>>>>>> val cachedTable = a.cache()
>> >>>>>>>>>>>>>>> val b = cachedTable.select(...)
>> >>>>>>>>>>>>>>> val c = a.select(...)
>> >>>>>>>>>>>>>>> }
>> >>>>>>>>>>>>>>> What is the difference between b and c? At first glance, I
>> >>>>>>>>>>>>>>> see two options:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses
>> >>>>>>> original
>> >>>>>>>>>>>> DAG
>> >>>>>>>>>>>>> as
>> >>>>>>>>>>>>>>> user demanded so. In this case, the optimizer has no
>> chance
>> >> to
>> >>>>>>>>>>>> optimize.
>> >>>>>>>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c
>> leaves
>> >>>> the
>> >>>>>>>>>>>>> optimizer
>> >>>>>>>>>>>>>>> to choose whether the cache or DAG should be used. In this
>> >>>> case,
>> >>>>>>>>>>> user
>> >>>>>>>>>>>>> lose
>> >>>>>>>>>>>>>>> the option to NOT use cache.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> As you can see, neither of the options seem perfect.
>> >> However, I
>> >>>>>>>>>>> guess
>> >>>>>>>>>>>>> you
>> >>>>>>>>>>>>>>> and Till are proposing the third option:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether
>> cache or
>> >>>> DAG
>> >>>>>>>>>>>> should
>> >>>>>>>>>>>>>>> be used. c always use the DAG.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> This does address all the concerns. It is just that from
>> >>>>>>>>>>> intuitiveness
>> >>>>>>>>>>>>>>> perspective, I found that asking user to explicitly use a
>> >>>>>>>>>>> CachedTable
>> >>>>>>>>>>>>> while
>> >>>>>>>>>>>>>>> the optimizer might choose to ignore is a little weird.
>> That
>> >>>> was
>> >>>>>>>>>>> why I
>> >>>>>>>>>>>>> did
>> >>>>>>>>>>>>>>> not think about that semantic. But given there is material
>> >>>>>>> benefit,
>> >>>>>>>>>>> I
>> >>>>>>>>>>>>> think
>> >>>>>>>>>>>>>>> this semantic is acceptable.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to
>> use
>> >>>>>>> cache
>> >>>>>>>>>>> or
>> >>>>>>>>>>>>> not,
>> >>>>>>>>>>>>>>>> then why do we need “void cache()” method at all? Would
>> It
>> >>>>>>>>>>>> “increase”
>> >>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>> chance of using the cache? That’s sounds strange. What
>> would
>> >>>> be
>> >>>>>>> the
>> >>>>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not?
>> If we
>> >>>>>>> want
>> >>>>>>>>>>> to
>> >>>>>>>>>>>>>>>> introduce such kind  automated optimisations of “plan
>> nodes
>> >>>>>>>>>>>>> deduplication”
>> >>>>>>>>>>>>>>>> I would turn it on globally, not per table, and let the
>> >>>>>>> optimiser
>> >>>>>>>>>>> do
>> >>>>>>>>>>>>> all of
>> >>>>>>>>>>>>>>>> the work.
>> >>>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any
>> use/not
>> >> use
>> >>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>> decision.
>> >>>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether
>> >> such
>> >>>>>>> cost
>> >>>>>>>>>>>>> based
>> >>>>>>>>>>>>>>>> optimisations would work properly and I would still
>> insist
>> >>>>>>> first on
>> >>>>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable
>> cache()`)
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> We are absolutely on the same page here. An explicit
>> cache()
>> >>>>>>> method
>> >>>>>>>>>>> is
>> >>>>>>>>>>>>>>> necessary not only because optimizer may not be able to
>> make
>> >>>> the
>> >>>>>>>>>>> right
>> >>>>>>>>>>>>>>> decision, but also because of the nature of interactive
>> >>>>>>> programming.
>> >>>>>>>>>>>> For
>> >>>>>>>>>>>>>>> example, if users write the following code in Scala shell:
>> >>>>>>>>>>>>>>> val b = a.select(...)
>> >>>>>>>>>>>>>>> val c = b.select(...)
>> >>>>>>>>>>>>>>> val d = c.select(...).writeToSink(...)
>> >>>>>>>>>>>>>>> tEnv.execute()
>> >>>>>>>>>>>>>>> There is no way optimizer will know whether b or c will be
>> >> used
>> >>>>>>> in
>> >>>>>>>>>>>> later
>> >>>>>>>>>>>>>>> code, unless users hint explicitly.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> At the same time I’m not sure if you have responded to our
>> >>>>>>>>>>> objections
>> >>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which
>> me,
>> >>>>>>> Jark,
>> >>>>>>>>>>>>> Fabian,
>> >>>>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Is there any other side effects if we use semantic 3
>> >> mentioned
>> >>>>>>>>>>> above?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Thanks,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> JIangjie (Becket) Qin
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <
>> >>>>>>>>>>>> pi...@data-artisans.com
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Hi Becket,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Sorry for not responding long time.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Regarding case1.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> There wouldn’t be an “a.unCache()” method; I would expect
>> >>>>>>>>>>>>>>>> only `cachedTableA1.dropCache()`. Dropping `cachedTableA1`
>> >>>>>>>>>>>>>>>> wouldn’t affect `cachedTableA2`. Just as in any other
>> >>>>>>>>>>>>>>>> database, dropping/modifying one independent
>> >>>>>>>>>>>>>>>> table/materialised view does not affect others.
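>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> For example (a sketch of the intended independence):
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> val cachedTableA1 = a.cache()
>> >>>>>>>>>>>>>>>> val cachedTableA2 = a.cache()
>> >>>>>>>>>>>>>>>> cachedTableA1.dropCache() // drops only this materialisation
>> >>>>>>>>>>>>>>>> cachedTableA2.select(...) // unaffected, still reads its cache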
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> What I meant is that assuming there is already a cached
>> >>>> table,
>> >>>>>>>>>>>> ideally
>> >>>>>>>>>>>>>>>> users need
>> >>>>>>>>>>>>>>>>> not to specify whether the next query should read from
>> the
>> >>>>>>> cache
>> >>>>>>>>>>> or
>> >>>>>>>>>>>>> use
>> >>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> original DAG. This should be decided by the optimizer.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to
>> use
>> >>>>>>> cache
>> >>>>>>>>>>> or
>> >>>>>>>>>>>>>>>> not, then why do we need “void cache()” method at all?
>> Would
>> >>>> It
>> >>>>>>>>>>>>> “increase”
>> >>>>>>>>>>>>>>>> the chance of using the cache? That’s sounds strange.
>> What
>> >>>>>>> would be
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>> mechanism of deciding whether to use the cache or not?
>> If we
>> >>>>>>> want
>> >>>>>>>>>>> to
>> >>>>>>>>>>>>>>>> introduce such kind  automated optimisations of “plan
>> nodes
>> >>>>>>>>>>>>> deduplication”
>> >>>>>>>>>>>>>>>> I would turn it on globally, not per table, and let the
>> >>>>>>> optimiser
>> >>>>>>>>>>> do
>> >>>>>>>>>>>>> all of
>> >>>>>>>>>>>>>>>> the work.
>> >>>>>>>>>>>>>>>> 2. We do not have statistics at the moment for any
>> use/not
>> >> use
>> >>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>> decision.
>> >>>>>>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether
>> >> such
>> >>>>>>> cost
>> >>>>>>>>>>>>> based
>> >>>>>>>>>>>>>>>> optimisations would work properly and I would still
>> insist
>> >>>>>>> first on
>> >>>>>>>>>>>>>>>> providing explicit caching mechanism (`CachedTable
>> cache()`)
>> >>>>>>>>>>>>>>>> 4. As Till wrote, having explicit `CachedTable cache()`
>> >>>> doesn’t
>> >>>>>>>>>>>>>>>> contradict future work on automated cost based caching.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> At the same time I’m not sure if you have responded to
>> our
>> >>>>>>>>>>> objections
>> >>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>> `void cache()` being implicit/having side effects, which
>> me,
>> >>>>>>> Jark,
>> >>>>>>>>>>>>> Fabian,
>> >>>>>>>>>>>>>>>> Till and I think also Shaoxuan are supporting.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Piotrek
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <
>> becket....@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Hi Till,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> It is true that after the first job submission, there
>> will
>> >> be
>> >>>>>>> no
>> >>>>>>>>>>>>>>>> ambiguity
>> >>>>>>>>>>>>>>>>> in terms of whether a cached table is used or not. That
>> is
>> >>>> the
>> >>>>>>>>>>> same
>> >>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> cache() without returning a CachedTable.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a
>> >>>>>>> caching
>> >>>>>>>>>>>>>>>> operator
>> >>>>>>>>>>>>>>>>>> from which you need to consume from if you want to
>> benefit
>> >>>>>>> from
>> >>>>>>>>>>> the
>> >>>>>>>>>>>>>>>> caching
>> >>>>>>>>>>>>>>>>>> functionality.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I am thinking a little differently. I think it is a hint
>> >> (as
>> >>>>>>> you
>> >>>>>>>>>>>>>>>> mentioned
>> >>>>>>>>>>>>>>>>> later) instead of a new operator. I'd like to be careful
>> >>>> about
>> >>>>>>> the
>> >>>>>>>>>>>>>>>> semantic
>> >>>>>>>>>>>>>>>>> of the API. A hint is a property set on an existing
>> >> operator,
>> >>>>>>> but
>> >>>>>>>>>>> is
>> >>>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>> itself an operator as it does not really manipulate the
>> >> data.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of
>> decision
>> >>>>>>> which
>> >>>>>>>>>>>>>>>>>> intermediate result should be cached. But especially
>> when
>> >>>>>>>>>>> executing
>> >>>>>>>>>>>>>>>> ad-hoc
>> >>>>>>>>>>>>>>>>>> queries the user might better know which results need
>> to
>> >> be
>> >>>>>>>>>>> cached
>> >>>>>>>>>>>>>>>> because
>> >>>>>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I
>> would
>> >>>>>>> consider
>> >>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course,
>> in
>> >>>> the
>> >>>>>>>>>>>> future
>> >>>>>>>>>>>>> we
>> >>>>>>>>>>>>>>>>>> might add functionality which tries to automatically
>> cache
>> >>>>>>>>>>> results
>> >>>>>>>>>>>>>>>> (e.g.
>> >>>>>>>>>>>>>>>>>> caching the latest intermediate results until so and so
>> >> much
>> >>>>>>>>>>> space
>> >>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>>>> used). But this should hopefully not contradict with
>> >>>>>>> `CachedTable
>> >>>>>>>>>>>>>>>> cache()`.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I agree that the cache() method is needed for exactly the
>> >>>>>>>>>>>>>>>>> reason you mentioned, i.e. Flink cannot predict what users
>> >>>>>>>>>>>>>>>>> are going to write later, so users need to tell Flink
>> >>>>>>>>>>>>>>>>> explicitly that this table will be used later. What I
>> >>>>>>>>>>>>>>>>> meant is that, assuming there is already a cached table,
>> >>>>>>>>>>>>>>>>> ideally users need not specify whether the next query
>> >>>>>>>>>>>>>>>>> should read from the cache or use the original DAG. This
>> >>>>>>>>>>>>>>>>> should be decided by the optimizer.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> To explain the difference between returning / not
>> >>>>>>>>>>>>>>>>> returning a CachedTable, I want to compare the following
>> >>>>>>>>>>>>>>>>> two cases:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> *Case 1:  returning a CachedTable*
>> >>>>>>>>>>>>>>>>> b = a.map(...)
>> >>>>>>>>>>>>>>>>> val cachedTableA1 = a.cache()
>> >>>>>>>>>>>>>>>>> val cachedTableA2 = a.cache()
>> >>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> c = a.filter(...) // User specify that the original DAG
>> is
>> >>>>>>> used?
>> >>>>>>>>>>> Or
>> >>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> optimizer decides whether DAG or cache should be used?
>> >>>>>>>>>>>>>>>>> d = cachedTableA1.filter() // User specify that the
>> cached
>> >>>>>>> table
>> >>>>>>>>>>> is
>> >>>>>>>>>>>>>>>> used.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used
>> afterwards?
>> >>>>>>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be
>> used?
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> *Case 2: not returning a CachedTable*
>> >>>>>>>>>>>>>>>>> b = a.map()
>> >>>>>>>>>>>>>>>>> a.cache()
>> >>>>>>>>>>>>>>>>> a.cache() // no-op
>> >>>>>>>>>>>>>>>>> b.print() // Just to make sure a is cached
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the
>> cache or
>> >>>> DAG
>> >>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>> used
>> >>>>>>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the
>> cache or
>> >>>> DAG
>> >>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>> used
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> a.unCache()
>> >>>>>>>>>>>>>>>>> a.unCache() // no-op
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> In case 1, semantics-wise, the optimizer loses the option
>> >>>>>>>>>>>>>>>>> to choose between DAG and cache. And the unCache() call
>> >>>>>>>>>>>>>>>>> becomes tricky.
>> >>>>>>>>>>>>>>>>> In case 2, users do not need to worry about whether the
>> >>>>>>>>>>>>>>>>> cache or the DAG is used. And the unCache() semantic is
>> >>>>>>>>>>>>>>>>> clear. However, the caveat is that users cannot explicitly
>> >>>>>>>>>>>>>>>>> ignore the cache.
>> >>>>>>>>>>>>>>>
>
>
