I spoke to Piotr a little bit offline and wanted to comment with a summary of
our discussion and what I believe is the most intuitive cache model from a
user's perspective.

(I am making up some class names here; I'm not looking to bikeshed, so feel
free to change the names however you see fit.)

A cache is by definition an optimization, something used to store intermediate
results for faster / more performant downstream computation. Therefore, as a
Flink user I would not expect it to change the semantics of my application, I
would expect it to be rebuildable, and I would not expect to need to know how
it works under the hood. With these principles in mind I feel the most
intuitive API would be as follows:

// Some table
Table a = ...;

// Signal that we would like to cache the table.
// This is lazy and does not force any computation.
CachedTable cachedA = a.cache();

// The first operation against the cache.
// This count will trigger reading input
// data and building the cache.
cachedA.count();

// Operates against the cache; no operations
// before a.cache() are performed.
cachedA.sum();

// This does not operate against the cache;
// it will trigger reading data from the source
// and performing a full computation.
a.min();

// Invalidates the cache, releasing all
// underlying resources.
cachedA.invalidateCache();

// Rebuilds the cache. Since caches are recomputable,
// this should not be an error; it will simply be a more
// expensive operation than if we had not invalidated the cache.
cachedA.min();
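
As a rough illustration of the lifecycle above (lazy creation, build on first
use, invalidation, transparent rebuild), here is a minimal, self-contained Java
sketch. It is not Flink code: `LazyCache`, `get()`, `invalidate()` and
`builds()` are made-up names, a plain `Supplier` stands in for the DAG that
produces table `a`, and the sketch assumes the supplier never returns null.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

class LazyCacheDemo {
    public static void main(String[] args) {
        // Creating the cache is lazy: the supplier is not evaluated here.
        LazyCache<List<Integer>> cachedA = new LazyCache<>(() -> List.of(3, 1, 2));
        System.out.println(cachedA.builds());               // 0

        System.out.println(cachedA.get().size());           // 3 (first use builds the cache)
        System.out.println(Collections.min(cachedA.get())); // 1 (served from the cache)
        System.out.println(cachedA.builds());               // 1

        cachedA.invalidate();                               // release the cached data
        System.out.println(Collections.min(cachedA.get())); // 1 (rebuilt, same answer)
        System.out.println(cachedA.builds());               // 2
    }
}

/** Hypothetical stand-in for a CachedTable: lazy, invalidatable, rebuildable. */
final class LazyCache<T> {
    private final Supplier<T> source; // the "original DAG" that recomputes the data
    private T cached;                 // null means "not built yet" or "invalidated"
    private int builds;               // how many times the source was evaluated

    LazyCache(Supplier<T> source) {
        this.source = source;
    }

    /** Returns the cached value, building it from the source if it is missing. */
    synchronized T get() {
        if (cached == null) {
            cached = source.get();
            builds++;
        }
        return cached;
    }

    /** Drops the cached value; the next get() rebuilds it instead of failing. */
    synchronized void invalidate() {
        cached = null;
    }

    synchronized int builds() {
        return builds;
    }
}
```

The point of the sketch is that invalidation only changes cost, never results:
the second `min` is more expensive but returns the same value.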

This model leads to two nice properties:

1) The same cache can be shared across multiple invocations of Table#cache.
Because the cache can always be rebuilt, one code path invalidating the cache
will not break others. Caches are simply an optimization, and rebuilding the
cache is not an error but an expected property; semantics never change.

2) When automatic caching is implemented it can follow this same model.
   a) A single cache is created when the optimizer determines it is necessary.
   b) If the user decides to explicitly cache a table which has already been
implicitly cached under the hood, then calling Table#cache will just return
that pre-built cache.
   c) If either the user or the optimizer decides to invalidate the cache,
neither code path will break the other; the cache is simply destroyed and will
be rebuilt the next time it is needed.
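
The two properties above can be sketched in plain Java as well. Again, this is
only an illustration under assumptions, not a proposal for the actual
implementation: `CacheRegistry`, `SharedCache`, and identifying a table by a
string id are all hypothetical, and a `Supplier` stands in for the computation
that produces the table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class SharedCacheDemo {
    public static void main(String[] args) {
        CacheRegistry registry = new CacheRegistry();
        Supplier<Integer> tableA = () -> 42; // stand-in for computing table "a"

        // Two code paths (say, the user and the optimizer) ask to cache "a".
        SharedCache<Integer> userHandle = registry.cache("a", tableA);
        SharedCache<Integer> optimizerHandle = registry.cache("a", tableA);
        System.out.println(userHandle == optimizerHandle); // true (one shared cache)

        System.out.println(userHandle.get());    // 42 (builds the cache)
        optimizerHandle.invalidate();            // one path destroys the cache
        System.out.println(userHandle.get());    // 42 (other path rebuilds, no error)
        System.out.println(userHandle.builds()); // 2
    }
}

/** Hypothetical registry: at most one cache per table id, shared by all callers. */
final class CacheRegistry {
    private final Map<String, SharedCache<?>> caches = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    <T> SharedCache<T> cache(String tableId, Supplier<T> source) {
        // Returns the pre-built cache if one exists, otherwise registers a new one.
        return (SharedCache<T>) caches.computeIfAbsent(tableId, id -> new SharedCache<T>(source));
    }
}

/** Hypothetical rebuildable cache; assumes the source never returns null. */
final class SharedCache<T> {
    private final Supplier<T> source;
    private T cached;
    private int builds;

    SharedCache(Supplier<T> source) {
        this.source = source;
    }

    synchronized T get() {
        if (cached == null) {
            cached = source.get();
            builds++;
        }
        return cached;
    }

    synchronized void invalidate() {
        cached = null;
    }

    synchronized int builds() {
        return builds;
    }
}
```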

Of course caches are still automatically cleaned up when user sessions are 
terminated. 

Seth 

On 2018/12/11 04:10:21, Becket Qin <b...@gmail.com> wrote: 
> Hi Piotrek,
>
> Thanks for the reply. Thinking about it again, I might have misunderstood
> your proposal in earlier emails. Returning a CachedTable might not be a bad
> idea.
>
> I was more concerned about the semantics and their intuitiveness when a
> CachedTable is returned, i.e., if cache() returns a CachedTable, what are the
> semantics of the following code:
> {
>   val cachedTable = a.cache()
>   val b = cachedTable.select(...)
>   val c = a.select(...)
> }
> What is the difference between b and c? At first glance, I see two
> options:
>
> Semantic 1. b uses cachedTable because the user demanded so. c uses the
> original DAG because the user demanded so. In this case, the optimizer has
> no chance to optimize.
> Semantic 2. b uses cachedTable because the user demanded so. c leaves the
> optimizer to choose whether the cache or the DAG should be used. In this
> case, the user loses the option to NOT use the cache.
>
> As you can see, neither of the options seems perfect. However, I guess you
> and Till are proposing a third option:
>
> Semantic 3. b leaves the optimizer to choose whether the cache or the DAG
> should be used. c always uses the DAG.
>
> This does address all the concerns. It is just that, from an intuitiveness
> perspective, I found that asking the user to explicitly use a CachedTable
> which the optimizer might choose to ignore is a little weird. That was why
> I did not think about that semantic. But given there is a material benefit,
> I think this semantic is acceptable.
>
> > 1. If we want to let optimiser make decisions whether to use cache or not,
> > then why do we need “void cache()” method at all? Would it “increase” the
> > chance of using the cache? That sounds strange. What would be the
> > mechanism of deciding whether to use the cache or not? If we want to
> > introduce such kind of automated optimisations of “plan nodes
> > deduplication” I would turn it on globally, not per table, and let the
> > optimiser do all of the work.
> > 2. We do not have statistics at the moment for any use/not use cache
> > decision.
> > 3. Even if we had, I would be veeerryy sceptical whether such cost based
> > optimisations would work properly and I would still insist first on
> > providing explicit caching mechanism (`CachedTable cache()`)
> >
> We are absolutely on the same page here. An explicit cache() method is
> necessary not only because the optimizer may not be able to make the right
> decision, but also because of the nature of interactive programming. For
> example, if users write the following code in the Scala shell:
>   val b = a.select(...)
>   val c = b.select(...)
>   val d = c.select(...).writeToSink(...)
>   tEnv.execute()
> There is no way the optimizer will know whether b or c will be used in later
> code, unless users hint explicitly.
>
> > At the same time I’m not sure if you have responded to our objections of
> > `void cache()` being implicit/having side effects, which me, Jark, Fabian,
> > Till and I think also Shaoxuan are supporting.
>
> Are there any other side effects if we use semantic 3 mentioned above?
>
> Thanks,
>
> Jiangjie (Becket) Qin
> 
> 
> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> > Hi Becket,
> >
> > Sorry for not responding for a long time.
> >
> > Regarding case 1.
> >
> > There would be no “a.unCache()” method; I would expect only
> > `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect
> > `cachedTableA2`. Just as in any other database, dropping or modifying one
> > independent table/materialised view does not affect others.
> >
> > > What I meant is that assuming there is already a cached table, ideally
> > > users need not specify whether the next query should read from the
> > > cache or use the original DAG. This should be decided by the optimizer.
> >
> > 1. If we want to let optimiser make decisions whether to use cache or not,
> > then why do we need “void cache()” method at all? Would it “increase” the
> > chance of using the cache? That sounds strange. What would be the
> > mechanism of deciding whether to use the cache or not? If we want to
> > introduce such kind of automated optimisations of “plan nodes
> > deduplication” I would turn it on globally, not per table, and let the
> > optimiser do all of the work.
> > 2. We do not have statistics at the moment for any use/not use cache
> > decision.
> > 3. Even if we had, I would be veeerryy sceptical whether such cost based
> > optimisations would work properly and I would still insist first on
> > providing explicit caching mechanism (`CachedTable cache()`)
> > 4. As Till wrote, having explicit `CachedTable cache()` doesn’t contradict
> > future work on automated cost based caching.
> >
> >
> > At the same time I’m not sure if you have responded to our objections of
> > `void cache()` being implicit/having side effects, which me, Jark, Fabian,
> > Till and I think also Shaoxuan are supporting.
> >
> > Piotrek
> >
> > > On 5 Dec 2018, at 12:42, Becket Qin <be...@gmail.com> wrote:
> > >
> > > > Hi Till,
> > > >
> > > > It is true that after the first job submission, there will be no
> > > > ambiguity in terms of whether a cached table is used or not. That is
> > > > the same for the cache() without returning a CachedTable.
> > > >
> > > >> Conceptually one could think of cache() as introducing a caching
> > > >> operator from which you need to consume from if you want to benefit
> > > >> from the caching functionality.
> > > >
> > > > I am thinking a little differently. I think it is a hint (as you
> > > > mentioned later) instead of a new operator. I'd like to be careful
> > > > about the semantics of the API. A hint is a property set on an
> > > > existing operator, but is not itself an operator as it does not really
> > > > manipulate the data.
> > > >
> > > >> I agree, ideally the optimizer makes this kind of decision which
> > > >> intermediate result should be cached. But especially when executing
> > > >> ad-hoc queries the user might better know which results need to be
> > > >> cached because Flink might not see the full DAG. In that sense, I
> > > >> would consider the cache() method as a hint for the optimizer. Of
> > > >> course, in the future we might add functionality which tries to
> > > >> automatically cache results (e.g. caching the latest intermediate
> > > >> results until so and so much space is used). But this should
> > > >> hopefully not contradict with `CachedTable cache()`.
> > > >
> > > > I agree that the cache() method is needed for exactly the reason you
> > > > mentioned, i.e. Flink cannot predict what users are going to write
> > > > later, so users need to tell Flink explicitly that this table will be
> > > > used later. What I meant is that assuming there is already a cached
> > > > table, ideally users need not specify whether the next query should
> > > > read from the cache or use the original DAG. This should be decided by
> > > > the optimizer.
> > > >
> > > > To explain the difference between returning / not returning a
> > > > CachedTable, I want to compare the following two cases:
> > > >
> > > > *Case 1: returning a CachedTable*
> > > > b = a.map(...)
> > > > val cachedTableA1 = a.cache()
> > > > val cachedTableA2 = a.cache()
> > > > b.print() // Just to make sure a is cached.
> > > >
> > > > c = a.filter(...) // User specifies that the original DAG is used? Or
> > > >                   // the optimizer decides whether DAG or cache should
> > > >                   // be used?
> > > > d = cachedTableA1.filter() // User specifies that the cached table is
> > > >                            // used.
> > > >
> > > > a.unCache() // Can cachedTableA still be used afterwards?
> > > > cachedTableA1.unCache() // Can cachedTableA2 still be used?
> > > >
> > > > *Case 2: not returning a CachedTable*
> > > > b = a.map()
> > > > a.cache()
> > > > a.cache() // no-op
> > > > b.print() // Just to make sure a is cached
> > > >
> > > > c = a.filter(...) // Optimizer decides whether the cache or DAG should
> > > >                   // be used
> > > > d = a.filter(...) // Optimizer decides whether the cache or DAG should
> > > >                   // be used
> > > >
> > > > a.unCache()
> > > > a.unCache() // no-op
> > > >
> > > > In case 1, semantics-wise, the optimizer loses the option to choose
> > > > between DAG and cache. And the unCache() call becomes tricky.
> > > > In case 2, users do not need to worry about whether cache or DAG is
> > > > used. And the unCache() semantic is clear. However, the caveat is that
> > > > users cannot explicitly ignore the cache.
> > > >
> > > > In order to address the issues mentioned in case 2 and inspired by the
> > > > discussion so far, I am thinking about using a hint to allow the user
> > > > to explicitly ignore the cache. Although we do not have hints yet, we
> > > > probably should have them. So the code becomes:
> > > >
> > > > *Case 3: returning this table*
> > > > b = a.map()
> > > > a.cache()
> > > > a.cache() // no-op
> > > > b.print() // Just to make sure a is cached
> > > >
> > > > c = a.filter(...) // Optimizer decides whether the cache or DAG should
> > > >                   // be used
> > > > d = a.hint("ignoreCache").filter(...) // DAG will be used instead of
> > > >                                       // the cache.
> > > >
> > > > a.unCache()
> > > > a.unCache() // no-op
> > > >
> > > > We could also let cache() return this table to allow chained method
> > > > calls. Do you think this API addresses the concerns?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > > On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <im...@gmail.com> wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> All the recent discussions are focused on whether there is a problem
> > > >> if cache() does not return a Table.
> > > >> It seems that returning a Table explicitly is clearer (and safer?).
> > > >>
> > > >> So are there any problems if cache() returns a Table?  @Becket
> > > >>
> > > >> Best,
> > > >> Jark
> > > >>
> > > >> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann <tr...@apache.org> wrote:
> > > >>
> > > >>> It's true that b, c, d and e will all read from the original DAG that
> > > >>> generates a. But all subsequent operators (when running multiple
> > > >>> queries) which reference cachedTableA should not need to reproduce
> > > >>> `a` but directly consume the intermediate result.
> > > >>>
> > > >>> Conceptually one could think of cache() as introducing a caching
> > > >>> operator from which you need to consume from if you want to benefit
> > > >>> from the caching functionality.
> > > >>>
> > > >>> I agree, ideally the optimizer makes this kind of decision which
> > > >>> intermediate result should be cached. But especially when executing
> > > >>> ad-hoc queries the user might better know which results need to be
> > > >>> cached because Flink might not see the full DAG. In that sense, I
> > > >>> would consider the cache() method as a hint for the optimizer. Of
> > > >>> course, in the future we might add functionality which tries to
> > > >>> automatically cache results (e.g. caching the latest intermediate
> > > >>> results until so and so much space is used). But this should
> > > >>> hopefully not contradict with `CachedTable cache()`.
> > > >>>
> > > >>> Cheers,
> > > >>> Till
> > > >>>
> > > >>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin <be...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi Till,
> > > >>>>
> > > >>>> Thanks for the clarification. I am still a little confused.
> > > >>>>
> > > >>>> If cache() returns a CachedTable, the example might become:
> > > >>>>
> > > >>>> b = a.map(...)
> > > >>>> c = a.map(...)
> > > >>>>
> > > >>>> cachedTableA = a.cache()
> > > >>>> d = cachedTableA.map(...)
> > > >>>> e = a.map()
> > > >>>>
> > > >>>> In the above case, if cache() is lazily evaluated, b, c, d and e are
> > > >>>> all going to be reading from the original DAG that generates a. But
> > > >>>> with a naive expectation, d should be reading from the cache. This
> > > >>>> does not seem to solve the potential confusion you raised, right?
> > > >>>>
> > > >>>> Just to be clear, my understanding is based on the assumption that
> > > >>>> the tables are immutable. Therefore, after a.cache(), the
> > > >>>> *cachedTableA* and the original table *a* should be completely
> > > >>>> interchangeable.
> > > >>>>
> > > >>>> That said, I think a valid argument is optimization. There are
> > > >>>> indeed cases where reading from the original DAG could be faster
> > > >>>> than reading from the cache. For example, in the following example:
> > > >>>>
> > > >>>> a.filter(f1' > 100)
> > > >>>> a.cache()
> > > >>>> b = a.filter(f1' < 100)
> > > >>>>
> > > >>>> Ideally the optimizer should be intelligent enough to decide which
> > > >>>> way is faster, without user intervention. In this case, it will
> > > >>>> identify that b would just be an empty table, thus skip reading from
> > > >>>> the cache completely. But I agree that returning a CachedTable would
> > > >>>> give the user control of when to use the cache, even though I still
> > > >>>> feel that letting the optimizer handle this is a better option in
> > > >>>> the long run.
> > > >>>>
> > > >>>> Thanks,
> > > >>>>
> > > >>>> Jiangjie (Becket) Qin
> > > >>>>
> > > >>>>
> > > >>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann <tr...@apache.org> wrote:
> > > >>>>
> > > >>>>> Yes you are right Becket that it still depends on the actual
> > > >>>>> execution of the job whether a consumer reads from a cached result
> > > >>>>> or not.
> > > >>>>>
> > > >>>>> My point was actually about the properties of a (cached vs.
> > > >>>>> non-cached) and not about the execution. I would not make cache
> > > >>>>> trigger the execution of the job because one loses some flexibility
> > > >>>>> by eagerly triggering the execution.
> > > >>>>>
> > > >>>>> I tried to argue for an explicit CachedTable which is returned by
> > > >>>>> the cache() method like Piotr did in order to make the API more
> > > >>>>> explicit.
> > > >>>>>
> > > >>>>> Cheers,
> > > >>>>> Till
> > > >>>>>
> > > >>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin <be...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Hi Till,
> > > >>>>>>
> > > >>>>>> That is a good example. Just a minor correction: in this case, b, c
> > > >>>>>> and d will all consume from a non-cached a. This is because the
> > > >>>>>> cache will only be created on the very first job submission that
> > > >>>>>> generates the table to be cached.
> > > >>>>>>
> > > >>>>>> If I understand correctly, this example is about whether the
> > > >>>>>> .cache() method should be eagerly evaluated or lazily evaluated. In
> > > >>>>>> other words, if the cache() method actually triggers a job that
> > > >>>>>> creates the cache, there will be no such confusion. Is that right?
> > > >>>>>>
> > > >>>>>> In the example, although d will not consume from the cached Table
> > > >>>>>> while it looks supposed to, from a correctness perspective the code
> > > >>>>>> will still return the correct result, assuming that tables are
> > > >>>>>> immutable.
> > > >>>>>>
> > > >>>>>> Personally I feel it is OK because users probably won't really
> > > >>>>>> worry about whether the table is cached or not. And lazy cache
> > > >>>>>> could avoid some unnecessary caching if a cached table is never
> > > >>>>>> created in the user application. But I am not opposed to doing
> > > >>>>>> eager evaluation of cache.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>>
> > > >>>>>> Jiangjie (Becket) Qin
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > >>>>>> On Mon
[message truncated...]
