Just to clarify: when I say foo() like below, I assume that foo() must have a way to release its own cache, so it must have access to the table environment.
void foo(Table t) {
    ...
    t.cache(); // create cache for t
    ...
    env.getCacheService().releaseCacheFor(t); // release cache for t
}

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 8, 2019 at 9:04 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Piotr,
>
> I don't think it is feasible to ask every third-party library to have a
> method signature with CacheService as an argument.
>
> And even that signature does not really solve the problem. Imagine
> function foo() looks like the following:
>
> void foo(Table t) {
>     ...
>     t.cache(); // create cache for t
>     ...
>     env.getCacheService().releaseCacheFor(t); // release cache for t
> }
>
> From function foo()'s perspective, it created a cache and released it.
> However, if someone invokes foo like this:
>
> {
>     Table src = ...
>     Table t = src.select(...).cache()
>     foo(t)
>     // t is uncached by foo() already.
> }
>
> So the "side effect" still exists.
>
> I think the only safe way to ensure there is no side effect while sharing
> the cache is to use a ref count.
>
> BTW, the discussion we are having here is exactly the reason that I prefer
> option 3. From a technical perspective, option 3 solves all the concerns.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Jan 8, 2019 at 8:41 PM Piotr Nowojski <pi...@da-platform.com>
> wrote:
>
>> Hi,
>>
>> I think that introducing ref counting could be confusing and error
>> prone, since Flink-table's users are not used to closing/releasing
>> resources. I was more objecting to placing the
>> `uncache()`/`dropCache()`/`releaseCache()` (releaseCache sounds best to me)
>> as a method on the Table. It might not be obvious that it will drop the
>> cache for all of the usages of the given table. For example:
>>
>> public void foo(Table t) {
>>     // ...
>>     t.releaseCache();
>> }
>>
>> public void bar(Table t) {
>>     // ...
>> }
>>
>> Table a = …
>> val cachedA = a.cache()
>>
>> foo(cachedA)
>> bar(cachedA)
>>
>>
>> My problem with the above example is that the `t.releaseCache()` call is
>> not doing the best possible job of communicating to the user that it will
>> have side effects on other places, like the `bar(cachedA)` call. Something
>> like this might be a bit better (not perfect, but better):
>>
>> public void foo(Table t, CacheService cacheService) {
>>     // ...
>>     cacheService.releaseCacheFor(t);
>> }
>>
>> Table a = …
>> val cachedA = a.cache()
>>
>> foo(cachedA, env.getCacheService())
>> bar(cachedA)
>>
>>
>> Also from another perspective, maybe placing the `releaseCache()` method
>> in Table is not the best separation of concerns - `releaseCache()`
>> seems significantly different compared to the other existing methods.
>>
>> Piotrek
>>
>> > On 8 Jan 2019, at 12:28, Becket Qin <becket....@gmail.com> wrote:
>> >
>> > Hi Piotr,
>> >
>> > You are right. There might be two intuitive meanings when users call
>> > 'a.uncache()', namely:
>> > 1. Release the resource.
>> > 2. Do not use the cache for the next operation.
>> >
>> > Case (1) would likely be the dominant use case. So I would suggest we
>> > dedicate the uncache() method to case (1), i.e. resource release, but
>> > not ignoring the cache.
>> >
>> > For case (2), i.e. explicitly ignoring the cache (which is rare), users
>> > may use something like 'hint("ignoreCache")'. I think this is better, as
>> > it is a little weird for users to call `a.uncache()` when they may not
>> > even know whether the table is cached at all.
>> >
>> > Assuming we let `uncache()` only release the resource, one possibility
>> > is using a ref count to mitigate the side effect. That means a ref count
>> > is incremented on `cache()` and decremented on `uncache()`, and
>> > `uncache()` does not physically release the resource immediately, but
>> > just means the cache could be released.
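A minimal sketch of the ref-counting scheme discussed above, assuming a per-table counter keyed by some table identifier (the class and method names here are illustrative, not actual Flink API): `cache()` would increment the count, a release would decrement it, and the physical cache is dropped only when the count reaches zero, so a release inside a library function like foo() cannot destroy a cache still in use by the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative registry: tracks how many "handles" exist per cached table.
class RefCountedCacheRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    // Called on table.cache(): bump the ref count, returns the new count.
    int acquire(String tableId) {
        return refCounts.merge(tableId, 1, Integer::sum);
    }

    // Called on release: decrement; physically drop the cache only when
    // the last reference goes away. Returns the remaining count.
    int release(String tableId) {
        Integer count = refCounts.get(tableId);
        if (count == null) {
            return 0; // nothing cached, nothing to do
        }
        if (count == 1) {
            refCounts.remove(tableId); // last reference: drop physical cache
            return 0;
        }
        refCounts.put(tableId, count - 1);
        return count - 1;
    }

    boolean isCached(String tableId) {
        return refCounts.containsKey(tableId);
    }
}
```

With this, the problematic example works out: the caller's cache() and foo()'s cache() each take a reference, and foo()'s release only drops foo()'s reference, leaving the caller's cache intact.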
>> > That being said, I am not sure if this is really a better solution as it >> > seems a little counter intuitive. Maybe calling it releaseCache() help a >> > little bit? >> > >> > Thanks, >> > >> > Jiangjie (Becket) Qin >> > >> > >> > >> > >> > On Tue, Jan 8, 2019 at 5:36 PM Piotr Nowojski <pi...@da-platform.com> >> wrote: >> > >> >> Hi Becket, >> >> >> >> With `uncache` there are probably two features that we can think about: >> >> >> >> a) >> >> >> >> Physically dropping the cached table from the storage, freeing up the >> >> resources >> >> >> >> b) >> >> >> >> Hinting the optimizer to not cache the reads for the next query/table >> >> >> >> a) Has the issue as I wrote before, that it seemed to be an operation >> >> inherently “flawed" with having side effects. >> >> >> >> I’m not sure how it would be best to express. We could make it work: >> >> >> >> 1. via a method on a Table as you proposed: >> >> >> >> void Table#dropCache() >> >> void Table#uncache() >> >> >> >> 2. Operation on the environment >> >> >> >> env.dropCacheFor(table) // or some other argument that allows user to >> >> identify the desired cache >> >> >> >> 3. Extending (from your original design doc) `setTableService` method >> to >> >> return some control handle like: >> >> >> >> TableServiceControl setTableService(TableFactory tf, >> >> TableProperties properties, >> >> TempTableCleanUpCallback cleanUpCallback); >> >> >> >> (TableServiceControl? TableService? TableServiceHandle? CacheService?) >> >> >> >> And having the drop cache method there: >> >> >> >> TableServiceControl#dropCache(table) >> >> >> >> Out of those options, option 1 might have a disadvantage of kind of not >> >> making the user aware, that this is a global operation with side >> effects. 
>> >> Like the old example of: >> >> >> >> public void foo(Table t) { >> >> // … >> >> t.dropCache(); >> >> } >> >> >> >> It might not be immediately obvious that `t.dropCache()` is some kind >> of >> >> global operation, with side effects visible outside of the `foo` >> function. >> >> >> >> On the other hand, both option 2 and 3, might have greater chance of >> >> catching user’s attention: >> >> >> >> public void foo(Table t, CacheService cacheService) { >> >> // … >> >> cacheService.dropCache(t); >> >> } >> >> >> >> b) could be achieved quite easily: >> >> >> >> Table a = … >> >> val notCached1 = a.doNotCache() >> >> val cachedA = a.cache() >> >> val notCached2 = cachedA.doNotCache() // equivalent of notCached1 >> >> >> >> `doNotCache()` would behave similarly to `cache()` - return a copy of >> the >> >> table with removed “cache” hint and/or added “never cache” hint. >> >> >> >> Piotrek >> >> >> >> >> >>> On 8 Jan 2019, at 03:17, Becket Qin <becket....@gmail.com> wrote: >> >>> >> >>> Hi Piotr, >> >>> >> >>> Thanks for the proposal and detailed explanation. I like the idea of >> >>> returning a new hinted Table without modifying the original table. >> This >> >>> also leave the room for users to benefit from future implicit caching. >> >>> >> >>> Just to make sure I get the full picture. In your proposal, there will >> >> also >> >>> be a 'void Table#uncache()' method to release the cache, right? >> >>> >> >>> Thanks, >> >>> >> >>> Jiangjie (Becket) Qin >> >>> >> >>> On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com >> > >> >>> wrote: >> >>> >> >>>> Hi Becket! >> >>>> >> >>>> After further thinking I tend to agree that my previous proposal >> >> (*Option >> >>>> 2*) indeed might not be if would in the future introduce automatic >> >> caching. 
>> >>>> However I would like to propose a slightly modified version of it: >> >>>> >> >>>> *Option 4* >> >>>> >> >>>> Adding `cache()` method with following signature: >> >>>> >> >>>> Table Table#cache(); >> >>>> >> >>>> Without side-effects, and `cache()` call do not modify/change >> original >> >>>> Table in any way. >> >>>> It would return a copy of original table, with added hint for the >> >>>> optimizer to cache the table, so that the future accesses to the >> >> returned >> >>>> table might be cached or not. >> >>>> >> >>>> Assuming that we are talking about a setup, where we do not have >> >> automatic >> >>>> caching enabled (possible future extension). >> >>>> >> >>>> Example #1: >> >>>> >> >>>> ``` >> >>>> Table a = … >> >>>> a.foo() // not cached >> >>>> >> >>>> val cachedTable = a.cache(); >> >>>> >> >>>> cachedA.bar() // maybe cached >> >>>> a.foo() // same as before - effectively not cached >> >>>> ``` >> >>>> >> >>>> Both the first and the second `a.foo()` operations would behave in >> the >> >>>> exactly same way. Again, `a.cache()` call doesn’t affect `a` itself. >> If >> >> `a` >> >>>> was not hinted for caching before `a.cache();`, then both `a.foo()` >> >> calls >> >>>> wouldn’t use cache. 
>> >>>> >> >>>> Returned `cachedA` would be hinted with “cache” hint, so probably >> >>>> `cachedA.bar()` would go through cache (unless optimiser decides the >> >>>> opposite) >> >>>> >> >>>> Example #2 >> >>>> >> >>>> ``` >> >>>> Table a = … >> >>>> >> >>>> a.foo() // not cached >> >>>> >> >>>> val b = a.cache(); >> >>>> >> >>>> a.foo() // same as before - effectively not cached >> >>>> b.foo() // maybe cached >> >>>> >> >>>> val c = b.cache(); >> >>>> >> >>>> a.foo() // same as before - effectively not cached >> >>>> b.foo() // same as before - effectively maybe cached >> >>>> c.foo() // maybe cached >> >>>> ``` >> >>>> >> >>>> Now, assuming that we have some future “automatic caching >> optimisation”: >> >>>> >> >>>> Example #3 >> >>>> >> >>>> ``` >> >>>> env.enableAutomaticCaching() >> >>>> Table a = … >> >>>> >> >>>> a.foo() // might be cached, depending if `a` was selected to >> automatic >> >>>> caching >> >>>> >> >>>> val b = a.cache(); >> >>>> >> >>>> a.foo() // same as before - might be cached, if `a` was selected to >> >>>> automatic caching >> >>>> b.foo() // maybe cached >> >>>> ``` >> >>>> >> >>>> >> >>>> More or less this is the same behaviour as: >> >>>> >> >>>> Table a = ... >> >>>> val b = a.filter(x > 20) >> >>>> >> >>>> calling `filter` hasn’t changed or altered `a` in anyway. If `a` was >> >>>> previously filtered: >> >>>> >> >>>> Table src = … >> >>>> val a = src.filter(x > 20) >> >>>> val b = a.filter(x > 20) >> >>>> >> >>>> then yes, `a` and `b` will be the same. But the point is that neither >> >>>> `filter` nor `cache` changes the original `a` table. >> >>>> >> >>>> One thing is that indeed, physically dropping cache operation, will >> have >> >>>> side effects and it will in a way mutate the cached table references. >> >> But >> >>>> this is I think unavoidable in any solution - the same issue as >> calling >> >>>> `.close()`, or calling destructor in C++. 
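The copy-with-hint semantics of Option 4 above can be sketched as follows. `HintedTable` is an illustrative stand-in, not the real Flink Table class: calling cache() leaves the receiver completely untouched and returns a copy that carries the optimizer hint, exactly like filter() returns a new table without altering the original.

```java
// Illustrative sketch of "Table cache() without side effects" (Option 4).
final class HintedTable {
    private final String plan;       // stands in for the logical plan / DAG
    private final boolean cacheHint; // a hint for the optimizer, nothing more

    HintedTable(String plan, boolean cacheHint) {
        this.plan = plan;
        this.cacheHint = cacheHint;
    }

    // Returns a copy with the cache hint set; `this` is not modified.
    HintedTable cache() {
        return new HintedTable(plan, true);
    }

    boolean isCacheHinted() {
        return cacheHint;
    }
}
```

Under these semantics `a.foo()` behaves identically before and after `a.cache()`, because only the returned copy carries the hint.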
>> >>>> >> >>>> Piotrek >> >>>> >> >>>>> On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote: >> >>>>> >> >>>>> Happy New Year, everybody! >> >>>>> >> >>>>> I would like to resume this discussion thread. At this point, We >> have >> >>>>> agreed on the first step goal of interactive programming. The open >> >>>>> discussion is the exact API. More specifically, what should >> *cache()* >> >>>>> method return and what is the semantic. There are three options: >> >>>>> >> >>>>> *Option 1* >> >>>>> *void cache()* OR *Table cache()* which returns the original table >> for >> >>>>> chained calls. >> >>>>> *void uncache() *releases the cache. >> >>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo(). >> >>>>> >> >>>>> - Semantic: a.cache() hints that table 'a' should be cached. >> Optimizer >> >>>>> decides whether the cache will be used or not. >> >>>>> - pros: simple and no confusion between CachedTable and original >> table >> >>>>> - cons: A table may be cached / uncached in a method invocation, >> while >> >>>> the >> >>>>> caller does not know about this. >> >>>>> >> >>>>> *Option 2* >> >>>>> *CachedTable cache()* >> >>>>> *CachedTable *extends *Table *with an additional *uncache()* method >> >>>>> >> >>>>> - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will >> >> always >> >>>>> use cache. *a.bar() *will always use original DAG. >> >>>>> - pros: No potential side effects in method invocation. >> >>>>> - cons: Optimizer has no chance to kick in. Future optimization will >> >>>> become >> >>>>> a behavior change and need users to change the code. >> >>>>> >> >>>>> *Option 3* >> >>>>> *CacheHandle cache()* >> >>>>> *CacheHandle.release() *to release a cache handle on the table. If >> all >> >>>>> cache handles are released, the cache could be removed. >> >>>>> *Table.hint(ignoreCache).foo()* to ignore cache for operation foo(). >> >>>>> >> >>>>> - Semantic: *a.cache() *hints that 'a' should be cached. 
Optimizer >> >>>> decides >> >>>>> whether the cache will be used or not. Cache is released either no >> >> handle >> >>>>> is on it, or the user program exits. >> >>>>> - pros: No potential side effect in method invocation. No confusion >> >>>> between >> >>>>> cached table v.s original table. >> >>>>> - cons: An additional CacheHandle exposed to the users. >> >>>>> >> >>>>> >> >>>>> Personally I prefer option 3 for the following reasons: >> >>>>> 1. It is simple. Vast majority of the users would just call >> >>>>> *a.cache()* followed >> >>>>> by *a.foo(),* *a.bar(), etc. * >> >>>>> 2. There is no semantic ambiguity and semantic change if we decide >> to >> >> add >> >>>>> implicit cache in the future. >> >>>>> 3. There is no side effect in the method calls. >> >>>>> 4. Admittedly we need to expose one more CacheHandle class to the >> >> users. >> >>>>> But it is not that difficult to understand given similar well known >> >>>> concept >> >>>>> like ref count (we can name it CacheReference if that is easier to >> >>>>> understand). So I think it is fine. >> >>>>> >> >>>>> >> >>>>> Thanks, >> >>>>> >> >>>>> Jiangjie (Becket) Qin >> >>>>> >> >>>>> >> >>>>> On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com> >> >>>> wrote: >> >>>>> >> >>>>>> Hi Piotrek, >> >>>>>> >> >>>>>> 1. Regarding optimization. >> >>>>>> Sure there are many cases that the decision is hard to make. But >> that >> >>>> does >> >>>>>> not make it any easier for the users to make those decisions. I >> >> imagine >> >>>> 99% >> >>>>>> of the users would just naively use cache. I am not saying we can >> >>>> optimize >> >>>>>> in all the cases. But as long as we agree that at least in certain >> >>>> cases (I >> >>>>>> would argue most cases), optimizer can do a little better than an >> >>>> average >> >>>>>> user who likely knows little about Flink internals, we should not >> push >> >>>> the >> >>>>>> burden of optimization to users. 
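A minimal sketch of the Option 3 `CacheHandle` idea, under the assumption that each cache() call hands out a fresh handle and the physical cache is dropped only once every outstanding handle has been released (all names here are illustrative, not Flink API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative cached resource that hands out ref-counted handles.
class CachedResource {
    private final AtomicInteger openHandles = new AtomicInteger(0);
    private volatile boolean dropped = false;

    // Each cache() call would obtain its own handle.
    CacheHandle newHandle() {
        openHandles.incrementAndGet();
        return new CacheHandle(this);
    }

    boolean isDropped() {
        return dropped;
    }

    private int releaseOne() {
        int remaining = openHandles.decrementAndGet();
        if (remaining == 0) {
            dropped = true; // last handle released: physically delete cache
        }
        return remaining;
    }

    static final class CacheHandle {
        private final CachedResource cache;
        private boolean released = false;

        private CacheHandle(CachedResource cache) {
            this.cache = cache;
        }

        // Decrements the ref count; returns the number of handles still open.
        int release() {
            if (released) {
                throw new IllegalStateException("handle already released");
            }
            released = true;
            return cache.releaseOne();
        }
    }
}
```

A function that caches and releases through its own handle can then never invalidate a handle held by its caller.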
>> >>>>>> >> >>>>>> BTW, it seems some of your concerns are related to the >> >> implementation. I >> >>>>>> did not mention the implementation of the caching service because >> that >> >>>>>> should not affect the API semantic. Not sure if this helps, but >> >> imagine >> >>>> the >> >>>>>> default implementation has one StorageNode service colocating with >> >> each >> >>>> TM. >> >>>>>> It could be running within the TM process or in a standalone >> process, >> >>>>>> depending on configuration. >> >>>>>> >> >>>>>> The StorageNode uses memory + spill-to-disk mechanism. The cached >> data >> >>>>>> will just be written to the local StorageNode service. If the >> >>>> StorageNode >> >>>>>> is running within the TM process, the in-memory cache could just be >> >>>> objects >> >>>>>> so we save some serde cost. A later job referring to the cached >> Table >> >>>> will >> >>>>>> be scheduled in a locality aware manner, i.e. run in the TM whose >> peer >> >>>>>> StorageNode hosts the data. >> >>>>>> >> >>>>>> >> >>>>>> 2. Semantic >> >>>>>> I am not sure why introducing a new hintCache() or >> >>>>>> env.enableAutomaticCaching() method would avoid the consequence of >> >>>> semantic >> >>>>>> change. >> >>>>>> >> >>>>>> If the auto optimization is not enabled by default, users still >> need >> >> to >> >>>>>> make code change to all existing programs in order to get the >> benefit. >> >>>>>> If the auto optimization is enabled by default, advanced users who >> >> know >> >>>>>> that they really want to use cache will suddenly lose the >> opportunity >> >>>> to do >> >>>>>> so, unless they change the code to disable auto optimization. >> >>>>>> >> >>>>>> >> >>>>>> 3. side effect >> >>>>>> The CacheHandle is not only for where to put uncache(). It is to >> solve >> >>>> the >> >>>>>> implicit performance impact by moving the uncache() to the >> >> CacheHandle. >> >>>>>> >> >>>>>> - If users wants to leverage cache, they can call a.cache(). 
After >> >>>>>> that, unless user explicitly release that CacheHandle, a.foo() will >> >>>> always >> >>>>>> leverage cache if needed (optimizer may choose to ignore cache if >> >> that >> >>>>>> helps accelerate the process). Any function call will not be able >> to >> >>>>>> release the cache because they do not have that CacheHandle. >> >>>>>> - If some advanced users do not want to use cache at all, they will >> >>>>>> call a.hint(ignoreCache).foo(). This will for sure ignore cache and >> >>>> use the >> >>>>>> original DAG to process. >> >>>>>> >> >>>>>> >> >>>>>>> In vast majority of the cases, users wouldn't really care whether >> the >> >>>>>>> cache is used or not. >> >>>>>>> I wouldn’t agree with that, because “caching” (if not purely in >> >> memory >> >>>>>>> caching) would add additional IO costs. It’s similar as saying >> that >> >>>> users >> >>>>>>> would not see a difference between Spark/Flink and MapReduce >> >> (MapReduce >> >>>>>>> writes data to disks after every map/reduce stage). >> >>>>>> >> >>>>>> What I wanted to say is that in most cases, after users call >> cache(), >> >>>> they >> >>>>>> don't really care about whether auto optimization has decided to >> >> ignore >> >>>> the >> >>>>>> cache or not, as long as the program runs faster. >> >>>>>> >> >>>>>> Thanks, >> >>>>>> >> >>>>>> Jiangjie (Becket) Qin >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski < >> >>>> pi...@data-artisans.com> >> >>>>>> wrote: >> >>>>>> >> >>>>>>> Hi, >> >>>>>>> >> >>>>>>> Thanks for the quick answer :) >> >>>>>>> >> >>>>>>> Re 1. 
>> >>>>>>> >> >>>>>>> I generally agree with you, however couple of points: >> >>>>>>> >> >>>>>>> a) the problem with using automatic caching is bigger, because you >> >> will >> >>>>>>> have to decide, how do you compare IO vs CPU costs and if you pick >> >>>> wrong, >> >>>>>>> additional IO costs might be enormous or even can crash your >> system. >> >>>> This >> >>>>>>> is more difficult problem compared to let say join reordering, >> where >> >>>> the >> >>>>>>> only issue is to have good statistics that can capture >> correlations >> >>>> between >> >>>>>>> columns (when you reorder joins number of IO operations do not >> >> change) >> >>>>>>> c) your example is completely independent of caching. >> >>>>>>> >> >>>>>>> Query like this: >> >>>>>>> >> >>>>>>> src1.filte('f1 > 10).join(src2.filter('f2 < 30), `f1 >> ===`f2).as('f3, >> >>>>>>> …).filter(‘f3 > 30) >> >>>>>>> >> >>>>>>> Should/could be optimised to empty result immediately, without the >> >> need >> >>>>>>> for any cache/materialisation and that should work even without >> any >> >>>>>>> statistics provided by the connector. >> >>>>>>> >> >>>>>>> For me prerequisite to any serious cost-based optimisations would >> be >> >>>> some >> >>>>>>> reasonable benchmark coverage of the code (tpch?). Otherwise that >> >>>> would be >> >>>>>>> equivalent of adding not tested code, since we wouldn’t be able to >> >>>> verify >> >>>>>>> our assumptions, like how does the writing of 10 000 records to >> >>>>>>> cache/RocksDB/Kafka/CSV file compare to >> joining/filtering/processing >> >> of >> >>>>>>> lets say 1000 000 rows. >> >>>>>>> >> >>>>>>> Re 2. >> >>>>>>> >> >>>>>>> I wasn’t proposing to change the semantic later. 
I was proposing >> that >> >>>> we >> >>>>>>> start now: >> >>>>>>> >> >>>>>>> CachedTable cachedA = a.cache() >> >>>>>>> cachedA.foo() // Cache is used >> >>>>>>> a.bar() // Original DAG is used >> >>>>>>> >> >>>>>>> And then later we can think about adding for example >> >>>>>>> >> >>>>>>> CachedTable cachedA = a.hintCache() >> >>>>>>> cachedA.foo() // Cache might be used >> >>>>>>> a.bar() // Original DAG is used >> >>>>>>> >> >>>>>>> Or >> >>>>>>> >> >>>>>>> env.enableAutomaticCaching() >> >>>>>>> a.foo() // Cache might be used >> >>>>>>> a.bar() // Cache might be used >> >>>>>>> >> >>>>>>> Or (I would still not like this option): >> >>>>>>> >> >>>>>>> a.hintCache() >> >>>>>>> a.foo() // Cache might be used >> >>>>>>> a.bar() // Cache might be used >> >>>>>>> >> >>>>>>> Or whatever else that will come to our mind. Even if we add some >> >>>>>>> automatic caching in the future, keeping implicit (`CachedTable >> >>>> cache()`) >> >>>>>>> caching will still be useful, at least in some cases. >> >>>>>>> >> >>>>>>> Re 3. >> >>>>>>> >> >>>>>>>> 2. The source tables are immutable during one run of batch >> >> processing >> >>>>>>> logic. >> >>>>>>>> 3. The cache is immutable during one run of batch processing >> logic. >> >>>>>>> >> >>>>>>>> I think assumption 2 and 3 are by definition what batch >> processing >> >>>>>>> means, >> >>>>>>>> i.e the data must be complete before it is processed and should >> not >> >>>>>>> change >> >>>>>>>> when the processing is running. >> >>>>>>> >> >>>>>>> I agree that this is how batch systems SHOULD be working. However >> I >> >>>> know >> >>>>>>> from my previous experience that it’s not always the case. >> Sometimes >> >>>> users >> >>>>>>> are just working on some non transactional storage, which can be >> >>>> (either >> >>>>>>> constantly or occasionally) being modified by some other processes >> >> for >> >>>>>>> whatever the reasons (fixing the data, updating, adding new data >> >> etc). 
>> >>>>>>> >> >>>>>>> But even if we ignore this point (data immutability), performance >> >> side >> >>>>>>> effect issue of your proposal remains. If user calls `void >> a.cache()` >> >>>> deep >> >>>>>>> inside some private method, it will have implicit side effects on >> >> other >> >>>>>>> parts of his program that might not be obvious. >> >>>>>>> >> >>>>>>> Re `CacheHandle`. >> >>>>>>> >> >>>>>>> If I understand it correctly, it only addresses the issue where to >> >>>> place >> >>>>>>> method `uncache`/`dropCache`. >> >>>>>>> >> >>>>>>> Btw, >> >>>>>>> >> >>>>>>>> In vast majority of the cases, users wouldn't really care whether >> >> the >> >>>>>>> cache is used or not. >> >>>>>>> >> >>>>>>> I wouldn’t agree with that, because “caching” (if not purely in >> >> memory >> >>>>>>> caching) would add additional IO costs. It’s similar as saying >> that >> >>>> users >> >>>>>>> would not see a difference between Spark/Flink and MapReduce >> >> (MapReduce >> >>>>>>> writes data to disks after every map/reduce stage). >> >>>>>>> >> >>>>>>> Piotrek >> >>>>>>> >> >>>>>>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> >> wrote: >> >>>>>>>> >> >>>>>>>> Hi Piotrek, >> >>>>>>>> >> >>>>>>>> Not sure if you noticed, in my last email, I was proposing >> >>>> `CacheHandle >> >>>>>>>> cache()` to avoid the potential side effect due to function >> calls. >> >>>>>>>> >> >>>>>>>> Let's look at the disagreement in your reply one by one. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> 1. Optimization chances >> >>>>>>>> >> >>>>>>>> Optimization is never a trivial work. This is exactly why we >> should >> >>>> not >> >>>>>>> let >> >>>>>>>> user manually do that. Databases have done huge amount of work in >> >> this >> >>>>>>>> area. At Alibaba, we rely heavily on many optimization rules to >> >> boost >> >>>>>>> the >> >>>>>>>> SQL query performance. 
>> >>>>>>>> >> >>>>>>>> In your example, if I filling the filter conditions in a certain >> >> way, >> >>>>>>> the >> >>>>>>>> optimization would become obvious. >> >>>>>>>> >> >>>>>>>> Table src1 = … // read from connector 1 >> >>>>>>>> Table src2 = … // read from connector 2 >> >>>>>>>> >> >>>>>>>> Table a = src1.filte('f1 > 10).join(src2.filter('f2 < 30), `f1 >> === >> >>>>>>>> `f2).as('f3, ...) >> >>>>>>>> a.cache() // write cache to connector 3, when writing the >> records, >> >>>>>>> remember >> >>>>>>>> min and max of `f1 >> >>>>>>>> >> >>>>>>>> a.filter('f3 > 30) // There is no need to read from any connector >> >>>>>>> because >> >>>>>>>> `a` does not contain any record whose 'f3 is greater than 30. >> >>>>>>>> env.execute() >> >>>>>>>> a.select(…) >> >>>>>>>> >> >>>>>>>> BTW, it seems to me that adding some basic statistics is fairly >> >>>>>>>> straightforward and the cost is pretty marginal if not >> ignorable. In >> >>>>>>> fact >> >>>>>>>> it is not only needed for optimization, but also for cases such >> as >> >> ML, >> >>>>>>>> where some algorithms may need to decide their parameter based on >> >> the >> >>>>>>>> statistics of the data. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> 2. Same API, one semantic now, another semantic later. >> >>>>>>>> >> >>>>>>>> I am trying to understand what is the semantic of `CachedTable >> >>>> cache()` >> >>>>>>> you >> >>>>>>>> are proposing. IMO, we should avoid designing an API whose >> semantic >> >>>>>>> will be >> >>>>>>>> changed later. If we have a "CachedTable cache()" method, then >> the >> >>>>>>> semantic >> >>>>>>>> should be very clearly defined upfront and do not change later. >> It >> >>>>>>> should >> >>>>>>>> never be "right now let's go with semantic 1, later we can >> silently >> >>>>>>> change >> >>>>>>>> it to semantic 2 or 3". Such change could result in bad >> consequence. 
>> >>>> For >> >>>>>>>> example, let's say we decide go with semantic 1: >> >>>>>>>> >> >>>>>>>> CachedTable cachedA = a.cache() >> >>>>>>>> cachedA.foo() // Cache is used >> >>>>>>>> a.bar() // Original DAG is used. >> >>>>>>>> >> >>>>>>>> Now majority of the users would be using cachedA.foo() in their >> >> code. >> >>>>>>> And >> >>>>>>>> some advanced users will use a.bar() to explicitly skip the >> cache. >> >>>> Later >> >>>>>>>> on, we added smart optimization and change the semantic to >> semantic >> >> 2: >> >>>>>>>> >> >>>>>>>> CachedTable cachedA = a.cache() >> >>>>>>>> cachedA.foo() // Cache is used >> >>>>>>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip >> cache >> >> if >> >>>>>>> it is >> >>>>>>>> faster. >> >>>>>>>> >> >>>>>>>> Now most of the users who were writing cachedA.foo() will not >> >> benefit >> >>>>>>> from >> >>>>>>>> this optimization at all, unless they change their code to use >> >> a.foo() >> >>>>>>>> instead. And those advanced users suddenly lose the option to >> >>>> explicitly >> >>>>>>>> ignore cache unless they change their code (assuming we care >> enough >> >> to >> >>>>>>>> provide something like hint(useCache)). If we don't define the >> >>>> semantic >> >>>>>>>> carefully, our users will have to change their code again and >> again >> >>>>>>> while >> >>>>>>>> they shouldn't have to. >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> 3. side effect. >> >>>>>>>> >> >>>>>>>> Before we talk about side effect, we have to agree on the >> >> assumptions. >> >>>>>>> The >> >>>>>>>> assumptions I have are following: >> >>>>>>>> 1. We are talking about batch processing. >> >>>>>>>> 2. The source tables are immutable during one run of batch >> >> processing >> >>>>>>> logic. >> >>>>>>>> 3. The cache is immutable during one run of batch processing >> logic. 
>> >>>>>>>> >> >>>>>>>> I think assumption 2 and 3 are by definition what batch >> processing >> >>>>>>> means, >> >>>>>>>> i.e the data must be complete before it is processed and should >> not >> >>>>>>> change >> >>>>>>>> when the processing is running. >> >>>>>>>> >> >>>>>>>> As far as I am aware of, I don't know any batch processing system >> >>>>>>> breaking >> >>>>>>>> those assumptions. Even for relational database tables, where >> >> queries >> >>>>>>> can >> >>>>>>>> run with concurrent modifications, necessary locking are still >> >>>> required >> >>>>>>> to >> >>>>>>>> ensure the integrity of the query result. >> >>>>>>>> >> >>>>>>>> Please let me know if you disagree with the above assumptions. If >> >> you >> >>>>>>> agree >> >>>>>>>> with these assumptions, with the `CacheHandle cache()` API in my >> >> last >> >>>>>>>> email, do you still see side effects? >> >>>>>>>> >> >>>>>>>> Thanks, >> >>>>>>>> >> >>>>>>>> Jiangjie (Becket) Qin >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski < >> >>>> pi...@data-artisans.com >> >>>>>>>> >> >>>>>>>> wrote: >> >>>>>>>> >> >>>>>>>>> Hi Becket, >> >>>>>>>>> >> >>>>>>>>>> Regarding the chance of optimization, it might not be that >> rare. >> >>>> Some >> >>>>>>>>> very >> >>>>>>>>>> simple statistics could already help in many cases. For >> example, >> >>>>>>> simply >> >>>>>>>>>> maintaining max and min of each fields can already eliminate >> some >> >>>>>>>>>> unnecessary table scan (potentially scanning the cached table) >> if >> >>>> the >> >>>>>>>>>> result is doomed to be empty. A histogram would give even >> further >> >>>>>>>>>> information. The optimizer could be very careful and only >> ignores >> >>>>>>> cache >> >>>>>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a >> >> filter >> >>>> on >> >>>>>>>>> the >> >>>>>>>>>> cache will absolutely return nothing. >> >>>>>>>>> >> >>>>>>>>> I do not see how this might be easy to achieve. 
It would require >> >> tons >> >>>>>>> of >> >>>>>>>>> effort to make it work and in the end you would still have a >> >> problem >> >>>> of >> >>>>>>>>> comparing/trading CPU cycles vs IO. For example: >> >>>>>>>>> >> >>>>>>>>> Table src1 = … // read from connector 1 >> >>>>>>>>> Table src2 = … // read from connector 2 >> >>>>>>>>> >> >>>>>>>>> Table a = src1.filter(…).join(src2.filter(…), …) >> >>>>>>>>> a.cache() // write cache to connector 3 >> >>>>>>>>> >> >>>>>>>>> a.filter(…) >> >>>>>>>>> env.execute() >> >>>>>>>>> a.select(…) >> >>>>>>>>> >> >>>>>>>>> Decision whether it’s better to: >> >>>>>>>>> A) read from connector1/connector2, filter/map and join them >> twice >> >>>>>>>>> B) read from connector1/connector2, filter/map and join them >> once, >> >>>> pay >> >>>>>>> the >> >>>>>>>>> price of writing to connector 3 and then reading from it >> >>>>>>>>> >> >>>>>>>>> Is very far from trivial. `a` can end up much larger than `src1` >> >> and >> >>>>>>>>> `src2`, writes to connector 3 might be extremely slow, reads >> from >> >>>>>>> connector >> >>>>>>>>> 3 can be slower compared to reads from connector 1 & 2, … . You >> >>>> really >> >>>>>>> need >> >>>>>>>>> to have extremely good statistics to correctly asses size of the >> >>>>>>> output and >> >>>>>>>>> it would still be failing many times (correlations etc). And >> keep >> >> in >> >>>>>>> mind >> >>>>>>>>> that at the moment we do not have ANY statistics at all. More >> than >> >>>>>>> that, it >> >>>>>>>>> would require significantly more testing and setting up some >> >>>>>>> benchmarks to >> >>>>>>>>> make sure that we do not brake it with some regressions. >> >>>>>>>>> >> >>>>>>>>> That’s why I’m strongly opposing this idea - at least let’s not >> >>>> starts >> >>>>>>>>> with this. 
If we first start with completely manual/explicit >> >> caching, >> >>>>>>>>> without any magic, it would be a significant improvement for the >> >>>> users >> >>>>>>> for >> >>>>>>>>> a fraction of the development cost. After implementing that, >> when >> >> we >> >>>>>>>>> already have all of the working pieces, we can start working on >> >> some >> >>>>>>>>> optimisations rules. As I wrote before, if we start with >> >>>>>>>>> >> >>>>>>>>> `CachedTable cache()` >> >>>>>>>>> >> >>>>>>>>> We can later work on follow up stories to make it automatic. >> >> Despite >> >>>>>>> that >> >>>>>>>>> I don’t like this implicit/side effect approach with `void` >> method, >> >>>>>>> having >> >>>>>>>>> explicit `CachedTable cache()` wouldn’t even prevent as from >> later >> >>>>>>> adding >> >>>>>>>>> `void hintCache()` method, with the exact semantic that you >> want. >> >>>>>>>>> >> >>>>>>>>> On top of that I re-rise again that having implicit `void >> >>>>>>>>> cache()/hintCache()` has other side effects and problems with >> non >> >>>>>>> immutable >> >>>>>>>>> data, and being annoying when used secretly inside methods. >> >>>>>>>>> >> >>>>>>>>> Explicit `CachedTable cache()` just looks like much less >> >>>> controversial >> >>>>>>> MVP >> >>>>>>>>> and if we decide to go further with this topic, it’s not a >> wasted >> >>>>>>> effort, >> >>>>>>>>> but just lies on a stright path to more advanced/complicated >> >>>> solutions >> >>>>>>> in >> >>>>>>>>> the future. Are there any drawbacks of starting with >> `CachedTable >> >>>>>>> cache()` >> >>>>>>>>> that I’m missing? >> >>>>>>>>> >> >>>>>>>>> Piotrek >> >>>>>>>>> >> >>>>>>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote: >> >>>>>>>>>> >> >>>>>>>>>> Hi Becket, >> >>>>>>>>>> >> >>>>>>>>>> Introducing CacheHandle seems too complicated. That means users >> >> have >> >>>>>>> to >> >>>>>>>>>> maintain Handler properly. 
And since cache is just a hint for the optimizer, why not just return the Table itself from the cache method? This hint info should be kept in the Table, I believe.

So how about adding methods cache and uncache to Table, with both returning Table? Because what cache and uncache do is just add some hint info into the Table.

Becket Qin <becket....@gmail.com> wrote on Wed, Dec 12, 2018 at 11:25 AM:

Hi Till and Piotrek,

Thanks for the clarification. That resolves quite a few confusions. My understanding of how cache works is the same as what Till described, i.e. cache() is a hint to Flink, but it is not guaranteed that the cache always exists, and the table might be recomputed from its lineage.

> Is this the core of our disagreement here? That you would like this "cache()" to be mostly a hint for the optimiser?

Semantics wise, yes. That's also why I think materialize() has a much larger scope than cache(), and thus should be a different method.

Regarding the chance of optimization, it might not be that rare. Some very simple statistics could already help in many cases. For example, simply maintaining the max and min of each field can already eliminate some unnecessary table scans (potentially scanning the cached table) if the result is doomed to be empty. A histogram would give even further information.
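The min/max idea in the preceding sentence amounts to a simple range-overlap check. A minimal sketch (all names here are hypothetical, not Flink API):

```java
// Sketch of min/max-based scan pruning: if a predicate's range cannot
// overlap a column's [min, max], the (cached) table scan is doomed to
// be empty and can be skipped. Hypothetical names, not Flink API.
public class MinMaxPruning {

    /** Per-column statistic maintained while the cached table is written. */
    static final class ColumnStats {
        final long min;
        final long max;
        ColumnStats(long min, long max) { this.min = min; this.max = max; }
    }

    /** Can a predicate "col > threshold" possibly match any row? */
    static boolean mayMatchGreaterThan(ColumnStats stats, long threshold) {
        return stats.max > threshold;
    }

    public static void main(String[] args) {
        ColumnStats age = new ColumnStats(18, 65);
        System.out.println(mayMatchGreaterThan(age, 100)); // false: skip the scan
        System.out.println(mayMatchGreaterThan(age, 30));  // true: must scan
    }
}
```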
The optimizer could be very careful and only ignore the cache when it is 100% sure that doing so is cheaper, e.g. only when a filter on the cache will absolutely return nothing.

Given the above clarification on cache, I would like to revisit the original "void cache()" proposal and see if we can improve on top of that.

What do you think about the following modified interface?

Table {
  /**
   * This call hints Flink to maintain a cache of this table and leverage
   * it for performance optimization if needed.
   * Note that Flink may still decide not to use the cache if it is
   * cheaper to do so.
   *
   * A CacheHandle will be returned to allow the user to release the cache
   * actively. The cache will be deleted if there is no unreleased cache
   * handle to it. When the TableEnvironment is closed, the cache will
   * also be deleted and all the cache handles will be released.
   *
   * @return a CacheHandle referring to the cache of this table.
   */
  CacheHandle cache();
}

CacheHandle {
  /**
   * Close the cache handle. This method does not necessarily delete the
   * cache. Instead, it simply decrements the reference counter to the
   * cache. When there is no handle referring to a cache, the cache will
   * be deleted.
   *
   * @return the number of open handles to the cache after this handle
   * has been released.
   */
  int release()
}

The rationale behind this interface is the following:
In the vast majority of cases, users wouldn't really care whether the cache is used or not. So I think the most intuitive way is letting cache() return nothing, so nobody needs to worry about the difference between operations on CachedTables and those on the "original" tables. This will make maybe 99.9% of the users happy. There were two concerns raised for this approach:
1. In some rare cases, users may want to ignore the cache.
2. A table might be cached/uncached in a third party function while the caller does not know.

For the first issue, users can use hint("ignoreCache") to explicitly ignore the cache.
For the second issue, the above proposal lets cache() return a CacheHandle, whose only method is release(). Different CacheHandles will refer to the same cache; if a cache no longer has any cache handle, it will be deleted. This addresses the following case:
{
  val handle1 = a.cache()
  process(a)
  a.select(...) // cache is still available, handle1 has not been released.
}

void process(Table t) {
  val handle2 = t.cache() // new handle to the cache
  t.select(...) // optimizer decides cache usage
  t.hint("ignoreCache").select(...)
// cache is ignored
  handle2.release() // release the handle, but the cache may still be available if there are other handles
  ...
}

Does the above modified approach look reasonable to you?

Cheers,

Jiangjie (Becket) Qin

On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Becket,

I was aiming at semantics similar to 1. I actually thought that `cache()` would tell the system to materialize the intermediate result so that subsequent queries don't need to reprocess it. This means that the usage of the cached table in this example

{
  val cachedTable = a.cache()
  val b1 = cachedTable.select(…)
  val b2 = cachedTable.foo().select(…)
  val b3 = cachedTable.bar().select(...)
  val c1 = a.select(…)
  val c2 = a.foo().select(…)
  val c3 = a.bar().select(...)
}

strongly depends on interleaved calls which trigger the execution of sub-queries. So for example, if there is only a single env.execute call at the end of the block, then b1, b2, b3, c1, c2 and c3 would all be computed by reading directly from the sources (given that there is only a single JobGraph).
It just happens that the result of `a` will be cached such that we skip the processing of `a` when there are subsequent queries reading from `cachedTable`. If for some reason the system cannot materialize the table (e.g. running out of disk space, TTL expired), then it could also happen that we need to reprocess `a`. In that sense, `cachedTable` simply is an identifier for the materialized result of `a`, together with the lineage describing how to reprocess it.

Cheers,
Till

On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

> {
>   val cachedTable = a.cache()
>   val b = cachedTable.select(...)
>   val c = a.select(...)
> }
>
> Semantic 1. b uses cachedTable as the user demanded. c uses the original DAG as the user demanded. In this case, the optimizer has no chance to optimize.
> Semantic 2. b uses cachedTable as the user demanded. c leaves it to the optimizer to choose whether the cache or the DAG should be used. In this case, the user loses the option to NOT use the cache.
>
> As you can see, neither of the options seems perfect.
> However, I guess you and Till are proposing the third option:
>
> Semantic 3. b leaves it to the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG.

I am pretty sure that me, Till, Fabian and others were all proposing and advocating in favour of semantic "1". No cost-based optimiser decisions at all.

{
  val cachedTable = a.cache()
  val b1 = cachedTable.select(…)
  val b2 = cachedTable.foo().select(…)
  val b3 = cachedTable.bar().select(...)
  val c1 = a.select(…)
  val c2 = a.foo().select(…)
  val c3 = a.bar().select(...)
}

All of b1, b2 and b3 are reading from the cache, while c1, c2 and c3 are re-executing the whole plan for "a".

In the future we could discuss going one step further, introducing some global optimisation (that can be manually enabled/disabled): deduplicate plan nodes / deduplicate sub-queries / re-use sub-query results / or whatever we could call it. It could do two things:

1. Automatically try to deduplicate fragments of the plan and share the result using CachedTable - in other words, automatically insert `CachedTable cache()` calls.
2.
Automatically make the decision to bypass explicit `CachedTable` access (this would be the equivalent of what you described as "semantic 3").

However, as I wrote previously, I have big doubts whether such cost-based optimisation would work (this applies also to "Semantic 2"). I would expect it to do more harm than good in so many cases that it wouldn't make sense. Even assuming that we calculate statistics perfectly (this ain't gonna happen), it's virtually impossible to correctly estimate the exchange rate of CPU cycles vs IO operations, as it changes so much from deployment to deployment.

Is this the core of our disagreement here? That you would like this "cache()" to be mostly a hint for the optimiser?

Piotrek

On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com> wrote:

Another potential concern for semantic 3 is that, in the future, we may add automatic caching to Flink, e.g. caching the intermediate results at the shuffle boundary. If our semantic is that a reference to the original table means skipping the cache, those users may not be able to benefit from the implicit cache.
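The three semantics being debated in the thread can be summarized as a small lookup sketch (labels, class and method names are invented for illustration; this is not proposed API):

```java
// The three discussed semantics, as a lookup: what does a read resolve to?
public class CacheSemantics {

    enum Source { CACHE, DAG, OPTIMIZER_CHOICE }

    /** Reading via the CachedTable returned by cache(). */
    static Source readViaCachedTable(int semantic) {
        switch (semantic) {
            case 1: return Source.CACHE;            // user demanded the cache
            case 2: return Source.CACHE;            // user demanded the cache
            case 3: return Source.OPTIMIZER_CHOICE; // optimizer may bypass it
            default: throw new IllegalArgumentException("unknown semantic");
        }
    }

    /** Reading via the original table `a`. */
    static Source readViaOriginal(int semantic) {
        switch (semantic) {
            case 1: return Source.DAG;              // always recompute
            case 2: return Source.OPTIMIZER_CHOICE; // cache may be substituted
            case 3: return Source.DAG;              // always recompute
            default: throw new IllegalArgumentException("unknown semantic");
        }
    }
}
```

Under semantic 3 a read of the original table always resolves to the DAG, which is precisely the property that would prevent a future implicit (e.g. shuffle-boundary) cache from benefiting such reads.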
On Tue, Dec 11, 2018 at 12:10 PM Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

Thanks for the reply. Having thought about it again, I might have misunderstood your proposal in earlier emails. Returning a CachedTable might not be a bad idea.

I was more concerned about the semantics and their intuitiveness when a CachedTable is returned, i.e., if cache() returns a CachedTable, what are the semantics in the following code:
{
  val cachedTable = a.cache()
  val b = cachedTable.select(...)
  val c = a.select(...)
}
What is the difference between b and c? At first glance, I see two options:

Semantic 1. b uses cachedTable as the user demanded. c uses the original DAG as the user demanded. In this case, the optimizer has no chance to optimize.
Semantic 2. b uses cachedTable as the user demanded. c leaves it to the optimizer to choose whether the cache or the DAG should be used. In this case, the user loses the option to NOT use the cache.

As you can see, neither of the options seems perfect.
However, I guess you and Till are proposing the third option:

Semantic 3. b leaves it to the optimizer to choose whether the cache or the DAG should be used. c always uses the DAG.

This does address all the concerns. It is just that, from an intuitiveness perspective, I find it a little weird to ask users to explicitly use a CachedTable that the optimizer might choose to ignore. That was why I did not think about that semantic. But given there is material benefit, I think this semantic is acceptable.

> 1. If we want to let the optimiser make the decision whether to use the cache or not, then why do we need a "void cache()" method at all? Would it "increase" the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce this kind of automated optimisation of "plan node deduplication", I would turn it on globally, not per table, and let the optimiser do all of the work.
> 2. We do not have statistics at the moment for any use/not-use cache decision.
> 3.
> Even if we had, I would be veeerryy sceptical whether such cost-based optimisations would work properly, and I would still insist on first providing an explicit caching mechanism (`CachedTable cache()`).

We are absolutely on the same page here. An explicit cache() method is necessary not only because the optimizer may not be able to make the right decision, but also because of the nature of interactive programming. For example, if users write the following code in the Scala shell:
val b = a.select(...)
val c = b.select(...)
val d = c.select(...).writeToSink(...)
tEnv.execute()
There is no way the optimizer will know whether b or c will be used in later code, unless users hint explicitly.

> At the same time I'm not sure if you have responded to our objections to `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.

Are there any other side effects if we use semantic 3 mentioned above?

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

Sorry for not responding for a long time.
Regarding case 1:

There wouldn't be an "a.unCache()" method; I would expect only `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn't affect `cachedTableA2`. Just as in any other database, dropping/modifying one independent table/materialised view does not affect others.

> What I meant is that, assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.

1. If we want to let the optimiser make the decision whether to use the cache or not, then why do we need a "void cache()" method at all? Would it "increase" the chance of using the cache? That sounds strange. What would be the mechanism for deciding whether to use the cache or not? If we want to introduce this kind of automated optimisation of "plan node deduplication", I would turn it on globally, not per table, and let the optimiser do all of the work.
2. We do not have statistics at the moment for any use/not-use cache decision.
3.
Even if we had, I would be veeerryy sceptical whether such cost-based optimisations would work properly, and I would still insist on first providing an explicit caching mechanism (`CachedTable cache()`).
4. As Till wrote, having an explicit `CachedTable cache()` doesn't contradict future work on automated cost-based caching.

At the same time I'm not sure if you have responded to our objections to `void cache()` being implicit/having side effects, which me, Jark, Fabian, Till and I think also Shaoxuan are supporting.

Piotrek

On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com> wrote:

Hi Till,

It is true that after the first job submission there will be no ambiguity in terms of whether a cached table is used or not. That is the same for cache() without returning a CachedTable.

> Conceptually one could think of cache() as introducing a caching operator from which you need to consume if you want to benefit from the caching functionality.

I am thinking a little differently. I think it is a hint (as you mentioned later) instead of a new operator.
I'd like to be careful about the semantics of the API. A hint is a property set on an existing operator, but is not itself an operator, as it does not really manipulate the data.

> I agree, ideally the optimizer makes this kind of decision about which intermediate result should be cached. But especially when executing ad-hoc queries the user might know better which results need to be cached, because Flink might not see the full DAG. In that sense, I would consider the cache() method as a hint for the optimizer. Of course, in the future we might add functionality which tries to automatically cache results (e.g. caching the latest intermediate results until so-and-so much space is used). But this should hopefully not contradict `CachedTable cache()`.

I agree that the cache() method is needed for exactly the reason you mentioned, i.e. Flink cannot predict what users are going to write later, so users need to tell Flink explicitly that this table will be used later.
What I meant is that, assuming there is already a cached table, ideally users need not specify whether the next query should read from the cache or use the original DAG. This should be decided by the optimizer.

To explain the difference between returning and not returning a CachedTable, I want to compare the following two cases:

*Case 1: returning a CachedTable*
b = a.map(...)
val cachedTableA1 = a.cache()
val cachedTableA2 = a.cache()
b.print() // Just to make sure a is cached.

c = a.filter(...) // Does the user specify that the original DAG is used? Or does the optimizer decide whether the DAG or the cache should be used?
d = cachedTableA1.filter() // The user specifies that the cached table is used.

a.unCache() // Can cachedTableA still be used afterwards?
cachedTableA1.uncache() // Can cachedTableA2 still be used?

*Case 2: not returning a CachedTable*
b = a.map()
a.cache()
a.cache() // no-op
b.print() // Just to make sure a is cached

c = a.filter(...) // Optimizer decides whether the cache or the DAG should be used
d = a.filter(...)
// Optimizer decides whether the cache or the DAG should be used

a.unCache()
a.unCache() // no-op

In case 1, semantics wise, the optimizer loses the option to choose between the DAG and the cache, and the unCache() call becomes tricky.
In case 2, users do not need to worry about whether the cache or the DAG is used, and the unCache() semantics are clear. However, the caveat is that users cannot explicitly ignore the cache.
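The ref-counting CacheHandle scheme discussed earlier in the thread — cache() increments a reference count, release() decrements it, and the physical cache is dropped only when the count reaches zero — can be mocked in a few lines. This is a behavioural sketch only, not Flink code; all names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal mock of the ref-counted CacheHandle proposal: every cache()
// call returns a fresh handle to the same shared cache; release() only
// drops the physical cache when the last open handle is released.
public class RefCountedCache {

    private final AtomicInteger openHandles = new AtomicInteger(0);
    private volatile boolean materialized = false;

    /** Models Table.cache(): bumps the ref count and returns a new handle. */
    public CacheHandle cache() {
        openHandles.incrementAndGet();
        materialized = true; // pretend the intermediate result was written out
        return new CacheHandle();
    }

    public boolean isMaterialized() {
        return materialized;
    }

    public class CacheHandle {
        private boolean released = false;

        /** Decrements the ref count; deletes the cache when it reaches zero.
         *  Returns the number of handles still open, as in the proposal. */
        public int release() {
            if (released) {
                throw new IllegalStateException("handle already released");
            }
            released = true;
            int remaining = openHandles.decrementAndGet();
            if (remaining == 0) {
                materialized = false; // physically drop the cache
            }
            return remaining;
        }
    }
}
```

With these semantics, a function like foo(Table t) that caches and releases internally only drops its own handle, so a cache created by the caller survives the call — the "side effect" problem that motivated ref counting in the first place.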