Hi Piotr,

Thanks for the proposal and detailed explanation. I like the idea of returning a new hinted Table without modifying the original table. This also leaves room for users to benefit from future implicit caching.
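For illustration, here is a minimal Java sketch of the copy-with-hint semantics referred to above. The `TableImpl` class and the `hints` field are hypothetical names for this sketch, not actual Flink API:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: cache() returns a hinted copy and leaves `this` untouched.
public class TableImpl {
    private final String logicalPlan;  // simplified stand-in for the table's logical plan
    private final Set<String> hints;   // optimizer hints carried by this table

    public TableImpl(String logicalPlan, Set<String> hints) {
        this.logicalPlan = logicalPlan;
        this.hints = hints;
    }

    /** Returns a copy carrying an extra "cache" hint; the original table is not modified. */
    public TableImpl cache() {
        Set<String> newHints = new HashSet<>(hints);
        newHints.add("cache");
        return new TableImpl(logicalPlan, newHints);
    }

    public boolean isCacheHinted() {
        return hints.contains("cache");
    }
}
```

Because `cache()` only adds metadata to a copy, a future automatic-caching optimizer stays free to cache or not cache either reference.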
Just to make sure I get the full picture. In your proposal, there will also
be a 'void Table#uncache()' method to release the cache, right?

Thanks,

Jiangjie (Becket) Qin

On Mon, Jan 7, 2019 at 11:50 PM Piotr Nowojski <pi...@da-platform.com> wrote:

> Hi Becket!
>
> After further thinking I tend to agree that my previous proposal (*Option
> 2*) indeed might not be ideal if we were to introduce automatic caching in
> the future. However I would like to propose a slightly modified version of
> it:
>
> *Option 4*
>
> Adding a `cache()` method with the following signature:
>
> Table Table#cache();
>
> Without side effects: the `cache()` call does not modify/change the
> original Table in any way.
> It would return a copy of the original table, with an added hint for the
> optimizer to cache the table, so that future accesses to the returned
> table might or might not be cached.
>
> Assuming that we are talking about a setup where we do not have automatic
> caching enabled (possible future extension).
>
> Example #1:
>
> ```
> Table a = …
> a.foo() // not cached
>
> val cachedA = a.cache();
>
> cachedA.bar() // maybe cached
> a.foo() // same as before - effectively not cached
> ```
>
> Both the first and the second `a.foo()` operations would behave in
> exactly the same way. Again, the `a.cache()` call doesn’t affect `a`
> itself. If `a` was not hinted for caching before `a.cache();`, then both
> `a.foo()` calls wouldn’t use the cache.
>
> The returned `cachedA` would carry the “cache” hint, so `cachedA.bar()`
> would probably go through the cache (unless the optimiser decides
> otherwise).
>
> Example #2
>
> ```
> Table a = …
>
> a.foo() // not cached
>
> val b = a.cache();
>
> a.foo() // same as before - effectively not cached
> b.foo() // maybe cached
>
> val c = b.cache();
>
> a.foo() // same as before - effectively not cached
> b.foo() // same as before - effectively maybe cached
> c.foo() // maybe cached
> ```
>
> Now, assuming that we have some future “automatic caching optimisation”:
>
> Example #3
>
> ```
> env.enableAutomaticCaching()
> Table a = …
>
> a.foo() // might be cached, depending on whether `a` was selected for
> automatic caching
>
> val b = a.cache();
>
> a.foo() // same as before - might be cached, if `a` was selected for
> automatic caching
> b.foo() // maybe cached
> ```
>
> More or less this is the same behaviour as:
>
> Table a = ...
> val b = a.filter(x > 20)
>
> calling `filter` hasn’t changed or altered `a` in any way. If `a` was
> previously filtered:
>
> Table src = …
> val a = src.filter(x > 20)
> val b = a.filter(x > 20)
>
> then yes, `a` and `b` will be the same. But the point is that neither
> `filter` nor `cache` changes the original `a` table.
>
> One thing is that indeed, physically dropping the cache will have side
> effects and will in a way mutate the cached table references. But this is,
> I think, unavoidable in any solution - the same issue as calling
> `.close()`, or calling a destructor in C++.
>
> Piotrek
>
> > On 7 Jan 2019, at 10:41, Becket Qin <becket....@gmail.com> wrote:
> >
> > Happy New Year, everybody!
> >
> > I would like to resume this discussion thread. At this point, we have
> > agreed on the first step goal of interactive programming. The open
> > discussion is the exact API. More specifically, what should the *cache()*
> > method return and what is the semantic. There are three options:
> >
> > *Option 1*
> > *void cache()* OR *Table cache()* which returns the original table for
> > chained calls.
> > *void uncache()* releases the cache.
> > *Table.hint(ignoreCache).foo()* to ignore the cache for operation foo().
> >
> > - Semantic: a.cache() hints that table 'a' should be cached. The
> > optimizer decides whether the cache will be used or not.
> > - pros: simple and no confusion between CachedTable and original table
> > - cons: A table may be cached / uncached in a method invocation, while
> > the caller does not know about this.
> >
> > *Option 2*
> > *CachedTable cache()*
> > *CachedTable* extends *Table* with an additional *uncache()* method
> >
> > - Semantic: After *val cachedA = a.cache()*, *cachedA.foo()* will always
> > use the cache. *a.bar()* will always use the original DAG.
> > - pros: No potential side effects in method invocation.
> > - cons: Optimizer has no chance to kick in. Future optimization will
> > become a behavior change and need users to change the code.
> >
> > *Option 3*
> > *CacheHandle cache()*
> > *CacheHandle.release()* to release a cache handle on the table. If all
> > cache handles are released, the cache could be removed.
> > *Table.hint(ignoreCache).foo()* to ignore the cache for operation foo().
> >
> > - Semantic: *a.cache()* hints that 'a' should be cached. The optimizer
> > decides whether the cache will be used or not. The cache is released
> > either when no handle is on it, or when the user program exits.
> > - pros: No potential side effect in method invocation. No confusion
> > between the cached table vs. the original table.
> > - cons: An additional CacheHandle exposed to the users.
> >
> > Personally I prefer option 3 for the following reasons:
> > 1. It is simple. The vast majority of the users would just call
> > *a.cache()* followed by *a.foo()*, *a.bar()*, etc.
> > 2. There is no semantic ambiguity and no semantic change if we decide to
> > add implicit caching in the future.
> > 3. There is no side effect in the method calls.
> > 4. Admittedly we need to expose one more CacheHandle class to the users.
> > But it is not that difficult to understand given a similar well-known
> > concept like ref counting (we can name it CacheReference if that is
> > easier to understand). So I think it is fine.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Dec 13, 2018 at 11:23 AM Becket Qin <becket....@gmail.com> wrote:
> >
> >> Hi Piotrek,
> >>
> >> 1. Regarding optimization.
> >> Sure there are many cases where the decision is hard to make. But that
> >> does not make it any easier for the users to make those decisions. I
> >> imagine 99% of the users would just naively use cache. I am not saying
> >> we can optimize in all the cases. But as long as we agree that at least
> >> in certain cases (I would argue most cases) the optimizer can do a
> >> little better than an average user who likely knows little about Flink
> >> internals, we should not push the burden of optimization to users.
> >>
> >> BTW, it seems some of your concerns are related to the implementation.
> >> I did not mention the implementation of the caching service because
> >> that should not affect the API semantics. Not sure if this helps, but
> >> imagine the default implementation has one StorageNode service
> >> colocated with each TM. It could be running within the TM process or in
> >> a standalone process, depending on configuration.
> >>
> >> The StorageNode uses a memory + spill-to-disk mechanism. The cached
> >> data will just be written to the local StorageNode service. If the
> >> StorageNode is running within the TM process, the in-memory cache could
> >> just be objects so we save some serde cost.
A later job referring to the cached Table > will > >> be scheduled in a locality aware manner, i.e. run in the TM whose peer > >> StorageNode hosts the data. > >> > >> > >> 2. Semantic > >> I am not sure why introducing a new hintCache() or > >> env.enableAutomaticCaching() method would avoid the consequence of > semantic > >> change. > >> > >> If the auto optimization is not enabled by default, users still need to > >> make code change to all existing programs in order to get the benefit. > >> If the auto optimization is enabled by default, advanced users who know > >> that they really want to use cache will suddenly lose the opportunity > to do > >> so, unless they change the code to disable auto optimization. > >> > >> > >> 3. side effect > >> The CacheHandle is not only for where to put uncache(). It is to solve > the > >> implicit performance impact by moving the uncache() to the CacheHandle. > >> > >> - If users wants to leverage cache, they can call a.cache(). After > >> that, unless user explicitly release that CacheHandle, a.foo() will > always > >> leverage cache if needed (optimizer may choose to ignore cache if that > >> helps accelerate the process). Any function call will not be able to > >> release the cache because they do not have that CacheHandle. > >> - If some advanced users do not want to use cache at all, they will > >> call a.hint(ignoreCache).foo(). This will for sure ignore cache and > use the > >> original DAG to process. > >> > >> > >>> In vast majority of the cases, users wouldn't really care whether the > >>> cache is used or not. > >>> I wouldn’t agree with that, because “caching” (if not purely in memory > >>> caching) would add additional IO costs. It’s similar as saying that > users > >>> would not see a difference between Spark/Flink and MapReduce (MapReduce > >>> writes data to disks after every map/reduce stage). > >> > >> What I wanted to say is that in most cases, after users call cache(), > they > >> don't really care about whether auto optimization has decided to ignore > the > >> cache or not, as long as the program runs faster. > >> > >> Thanks, > >> > >> Jiangjie (Becket) Qin > >> > >> > >> > >> > >> > >> > >> > >> > >> On Wed, Dec 12, 2018 at 10:50 PM Piotr Nowojski < > pi...@data-artisans.com> > >> wrote: > >> > >>> Hi, > >>> > >>> Thanks for the quick answer :) > >>> > >>> Re 1. > >>> > >>> I generally agree with you, however couple of points: > >>> > >>> a) the problem with using automatic caching is bigger, because you will > >>> have to decide, how do you compare IO vs CPU costs and if you pick > wrong, > >>> additional IO costs might be enormous or even can crash your system. > This > >>> is more difficult problem compared to let say join reordering, where > the > >>> only issue is to have good statistics that can capture correlations > between > >>> columns (when you reorder joins number of IO operations do not change) > >>> c) your example is completely independent of caching. > >>> > >>> Query like this: > >>> > >>> src1.filte('f1 > 10).join(src2.filter('f2 < 30), `f1 ===`f2).as('f3, > >>> …).filter(‘f3 > 30) > >>> > >>> Should/could be optimised to empty result immediately, without the need > >>> for any cache/materialisation and that should work even without any > >>> statistics provided by the connector. > >>> > >>> For me prerequisite to any serious cost-based optimisations would be > some > >>> reasonable benchmark coverage of the code (tpch?). 
Otherwise that > would be > >>> equivalent of adding not tested code, since we wouldn’t be able to > verify > >>> our assumptions, like how does the writing of 10 000 records to > >>> cache/RocksDB/Kafka/CSV file compare to joining/filtering/processing of > >>> lets say 1000 000 rows. > >>> > >>> Re 2. > >>> > >>> I wasn’t proposing to change the semantic later. I was proposing that > we > >>> start now: > >>> > >>> CachedTable cachedA = a.cache() > >>> cachedA.foo() // Cache is used > >>> a.bar() // Original DAG is used > >>> > >>> And then later we can think about adding for example > >>> > >>> CachedTable cachedA = a.hintCache() > >>> cachedA.foo() // Cache might be used > >>> a.bar() // Original DAG is used > >>> > >>> Or > >>> > >>> env.enableAutomaticCaching() > >>> a.foo() // Cache might be used > >>> a.bar() // Cache might be used > >>> > >>> Or (I would still not like this option): > >>> > >>> a.hintCache() > >>> a.foo() // Cache might be used > >>> a.bar() // Cache might be used > >>> > >>> Or whatever else that will come to our mind. Even if we add some > >>> automatic caching in the future, keeping implicit (`CachedTable > cache()`) > >>> caching will still be useful, at least in some cases. > >>> > >>> Re 3. > >>> > >>>> 2. The source tables are immutable during one run of batch processing > >>> logic. > >>>> 3. The cache is immutable during one run of batch processing logic. > >>> > >>>> I think assumption 2 and 3 are by definition what batch processing > >>> means, > >>>> i.e the data must be complete before it is processed and should not > >>> change > >>>> when the processing is running. > >>> > >>> I agree that this is how batch systems SHOULD be working. However I > know > >>> from my previous experience that it’s not always the case. Sometimes > users > >>> are just working on some non transactional storage, which can be > (either > >>> constantly or occasionally) being modified by some other processes for > >>> whatever the reasons (fixing the data, updating, adding new data etc). > >>> > >>> But even if we ignore this point (data immutability), performance side > >>> effect issue of your proposal remains. If user calls `void a.cache()` > deep > >>> inside some private method, it will have implicit side effects on other > >>> parts of his program that might not be obvious. > >>> > >>> Re `CacheHandle`. > >>> > >>> If I understand it correctly, it only addresses the issue where to > place > >>> method `uncache`/`dropCache`. > >>> > >>> Btw, > >>> > >>>> In vast majority of the cases, users wouldn't really care whether the > >>> cache is used or not. > >>> > >>> I wouldn’t agree with that, because “caching” (if not purely in memory > >>> caching) would add additional IO costs. It’s similar as saying that > users > >>> would not see a difference between Spark/Flink and MapReduce (MapReduce > >>> writes data to disks after every map/reduce stage). > >>> > >>> Piotrek > >>> > >>>> On 12 Dec 2018, at 14:28, Becket Qin <becket....@gmail.com> wrote: > >>>> > >>>> Hi Piotrek, > >>>> > >>>> Not sure if you noticed, in my last email, I was proposing > `CacheHandle > >>>> cache()` to avoid the potential side effect due to function calls. > >>>> > >>>> Let's look at the disagreement in your reply one by one. > >>>> > >>>> > >>>> 1. Optimization chances > >>>> > >>>> Optimization is never a trivial work. This is exactly why we should > not > >>> let > >>>> user manually do that. Databases have done huge amount of work in this > >>>> area. 
At Alibaba, we rely heavily on many optimization rules to boost > >>> the > >>>> SQL query performance. > >>>> > >>>> In your example, if I filling the filter conditions in a certain way, > >>> the > >>>> optimization would become obvious. > >>>> > >>>> Table src1 = … // read from connector 1 > >>>> Table src2 = … // read from connector 2 > >>>> > >>>> Table a = src1.filte('f1 > 10).join(src2.filter('f2 < 30), `f1 === > >>>> `f2).as('f3, ...) > >>>> a.cache() // write cache to connector 3, when writing the records, > >>> remember > >>>> min and max of `f1 > >>>> > >>>> a.filter('f3 > 30) // There is no need to read from any connector > >>> because > >>>> `a` does not contain any record whose 'f3 is greater than 30. > >>>> env.execute() > >>>> a.select(…) > >>>> > >>>> BTW, it seems to me that adding some basic statistics is fairly > >>>> straightforward and the cost is pretty marginal if not ignorable. In > >>> fact > >>>> it is not only needed for optimization, but also for cases such as ML, > >>>> where some algorithms may need to decide their parameter based on the > >>>> statistics of the data. > >>>> > >>>> > >>>> 2. Same API, one semantic now, another semantic later. > >>>> > >>>> I am trying to understand what is the semantic of `CachedTable > cache()` > >>> you > >>>> are proposing. IMO, we should avoid designing an API whose semantic > >>> will be > >>>> changed later. If we have a "CachedTable cache()" method, then the > >>> semantic > >>>> should be very clearly defined upfront and do not change later. It > >>> should > >>>> never be "right now let's go with semantic 1, later we can silently > >>> change > >>>> it to semantic 2 or 3". Such change could result in bad consequence. > For > >>>> example, let's say we decide go with semantic 1: > >>>> > >>>> CachedTable cachedA = a.cache() > >>>> cachedA.foo() // Cache is used > >>>> a.bar() // Original DAG is used. > >>>> > >>>> Now majority of the users would be using cachedA.foo() in their code. > >>> And > >>>> some advanced users will use a.bar() to explicitly skip the cache. > Later > >>>> on, we added smart optimization and change the semantic to semantic 2: > >>>> > >>>> CachedTable cachedA = a.cache() > >>>> cachedA.foo() // Cache is used > >>>> a.bar() // Cache MIGHT be used, and Flink may decide to skip cache if > >>> it is > >>>> faster. > >>>> > >>>> Now most of the users who were writing cachedA.foo() will not benefit > >>> from > >>>> this optimization at all, unless they change their code to use a.foo() > >>>> instead. And those advanced users suddenly lose the option to > explicitly > >>>> ignore cache unless they change their code (assuming we care enough to > >>>> provide something like hint(useCache)). If we don't define the > semantic > >>>> carefully, our users will have to change their code again and again > >>> while > >>>> they shouldn't have to. > >>>> > >>>> > >>>> 3. side effect. > >>>> > >>>> Before we talk about side effect, we have to agree on the assumptions. > >>> The > >>>> assumptions I have are following: > >>>> 1. We are talking about batch processing. > >>>> 2. The source tables are immutable during one run of batch processing > >>> logic. > >>>> 3. The cache is immutable during one run of batch processing logic. > >>>> > >>>> I think assumption 2 and 3 are by definition what batch processing > >>> means, > >>>> i.e the data must be complete before it is processed and should not > >>> change > >>>> when the processing is running. 
> >>>> > >>>> As far as I am aware of, I don't know any batch processing system > >>> breaking > >>>> those assumptions. Even for relational database tables, where queries > >>> can > >>>> run with concurrent modifications, necessary locking are still > required > >>> to > >>>> ensure the integrity of the query result. > >>>> > >>>> Please let me know if you disagree with the above assumptions. If you > >>> agree > >>>> with these assumptions, with the `CacheHandle cache()` API in my last > >>>> email, do you still see side effects? > >>>> > >>>> Thanks, > >>>> > >>>> Jiangjie (Becket) Qin > >>>> > >>>> > >>>> On Wed, Dec 12, 2018 at 7:11 PM Piotr Nowojski < > pi...@data-artisans.com > >>>> > >>>> wrote: > >>>> > >>>>> Hi Becket, > >>>>> > >>>>>> Regarding the chance of optimization, it might not be that rare. > Some > >>>>> very > >>>>>> simple statistics could already help in many cases. For example, > >>> simply > >>>>>> maintaining max and min of each fields can already eliminate some > >>>>>> unnecessary table scan (potentially scanning the cached table) if > the > >>>>>> result is doomed to be empty. A histogram would give even further > >>>>>> information. The optimizer could be very careful and only ignores > >>> cache > >>>>>> when it is 100% sure doing that is cheaper. e.g. only when a filter > on > >>>>> the > >>>>>> cache will absolutely return nothing. > >>>>> > >>>>> I do not see how this might be easy to achieve. It would require tons > >>> of > >>>>> effort to make it work and in the end you would still have a problem > of > >>>>> comparing/trading CPU cycles vs IO. For example: > >>>>> > >>>>> Table src1 = … // read from connector 1 > >>>>> Table src2 = … // read from connector 2 > >>>>> > >>>>> Table a = src1.filter(…).join(src2.filter(…), …) > >>>>> a.cache() // write cache to connector 3 > >>>>> > >>>>> a.filter(…) > >>>>> env.execute() > >>>>> a.select(…) > >>>>> > >>>>> Decision whether it’s better to: > >>>>> A) read from connector1/connector2, filter/map and join them twice > >>>>> B) read from connector1/connector2, filter/map and join them once, > pay > >>> the > >>>>> price of writing to connector 3 and then reading from it > >>>>> > >>>>> Is very far from trivial. `a` can end up much larger than `src1` and > >>>>> `src2`, writes to connector 3 might be extremely slow, reads from > >>> connector > >>>>> 3 can be slower compared to reads from connector 1 & 2, … . You > really > >>> need > >>>>> to have extremely good statistics to correctly asses size of the > >>> output and > >>>>> it would still be failing many times (correlations etc). And keep in > >>> mind > >>>>> that at the moment we do not have ANY statistics at all. More than > >>> that, it > >>>>> would require significantly more testing and setting up some > >>> benchmarks to > >>>>> make sure that we do not brake it with some regressions. > >>>>> > >>>>> That’s why I’m strongly opposing this idea - at least let’s not > starts > >>>>> with this. If we first start with completely manual/explicit caching, > >>>>> without any magic, it would be a significant improvement for the > users > >>> for > >>>>> a fraction of the development cost. After implementing that, when we > >>>>> already have all of the working pieces, we can start working on some > >>>>> optimisations rules. As I wrote before, if we start with > >>>>> > >>>>> `CachedTable cache()` > >>>>> > >>>>> We can later work on follow up stories to make it automatic. 
Despite > >>> that > >>>>> I don’t like this implicit/side effect approach with `void` method, > >>> having > >>>>> explicit `CachedTable cache()` wouldn’t even prevent as from later > >>> adding > >>>>> `void hintCache()` method, with the exact semantic that you want. > >>>>> > >>>>> On top of that I re-rise again that having implicit `void > >>>>> cache()/hintCache()` has other side effects and problems with non > >>> immutable > >>>>> data, and being annoying when used secretly inside methods. > >>>>> > >>>>> Explicit `CachedTable cache()` just looks like much less > controversial > >>> MVP > >>>>> and if we decide to go further with this topic, it’s not a wasted > >>> effort, > >>>>> but just lies on a stright path to more advanced/complicated > solutions > >>> in > >>>>> the future. Are there any drawbacks of starting with `CachedTable > >>> cache()` > >>>>> that I’m missing? > >>>>> > >>>>> Piotrek > >>>>> > >>>>>> On 12 Dec 2018, at 09:30, Jeff Zhang <zjf...@gmail.com> wrote: > >>>>>> > >>>>>> Hi Becket, > >>>>>> > >>>>>> Introducing CacheHandle seems too complicated. That means users have > >>> to > >>>>>> maintain Handler properly. > >>>>>> > >>>>>> And since cache is just a hint for optimizer, why not just return > >>> Table > >>>>>> itself for cache method. This hint info should be kept in Table I > >>>>> believe. > >>>>>> > >>>>>> So how about adding method cache and uncache for Table, and both > >>> return > >>>>>> Table. Because what cache and uncache did is just adding some hint > >>> info > >>>>>> into Table. > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> Becket Qin <becket....@gmail.com> 于2018年12月12日周三 上午11:25写道: > >>>>>> > >>>>>>> Hi Till and Piotrek, > >>>>>>> > >>>>>>> Thanks for the clarification. That solves quite a few confusion. My > >>>>>>> understanding of how cache works is same as what Till describe. > i.e. > >>>>>>> cache() is a hint to Flink, but it is not guaranteed that cache > >>> always > >>>>>>> exist and it might be recomputed from its lineage. > >>>>>>> > >>>>>>> Is this the core of our disagreement here? That you would like this > >>>>>>>> “cache()” to be mostly hint for the optimiser? > >>>>>>> > >>>>>>> Semantic wise, yes. That's also why I think materialize() has a > much > >>>>> larger > >>>>>>> scope than cache(), thus it should be a different method. > >>>>>>> > >>>>>>> Regarding the chance of optimization, it might not be that rare. > Some > >>>>> very > >>>>>>> simple statistics could already help in many cases. For example, > >>> simply > >>>>>>> maintaining max and min of each fields can already eliminate some > >>>>>>> unnecessary table scan (potentially scanning the cached table) if > the > >>>>>>> result is doomed to be empty. A histogram would give even further > >>>>>>> information. The optimizer could be very careful and only ignores > >>> cache > >>>>>>> when it is 100% sure doing that is cheaper. e.g. only when a filter > >>> on > >>>>> the > >>>>>>> cache will absolutely return nothing. > >>>>>>> > >>>>>>> Given the above clarification on cache, I would like to revisit the > >>>>>>> original "void cache()" proposal and see if we can improve on top > of > >>>>> that. > >>>>>>> > >>>>>>> What do you think about the following modified interface? > >>>>>>> > >>>>>>> Table { > >>>>>>> /** > >>>>>>> * This call hints Flink to maintain a cache of this table and > >>> leverage > >>>>>>> it for performance optimization if needed. > >>>>>>> * Note that Flink may still decide to not use the cache if it is > >>>>> cheaper > >>>>>>> by doing so. 
> >>>>>>> * > >>>>>>> * A CacheHandle will be returned to allow the user to release the cache > >>>>>>> actively. The cache will be deleted if there > >>>>>>> * are no unreleased cache handles to it. When the TableEnvironment > >>> is > >>>>>>> closed, the cache will also be deleted > >>>>>>> * and all the cache handles will be released. > >>>>>>> * > >>>>>>> * @return a CacheHandle referring to the cache of this table. > >>>>>>> */ > >>>>>>> CacheHandle cache(); > >>>>>>> } > >>>>>>> > >>>>>>> CacheHandle { > >>>>>>> /** > >>>>>>> * Close the cache handle. This method does not necessarily delete > >>> the > >>>>>>> cache. Instead, it simply decrements the reference counter to the > >>> cache. > >>>>>>> * When there is no handle referring to a cache, the cache will > >>> be > >>>>>>> deleted. > >>>>>>> * > >>>>>>> * @return the number of open handles to the cache after this handle > >>>>> has > >>>>>>> been released. > >>>>>>> */ > >>>>>>> int release() > >>>>>>> } > >>>>>>> > >>>>>>> The rationale behind this interface is the following: > >>>>>>> In the vast majority of the cases, users wouldn't really care whether the > >>>>> cache > >>>>>>> is used or not. So I think the most intuitive way is letting cache() > >>>>> return > >>>>>>> nothing. So nobody needs to worry about the difference between > >>>>> operations > >>>>>>> on CacheTables and those on the "original" tables. This will make > >>> maybe > >>>>>>> 99.9% of the users happy. There were two concerns raised for this > >>>>> approach: > >>>>>>> 1. In some rare cases, users may want to ignore the cache, > >>>>>>> 2. A table might be cached/uncached in a third party function while > >>> the > >>>>>>> caller does not know. > >>>>>>> > >>>>>>> For the first issue, users can use hint("ignoreCache") to explicitly > >>>>> ignore > >>>>>>> the cache. > >>>>>>> For the second issue, the above proposal lets cache() return a > >>>>> CacheHandle, > >>>>>>> the only method in it is release(). Different CacheHandles will > >>> refer to > >>>>>>> the same cache; if a cache no longer has any cache handle, it will be > >>>>>>> deleted. This will address the following case: > >>>>>>> { > >>>>>>> val handle1 = a.cache() > >>>>>>> process(a) > >>>>>>> a.select(...) // cache is still available, handle1 has not been > >>>>> released. > >>>>>>> } > >>>>>>> > >>>>>>> void process(Table t) { > >>>>>>> val handle2 = t.cache() // new handle to cache > >>>>>>> t.select(...) // optimizer decides cache usage > >>>>>>> t.hint("ignoreCache").select(...) // cache is ignored > >>>>>>> handle2.release() // release the handle, but the cache may still be > >>>>>>> available if there are other handles > >>>>>>> ... > >>>>>>> } > >>>>>>> > >>>>>>> Does the above modified approach look reasonable to you? > >>>>>>> > >>>>>>> Cheers, > >>>>>>> > >>>>>>> Jiangjie (Becket) Qin > >>>>>>> > >>>>>>> On Tue, Dec 11, 2018 at 6:44 PM Till Rohrmann <trohrm...@apache.org> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Becket, > >>>>>>>> > >>>>>>>> I was aiming at semantics similar to 1. I actually thought that > >>>>> `cache()` > >>>>>>>> would tell the system to materialize the intermediate result so that > >>>>>>>> subsequent queries don't need to reprocess it.
This means that the > >>>>> usage > >>>>>>> of > >>>>>>>> the cached table in this example > >>>>>>>> > >>>>>>>> { > >>>>>>>> val cachedTable = a.cache() > >>>>>>>> val b1 = cachedTable.select(…) > >>>>>>>> val b2 = cachedTable.foo().select(…) > >>>>>>>> val b3 = cachedTable.bar().select(...) > >>>>>>>> val c1 = a.select(…) > >>>>>>>> val c2 = a.foo().select(…) > >>>>>>>> val c3 = a.bar().select(...) > >>>>>>>> } > >>>>>>>> > >>>>>>>> strongly depends on interleaved calls which trigger the execution > of > >>>>> sub > >>>>>>>> queries. So for example, if there is only a single env.execute > call > >>> at > >>>>>>> the > >>>>>>>> end of block, then b1, b2, b3, c1, c2 and c3 would all be > computed > >>> by > >>>>>>>> reading directly from the sources (given that there is only a > single > >>>>>>>> JobGraph). It just happens that the result of `a` will be cached > >>> such > >>>>>>> that > >>>>>>>> we skip the processing of `a` when there are subsequent queries > >>> reading > >>>>>>>> from `cachedTable`. If for some reason the system cannot > materialize > >>>>> the > >>>>>>>> table (e.g. running out of disk space, ttl expired), then it could > >>> also > >>>>>>>> happen that we need to reprocess `a`. In that sense `cachedTable` > >>>>> simply > >>>>>>> is > >>>>>>>> an identifier for the materialized result of `a` with the lineage > >>> how > >>>>> to > >>>>>>>> reprocess it. > >>>>>>>> > >>>>>>>> Cheers, > >>>>>>>> Till > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On Tue, Dec 11, 2018 at 11:01 AM Piotr Nowojski < > >>>>> pi...@data-artisans.com > >>>>>>>> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Becket, > >>>>>>>>> > >>>>>>>>>> { > >>>>>>>>>> val cachedTable = a.cache() > >>>>>>>>>> val b = cachedTable.select(...) > >>>>>>>>>> val c = a.select(...) > >>>>>>>>>> } > >>>>>>>>>> > >>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses > >>> original > >>>>>>> DAG > >>>>>>>>> as > >>>>>>>>>> user demanded so. In this case, the optimizer has no chance to > >>>>>>>> optimize. > >>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c leaves the > >>>>>>>>> optimizer > >>>>>>>>>> to choose whether the cache or DAG should be used. In this case, > >>> user > >>>>>>>>> lose > >>>>>>>>>> the option to NOT use cache. > >>>>>>>>>> > >>>>>>>>>> As you can see, neither of the options seem perfect. However, I > >>> guess > >>>>>>>> you > >>>>>>>>>> and Till are proposing the third option: > >>>>>>>>>> > >>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or > DAG > >>>>>>>> should > >>>>>>>>> be > >>>>>>>>>> used. c always use the DAG. > >>>>>>>>> > >>>>>>>>> I am pretty sure that me, Till, Fabian and others were all > >>> proposing > >>>>>>> and > >>>>>>>>> advocating in favour of semantic “1”. No cost based optimiser > >>>>> decisions > >>>>>>>> at > >>>>>>>>> all. > >>>>>>>>> > >>>>>>>>> { > >>>>>>>>> val cachedTable = a.cache() > >>>>>>>>> val b1 = cachedTable.select(…) > >>>>>>>>> val b2 = cachedTable.foo().select(…) > >>>>>>>>> val b3 = cachedTable.bar().select(...) > >>>>>>>>> val c1 = a.select(…) > >>>>>>>>> val c2 = a.foo().select(…) > >>>>>>>>> val c3 = a.bar().select(...) > >>>>>>>>> } > >>>>>>>>> > >>>>>>>>> All b1, b2 and b3 are reading from cache, while c1, c2 and c3 are > >>>>>>>>> re-executing whole plan for “a”. 
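For illustration, a minimal Java sketch of what semantic 1 means at the type level, using the `CachedTable` subtype from Option 2 (simplified, hypothetical signatures):

```java
// Hypothetical sketch of semantic 1: which plan is read is decided by the
// static type of the reference, never by a cost-based optimizer.
interface Table {
    Table select(String fields);  // simplified stand-in for the Table API
    CachedTable cache();          // materializes the result and returns a handle to it
}

interface CachedTable extends Table {
    // Reads through a CachedTable always hit the materialized result;
    // reads through the original Table always re-execute its plan.
    void uncache();  // drops the materialized result; the original Table is unaffected
}
```

Under these types, b1, b2 and b3 read the materialized result simply because they were derived from `cachedTable`, while c1, c2 and c3 re-execute the plan of `a` because they were derived from `a`.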
> >>>>>>>>> > >>>>>>>>> In the future we could discuss going one step further, > introducing > >>>>> some > >>>>>>>>> global optimisation (that can be manually enabled/disabled): > >>>>>>> deduplicate > >>>>>>>>> plan nodes/deduplicate sub queries/re-use sub queries results/or > >>>>>>> whatever > >>>>>>>>> we could call it. It could do two things: > >>>>>>>>> > >>>>>>>>> 1. Automatically try to deduplicate fragments of the plan and > share > >>>>> the > >>>>>>>>> result using CachedTable - in other words automatically insert > >>>>>>>> `CachedTable > >>>>>>>>> cache()` calls. > >>>>>>>>> 2. Automatically make decision to bypass explicit `CachedTable` > >>> access > >>>>>>>>> (this would be the equivalent of what you described as “semantic > >>> 3”). > >>>>>>>>> > >>>>>>>>> However as I wrote previously, I have big doubts if such > cost-based > >>>>>>>>> optimisation would work (this applies also to “Semantic 2”). I > >>> would > >>>>>>>> expect > >>>>>>>>> it to do more harm than good in so many cases, that it wouldn’t > >>> make > >>>>>>>> sense. > >>>>>>>>> Even assuming that we calculate statistics perfectly (this ain’t > >>> gonna > >>>>>>>>> happen), it’s virtually impossible to correctly estimate correct > >>>>>>> exchange > >>>>>>>>> rate of CPU cycles vs IO operations as it is changing so much > from > >>>>>>>>> deployment to deployment. > >>>>>>>>> > >>>>>>>>> Is this the core of our disagreement here? That you would like > this > >>>>>>>>> “cache()” to be mostly hint for the optimiser? > >>>>>>>>> > >>>>>>>>> Piotrek > >>>>>>>>> > >>>>>>>>>> On 11 Dec 2018, at 06:00, Becket Qin <becket....@gmail.com> > >>> wrote: > >>>>>>>>>> > >>>>>>>>>> Another potential concern for semantic 3 is that. In the future, > >>> we > >>>>>>> may > >>>>>>>>> add > >>>>>>>>>> automatic caching to Flink. e.g. cache the intermediate results > at > >>>>>>> the > >>>>>>>>>> shuffle boundary. If our semantic is that reference to the > >>> original > >>>>>>>> table > >>>>>>>>>> means skipping cache, those users may not be able to benefit > from > >>> the > >>>>>>>>>> implicit cache. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Tue, Dec 11, 2018 at 12:10 PM Becket Qin < > becket....@gmail.com > >>>> > >>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi Piotrek, > >>>>>>>>>>> > >>>>>>>>>>> Thanks for the reply. Thought about it again, I might have > >>>>>>>> misunderstood > >>>>>>>>>>> your proposal in earlier emails. Returning a CachedTable might > >>> not > >>>>>>> be > >>>>>>>> a > >>>>>>>>> bad > >>>>>>>>>>> idea. > >>>>>>>>>>> > >>>>>>>>>>> I was more concerned about the semantic and its intuitiveness > >>> when a > >>>>>>>>>>> CachedTable is returned. i..e, if cache() returns CachedTable. > >>> What > >>>>>>>> are > >>>>>>>>> the > >>>>>>>>>>> semantic in the following code: > >>>>>>>>>>> { > >>>>>>>>>>> val cachedTable = a.cache() > >>>>>>>>>>> val b = cachedTable.select(...) > >>>>>>>>>>> val c = a.select(...) > >>>>>>>>>>> } > >>>>>>>>>>> What is the difference between b and c? At the first glance, I > >>> see > >>>>>>> two > >>>>>>>>>>> options: > >>>>>>>>>>> > >>>>>>>>>>> Semantic 1. b uses cachedTable as user demanded so. c uses > >>> original > >>>>>>>> DAG > >>>>>>>>> as > >>>>>>>>>>> user demanded so. In this case, the optimizer has no chance to > >>>>>>>> optimize. > >>>>>>>>>>> Semantic 2. b uses cachedTable as user demanded so. c leaves > the > >>>>>>>>> optimizer > >>>>>>>>>>> to choose whether the cache or DAG should be used. 
In this > case, > >>>>>>> user > >>>>>>>>> lose > >>>>>>>>>>> the option to NOT use cache. > >>>>>>>>>>> > >>>>>>>>>>> As you can see, neither of the options seem perfect. However, I > >>>>>>> guess > >>>>>>>>> you > >>>>>>>>>>> and Till are proposing the third option: > >>>>>>>>>>> > >>>>>>>>>>> Semantic 3. b leaves the optimizer to choose whether cache or > DAG > >>>>>>>> should > >>>>>>>>>>> be used. c always use the DAG. > >>>>>>>>>>> > >>>>>>>>>>> This does address all the concerns. It is just that from > >>>>>>> intuitiveness > >>>>>>>>>>> perspective, I found that asking user to explicitly use a > >>>>>>> CachedTable > >>>>>>>>> while > >>>>>>>>>>> the optimizer might choose to ignore is a little weird. That > was > >>>>>>> why I > >>>>>>>>> did > >>>>>>>>>>> not think about that semantic. But given there is material > >>> benefit, > >>>>>>> I > >>>>>>>>> think > >>>>>>>>>>> this semantic is acceptable. > >>>>>>>>>>> > >>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use > >>> cache > >>>>>>> or > >>>>>>>>> not, > >>>>>>>>>>>> then why do we need “void cache()” method at all? Would It > >>>>>>>> “increase” > >>>>>>>>> the > >>>>>>>>>>>> chance of using the cache? That’s sounds strange. What would > be > >>> the > >>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we > >>> want > >>>>>>> to > >>>>>>>>>>>> introduce such kind automated optimisations of “plan nodes > >>>>>>>>> deduplication” > >>>>>>>>>>>> I would turn it on globally, not per table, and let the > >>> optimiser > >>>>>>> do > >>>>>>>>> all of > >>>>>>>>>>>> the work. > >>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use > >>>>>>> cache > >>>>>>>>>>>> decision. > >>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such > >>> cost > >>>>>>>>> based > >>>>>>>>>>>> optimisations would work properly and I would still insist > >>> first on > >>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`) > >>>>>>>>>>>> > >>>>>>>>>>> We are absolutely on the same page here. An explicit cache() > >>> method > >>>>>>> is > >>>>>>>>>>> necessary not only because optimizer may not be able to make > the > >>>>>>> right > >>>>>>>>>>> decision, but also because of the nature of interactive > >>> programming. > >>>>>>>> For > >>>>>>>>>>> example, if users write the following code in Scala shell: > >>>>>>>>>>> val b = a.select(...) > >>>>>>>>>>> val c = b.select(...) > >>>>>>>>>>> val d = c.select(...).writeToSink(...) > >>>>>>>>>>> tEnv.execute() > >>>>>>>>>>> There is no way optimizer will know whether b or c will be used > >>> in > >>>>>>>> later > >>>>>>>>>>> code, unless users hint explicitly. > >>>>>>>>>>> > >>>>>>>>>>> At the same time I’m not sure if you have responded to our > >>>>>>> objections > >>>>>>>> of > >>>>>>>>>>>> `void cache()` being implicit/having side effects, which me, > >>> Jark, > >>>>>>>>> Fabian, > >>>>>>>>>>>> Till and I think also Shaoxuan are supporting. > >>>>>>>>>>> > >>>>>>>>>>> Is there any other side effects if we use semantic 3 mentioned > >>>>>>> above? > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> JIangjie (Becket) Qin > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Mon, Dec 10, 2018 at 7:54 PM Piotr Nowojski < > >>>>>>>> pi...@data-artisans.com > >>>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Becket, > >>>>>>>>>>>> > >>>>>>>>>>>> Sorry for not responding long time. > >>>>>>>>>>>> > >>>>>>>>>>>> Regarding case1. 
> >>>>>>>>>>>> > >>>>>>>>>>>> There wouldn’t be no “a.unCache()” method, but I would expect > >>> only > >>>>>>>>>>>> `cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t > >>>>>>> affect > >>>>>>>>>>>> `cachedTableA2`. Just as in any other database dropping > >>> modifying > >>>>>>> one > >>>>>>>>>>>> independent table/materialised view does not affect others. > >>>>>>>>>>>> > >>>>>>>>>>>>> What I meant is that assuming there is already a cached > table, > >>>>>>>> ideally > >>>>>>>>>>>> users need > >>>>>>>>>>>>> not to specify whether the next query should read from the > >>> cache > >>>>>>> or > >>>>>>>>> use > >>>>>>>>>>>> the > >>>>>>>>>>>>> original DAG. This should be decided by the optimizer. > >>>>>>>>>>>> > >>>>>>>>>>>> 1. If we want to let optimiser make decisions whether to use > >>> cache > >>>>>>> or > >>>>>>>>>>>> not, then why do we need “void cache()” method at all? Would > It > >>>>>>>>> “increase” > >>>>>>>>>>>> the chance of using the cache? That’s sounds strange. What > >>> would be > >>>>>>>> the > >>>>>>>>>>>> mechanism of deciding whether to use the cache or not? If we > >>> want > >>>>>>> to > >>>>>>>>>>>> introduce such kind automated optimisations of “plan nodes > >>>>>>>>> deduplication” > >>>>>>>>>>>> I would turn it on globally, not per table, and let the > >>> optimiser > >>>>>>> do > >>>>>>>>> all of > >>>>>>>>>>>> the work. > >>>>>>>>>>>> 2. We do not have statistics at the moment for any use/not use > >>>>>>> cache > >>>>>>>>>>>> decision. > >>>>>>>>>>>> 3. Even if we had, I would be veeerryy sceptical whether such > >>> cost > >>>>>>>>> based > >>>>>>>>>>>> optimisations would work properly and I would still insist > >>> first on > >>>>>>>>>>>> providing explicit caching mechanism (`CachedTable cache()`) > >>>>>>>>>>>> 4. As Till wrote, having explicit `CachedTable cache()` > doesn’t > >>>>>>>>>>>> contradict future work on automated cost based caching. > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> At the same time I’m not sure if you have responded to our > >>>>>>> objections > >>>>>>>>> of > >>>>>>>>>>>> `void cache()` being implicit/having side effects, which me, > >>> Jark, > >>>>>>>>> Fabian, > >>>>>>>>>>>> Till and I think also Shaoxuan are supporting. > >>>>>>>>>>>> > >>>>>>>>>>>> Piotrek > >>>>>>>>>>>> > >>>>>>>>>>>>> On 5 Dec 2018, at 12:42, Becket Qin <becket....@gmail.com> > >>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi Till, > >>>>>>>>>>>>> > >>>>>>>>>>>>> It is true that after the first job submission, there will be > >>> no > >>>>>>>>>>>> ambiguity > >>>>>>>>>>>>> in terms of whether a cached table is used or not. That is > the > >>>>>>> same > >>>>>>>>> for > >>>>>>>>>>>> the > >>>>>>>>>>>>> cache() without returning a CachedTable. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Conceptually one could think of cache() as introducing a > >>> caching > >>>>>>>>>>>> operator > >>>>>>>>>>>>>> from which you need to consume from if you want to benefit > >>> from > >>>>>>> the > >>>>>>>>>>>> caching > >>>>>>>>>>>>>> functionality. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I am thinking a little differently. I think it is a hint (as > >>> you > >>>>>>>>>>>> mentioned > >>>>>>>>>>>>> later) instead of a new operator. I'd like to be careful > about > >>> the > >>>>>>>>>>>> semantic > >>>>>>>>>>>>> of the API. A hint is a property set on an existing operator, > >>> but > >>>>>>> is > >>>>>>>>> not > >>>>>>>>>>>>> itself an operator as it does not really manipulate the data. 
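A small sketch of the distinction drawn here, assuming a toy plan model (hypothetical, for illustration only): an operator adds a node to the DAG and transforms data, while a hint merely annotates an existing node:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy plan model: hints are node properties, not plan nodes.
class PlanNode {
    final String operator;     // e.g. "scan", "filter", "join" - these manipulate data
    final List<PlanNode> inputs;
    final List<String> hints;  // e.g. "cache", "ignoreCache" - these do not

    PlanNode(String operator, List<PlanNode> inputs, List<String> hints) {
        this.operator = operator;
        this.inputs = inputs;
        this.hints = hints;
    }

    /** Hinting returns a copy with extra metadata; the shape of the DAG is unchanged. */
    PlanNode withHint(String hint) {
        List<String> newHints = new ArrayList<>(hints);
        newHints.add(hint);
        return new PlanNode(operator, inputs, newHints);
    }
}
```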
> >>>>>>>>>>>>> > >>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision > >>> which > >>>>>>>>>>>>>> intermediate result should be cached. But especially when > >>>>>>> executing > >>>>>>>>>>>> ad-hoc > >>>>>>>>>>>>>> queries the user might better know which results need to be > >>>>>>> cached > >>>>>>>>>>>> because > >>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would > >>> consider > >>>>>>>> the > >>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in > the > >>>>>>>> future > >>>>>>>>> we > >>>>>>>>>>>>>> might add functionality which tries to automatically cache > >>>>>>> results > >>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>> caching the latest intermediate results until so and so much > >>>>>>> space > >>>>>>>> is > >>>>>>>>>>>>>> used). But this should hopefully not contradict with > >>> `CachedTable > >>>>>>>>>>>> cache()`. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I agree that cache() method is needed for exactly the reason > >>> you > >>>>>>>>>>>> mentioned, > >>>>>>>>>>>>> i.e. Flink cannot predict what users are going to write > later, > >>> so > >>>>>>>>> users > >>>>>>>>>>>>> need to tell Flink explicitly that this table will be used > >>> later. > >>>>>>>>> What I > >>>>>>>>>>>>> meant is that assuming there is already a cached table, > ideally > >>>>>>>> users > >>>>>>>>>>>> need > >>>>>>>>>>>>> not to specify whether the next query should read from the > >>> cache > >>>>>>> or > >>>>>>>>> use > >>>>>>>>>>>> the > >>>>>>>>>>>>> original DAG. This should be decided by the optimizer. > >>>>>>>>>>>>> > >>>>>>>>>>>>> To explain the difference between returning / not returning a > >>>>>>>>>>>> CachedTable, > >>>>>>>>>>>>> I want compare the following two case: > >>>>>>>>>>>>> > >>>>>>>>>>>>> *Case 1: returning a CachedTable* > >>>>>>>>>>>>> b = a.map(...) > >>>>>>>>>>>>> val cachedTableA1 = a.cache() > >>>>>>>>>>>>> val cachedTableA2 = a.cache() > >>>>>>>>>>>>> b.print() // Just to make sure a is cached. > >>>>>>>>>>>>> > >>>>>>>>>>>>> c = a.filter(...) // User specify that the original DAG is > >>> used? > >>>>>>> Or > >>>>>>>>> the > >>>>>>>>>>>>> optimizer decides whether DAG or cache should be used? > >>>>>>>>>>>>> d = cachedTableA1.filter() // User specify that the cached > >>> table > >>>>>>> is > >>>>>>>>>>>> used. > >>>>>>>>>>>>> > >>>>>>>>>>>>> a.unCache() // Can cachedTableA still be used afterwards? > >>>>>>>>>>>>> cachedTableA1.uncache() // Can cachedTableA2 still be used? > >>>>>>>>>>>>> > >>>>>>>>>>>>> *Case 2: not returning a CachedTable* > >>>>>>>>>>>>> b = a.map() > >>>>>>>>>>>>> a.cache() > >>>>>>>>>>>>> a.cache() // no-op > >>>>>>>>>>>>> b.print() // Just to make sure a is cached > >>>>>>>>>>>>> > >>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or > DAG > >>>>>>>> should > >>>>>>>>>>>> be > >>>>>>>>>>>>> used > >>>>>>>>>>>>> d = a.filter(...) // Optimizer decides whether the cache or > DAG > >>>>>>>> should > >>>>>>>>>>>> be > >>>>>>>>>>>>> used > >>>>>>>>>>>>> > >>>>>>>>>>>>> a.unCache() > >>>>>>>>>>>>> a.unCache() // no-op > >>>>>>>>>>>>> > >>>>>>>>>>>>> In case 1, semantic wise, optimizer lose the option to choose > >>>>>>>> between > >>>>>>>>>>>> DAG > >>>>>>>>>>>>> and cache. And the unCache() call becomes tricky. > >>>>>>>>>>>>> In case 2, users do not need to worry about whether cache or > >>> DAG > >>>>>>> is > >>>>>>>>>>>> used. > >>>>>>>>>>>>> And the unCache() semantic is clear. However, the caveat is > >>> that > >>>>>>>> users > >>>>>>>>>>>>> cannot explicitly ignore the cache. 
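Continuing the toy plan model from the sketch above, a hypothetical illustration of the lookup that case 2 implies, extended with the explicit `ignoreCache` escape hatch that the Case 3 proposal below introduces:

```java
import java.util.Map;

// Hypothetical sketch: the planner may substitute a cached result for a node,
// and only an explicit "ignoreCache" hint forces the original DAG.
class CacheAwarePlanner {
    PlanNode resolve(PlanNode node, Map<PlanNode, PlanNode> cachedResults) {
        if (node.hints.contains("ignoreCache")) {
            return node;                        // user explicitly asked for the original DAG
        }
        PlanNode cached = cachedResults.get(node);
        return cached != null ? cached : node;  // otherwise the optimizer may read the cache
    }
}
```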
> >>>>>>>>>>>>> > >>>>>>>>>>>>> In order to address the issues mentioned in case 2 and > >>> inspired by > >>>>>>>> the > >>>>>>>>>>>>> discussion so far, I am thinking about using hint to allow > user > >>>>>>>>>>>> explicitly > >>>>>>>>>>>>> ignore cache. Although we do not have hint yet, but we > probably > >>>>>>>> should > >>>>>>>>>>>> have > >>>>>>>>>>>>> one. So the code becomes: > >>>>>>>>>>>>> > >>>>>>>>>>>>> *Case 3: returning this table* > >>>>>>>>>>>>> b = a.map() > >>>>>>>>>>>>> a.cache() > >>>>>>>>>>>>> a.cache() // no-op > >>>>>>>>>>>>> b.print() // Just to make sure a is cached > >>>>>>>>>>>>> > >>>>>>>>>>>>> c = a.filter(...) // Optimizer decides whether the cache or > DAG > >>>>>>>> should > >>>>>>>>>>>> be > >>>>>>>>>>>>> used > >>>>>>>>>>>>> d = a.hint("ignoreCache").filter(...) // DAG will be used > >>> instead > >>>>>>> of > >>>>>>>>> the > >>>>>>>>>>>>> cache. > >>>>>>>>>>>>> > >>>>>>>>>>>>> a.unCache() > >>>>>>>>>>>>> a.unCache() // no-op > >>>>>>>>>>>>> > >>>>>>>>>>>>> We could also let cache() return this table to allow chained > >>>>>>> method > >>>>>>>>>>>> calls. > >>>>>>>>>>>>> Do you think this API addresses the concerns? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Wed, Dec 5, 2018 at 10:55 AM Jark Wu <imj...@gmail.com> > >>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> All the recent discussions are focused on whether there is a > >>>>>>>> problem > >>>>>>>>> if > >>>>>>>>>>>>>> cache() not return a Table. > >>>>>>>>>>>>>> It seems that returning a Table explicitly is more clear > (and > >>>>>>>> safe?). > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> So whether there are any problems if cache() returns a > Table? > >>>>>>>>> @Becket > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Jark > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Tue, 4 Dec 2018 at 22:27, Till Rohrmann < > >>> trohrm...@apache.org > >>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> It's true that b, c, d and e will all read from the > original > >>> DAG > >>>>>>>>> that > >>>>>>>>>>>>>>> generates a. But all subsequent operators (when running > >>> multiple > >>>>>>>>>>>> queries) > >>>>>>>>>>>>>>> which reference cachedTableA should not need to reproduce > `a` > >>>>>>> but > >>>>>>>>>>>>>> directly > >>>>>>>>>>>>>>> consume the intermediate result. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Conceptually one could think of cache() as introducing a > >>> caching > >>>>>>>>>>>> operator > >>>>>>>>>>>>>>> from which you need to consume from if you want to benefit > >>> from > >>>>>>>> the > >>>>>>>>>>>>>> caching > >>>>>>>>>>>>>>> functionality. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I agree, ideally the optimizer makes this kind of decision > >>> which > >>>>>>>>>>>>>>> intermediate result should be cached. But especially when > >>>>>>>> executing > >>>>>>>>>>>>>> ad-hoc > >>>>>>>>>>>>>>> queries the user might better know which results need to be > >>>>>>> cached > >>>>>>>>>>>>>> because > >>>>>>>>>>>>>>> Flink might not see the full DAG. In that sense, I would > >>>>>>> consider > >>>>>>>>> the > >>>>>>>>>>>>>>> cache() method as a hint for the optimizer. Of course, in > the > >>>>>>>> future > >>>>>>>>>>>> we > >>>>>>>>>>>>>>> might add functionality which tries to automatically cache > >>>>>>> results > >>>>>>>>>>>> (e.g. > >>>>>>>>>>>>>>> caching the latest intermediate results until so and so > much > >>>>>>> space > >>>>>>>>> is > >>>>>>>>>>>>>>> used). 
But this should hopefully not contradict with > >>>>>>> `CachedTable > >>>>>>>>>>>>>> cache()`. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>> Till > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 2:33 PM Becket Qin < > >>> becket....@gmail.com > >>>>>>>> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Till, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for the clarification. I am still a little > confused. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> If cache() returns a CachedTable, the example might > become: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> b = a.map(...) > >>>>>>>>>>>>>>>> c = a.map(...) > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> cachedTableA = a.cache() > >>>>>>>>>>>>>>>> d = cachedTableA.map(...) > >>>>>>>>>>>>>>>> e = a.map() > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> In the above case, if cache() is lazily evaluated, b, c, d > >>> and > >>>>>>> e > >>>>>>>>> are > >>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>> going to be reading from the original DAG that generates > a. > >>> But > >>>>>>>>> with > >>>>>>>>>>>> a > >>>>>>>>>>>>>>>> naive expectation, d should be reading from the cache. > This > >>>>>>> seems > >>>>>>>>> not > >>>>>>>>>>>>>>>> solving the potential confusion you raised, right? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Just to be clear, my understanding are all based on the > >>>>>>>> assumption > >>>>>>>>>>>> that > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> tables are immutable. Therefore, after a.cache(), a the > >>>>>>>>>>>> c*achedTableA* > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> original table *a * should be completely interchangeable. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> That said, I think a valid argument is optimization. There > >>> are > >>>>>>>>> indeed > >>>>>>>>>>>>>>> cases > >>>>>>>>>>>>>>>> that reading from the original DAG could be faster than > >>> reading > >>>>>>>>> from > >>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>> cache. For example, in the following example: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> a.filter(f1' > 100) > >>>>>>>>>>>>>>>> a.cache() > >>>>>>>>>>>>>>>> b = a.filter(f1' < 100) > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Ideally the optimizer should be intelligent enough to > decide > >>>>>>>> which > >>>>>>>>>>>> way > >>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>> faster, without user intervention. In this case, it will > >>>>>>> identify > >>>>>>>>>>>> that > >>>>>>>>>>>>>> b > >>>>>>>>>>>>>>>> would just be an empty table, thus skip reading from the > >>> cache > >>>>>>>>>>>>>>> completely. > >>>>>>>>>>>>>>>> But I agree that returning a CachedTable would give user > the > >>>>>>>>> control > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>> when to use cache, even though I still feel that letting > the > >>>>>>>>>>>> optimizer > >>>>>>>>>>>>>>>> handle this is a better option in long run. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Tue, Dec 4, 2018 at 6:51 PM Till Rohrmann < > >>>>>>>> trohrm...@apache.org > >>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Yes you are right Becket that it still depends on the > >>> actual > >>>>>>>>>>>>>> execution > >>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>> the job whether a consumer reads from a cached result or > >>> not. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> My point was actually about the properties of a (cached > vs. > >>>>>>>>>>>>>> non-cached) > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> not about the execution. 
I would not make cache trigger > the > >>>>>>>>>>>> execution > >>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>> the job because one loses some flexibility by eagerly > >>>>>>> triggering > >>>>>>>>> the > >>>>>>>>>>>>>>>>> execution. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I tried to argue for an explicit CachedTable which is > >>> returned > >>>>>>>> by > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> cache() method like Piotr did in order to make the API > more > >>>>>>>>>>>> explicit. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>> Till > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 4:23 PM Becket Qin < > >>>>>>> becket....@gmail.com > >>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Till, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> That is a good example. Just a minor correction, in this > >>>>>>> case, > >>>>>>>>> b, c > >>>>>>>>>>>>>>>> and d > >>>>>>>>>>>>>>>>>> will all consume from a non-cached a. This is because > >>> cache > >>>>>>>> will > >>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>> created on the very first job submission that generates > >>> the > >>>>>>>> table > >>>>>>>>>>>>>> to > >>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>> cached. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If I understand correctly, this is example is about > >>> whether > >>>>>>>>>>>>>> .cache() > >>>>>>>>>>>>>>>>> method > >>>>>>>>>>>>>>>>>> should be eagerly evaluated or lazily evaluated. In > >>> another > >>>>>>>> word, > >>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>>>> cache() method actually triggers a job that creates the > >>>>>>> cache, > >>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>> be no such confusion. Is that right? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> In the example, although d will not consume from the > >>> cached > >>>>>>>> Table > >>>>>>>>>>>>>>> while > >>>>>>>>>>>>>>>>> it > >>>>>>>>>>>>>>>>>> looks supposed to, from correctness perspective the code > >>> will > >>>>>>>>> still > >>>>>>>>>>>>>>>>> return > >>>>>>>>>>>>>>>>>> correct result, assuming that tables are immutable. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Personally I feel it is OK because users probably won't > >>>>>>> really > >>>>>>>>>>>>>> worry > >>>>>>>>>>>>>>>>> about > >>>>>>>>>>>>>>>>>> whether the table is cached or not. And lazy cache could > >>>>>>> avoid > >>>>>>>>> some > >>>>>>>>>>>>>>>>>> unnecessary caching if a cached table is never created > in > >>> the > >>>>>>>>> user > >>>>>>>>>>>>>>>>>> application. But I am not opposed to do eager evaluation > >>> of > >>>>>>>>> cache. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann < > >>>>>>>>>>>>>> trohrm...@apache.org> > >>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Another argument for Piotr's point is that lazily > >>> changing > >>>>>>>>>>>>>>> properties > >>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>> node affects all down stream consumers but does not > >>>>>>>> necessarily > >>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>> happen before these consumers are defined. From a > user's > >>>>>>>>>>>>>>> perspective > >>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>> can be quite confusing: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> b = a.map(...) > >>>>>>>>>>>>>>>>>>> c = a.map(...) 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> a.cache() > >>>>>>>>>>>>>>>>>>> d = a.map(...) > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> now b, c and d will consume from a cached operator. In > >>> this > >>>>>>>>> case, > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>>>> would most likely expect that only d reads from a > cached > >>>>>>>> result. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Cheers, > >>>>>>>>>>>>>>>>>>> Till > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski < > >>>>>>>>>>>>>>>>> pi...@data-artisans.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hey Shaoxuan and Becket, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Can you explain a bit more one what are the side > >>> effects? > >>>>>>> So > >>>>>>>>>>>>>>> far > >>>>>>>>>>>>>>>> my > >>>>>>>>>>>>>>>>>>>>> understanding is that such side effects only exist > if a > >>>>>>>> table > >>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>> mutable. > >>>>>>>>>>>>>>>>>>>>> Is that the case? > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Not only that. There are also performance implications > >>> and > >>>>>>>>>>>>>> those > >>>>>>>>>>>>>>>> are > >>>>>>>>>>>>>>>>>>>> another implicit side effects of using `void cache()`. > >>> As I > >>>>>>>>>>>>>> wrote > >>>>>>>>>>>>>>>>>> before, > >>>>>>>>>>>>>>>>>>>> reading from cache might not always be desirable, thus > >>> it > >>>>>>> can > >>>>>>>>>>>>>>> cause > >>>>>>>>>>>>>>>>>>>> performance degradation and I’m fine with that - > user's > >>> or > >>>>>>>>>>>>>>>>> optimiser’s > >>>>>>>>>>>>>>>>>>>> choice. What I do not like is that this implicit side > >>>>>>> effect > >>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>>> manifest > >>>>>>>>>>>>>>>>>>>> in completely different part of code, that wasn’t > >>> touched > >>>>>>> by > >>>>>>>> a > >>>>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>>> while > >>>>>>>>>>>>>>>>>>>> he was adding `void cache()` call somewhere else. And > >>> even > >>>>>>> if > >>>>>>>>>>>>>>>> caching > >>>>>>>>>>>>>>>>>>>> improves performance, it’s still a side effect of > `void > >>>>>>>>>>>>>> cache()`. > >>>>>>>>>>>>>>>>>> Almost > >>>>>>>>>>>>>>>>>>>> from the definition `void` methods have only side > >>> effects. > >>>>>>>> As I > >>>>>>>>>>>>>>>> wrote > >>>>>>>>>>>>>>>>>>>> before, there are couple of scenarios where this might > >>> be > >>>>>>>>>>>>>>>> undesirable > >>>>>>>>>>>>>>>>>>>> and/or unexpected, for example: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 1. > >>>>>>>>>>>>>>>>>>>> Table b = …; > >>>>>>>>>>>>>>>>>>>> b.cache() > >>>>>>>>>>>>>>>>>>>> x = b.join(…) > >>>>>>>>>>>>>>>>>>>> y = b.count() > >>>>>>>>>>>>>>>>>>>> // ... > >>>>>>>>>>>>>>>>>>>> // 100 > >>>>>>>>>>>>>>>>>>>> // hundred > >>>>>>>>>>>>>>>>>>>> // lines > >>>>>>>>>>>>>>>>>>>> // of > >>>>>>>>>>>>>>>>>>>> // code > >>>>>>>>>>>>>>>>>>>> // later > >>>>>>>>>>>>>>>>>>>> z = b.filter(…).groupBy(…) // this might be even > hidden > >>> in > >>>>>>> a > >>>>>>>>>>>>>>>>> different > >>>>>>>>>>>>>>>>>>>> method/file/package/dependency > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> 2. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Table b = ... 
In both examples above, `b.cache()` will implicitly affect `z = b.filter(…).groupBy(…)` - the semantics of the program (in case the sources are mutable) as well as its performance - which might be far from obvious.

On top of that, there is still my argument that having a `MaterializedTable` or `CachedTable` handle is more flexible for us in the future and for the user (as a manual option to bypass cache reads).

> But Jiangjie is correct, the source table in batching should be immutable. It is the user's responsibility to ensure it; otherwise even a regular failover may lead to inconsistent results.

Yes, I agree that is what a perfect world / good deployment should look like. But it often isn't, and while I'm not trying to fix this (since the proper fix is to support transactions), I am trying to minimise confusion for users who are not fully aware of what's going on and operate in a less than perfect setup. If something bites them after adding a `b.cache()` call, they should at least know all of the places that adding this line can affect.

Thanks, Piotrek
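(A minimal sketch, assuming the handle-returning `cache()` this thread proposes, of how example 2 changes: the cache can only escape `foo` through an explicit return value, so the caller's later uses of `b` cannot be silently affected. A plain `Table` return is used here since the exact handle type is still under discussion:)

```
// With `void cache()`, foo() silently changes how the caller's `b` behaves.
// With a returned handle, the effect is confined to whoever holds the handle:
Table foo(Table b) {
    Table cachedB = b.cache();      // proposed: hint a copy, leave `b` untouched
    // ... do something with cachedB ...
    return cachedB;                 // the caller must opt in explicitly
}

// Caller:
z = b.filter(…).groupBy(…)          // still reads the original, un-hinted `b`
```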
On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

Thanks again for the clarification. Some more replies follow.

> But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching.

It is true. Actually, in stream processing cache() has the same semantics as in batch processing. The semantics are the following: for a table created via a series of computations, save that table for later reference, to avoid re-running the computation logic to regenerate the table; once the application exits, drop all the caches.

These semantics are the same for both batch and stream processing. The difference is that stream applications run only once, as they are long running, while batch applications may be run multiple times, so the cache may be created and dropped each time the application runs. Admittedly, there will probably be some resource management requirements for a streaming cached table, such as time-based / size-based retention, to address the infinite-data issue. But such requirements do not change the semantics.

You are right that interactive programming is just one use case of cache(). It is not the only one.
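(A sketch of the lifecycle described above, for a hypothetical interactive batch session; lazy cache creation on the first job submission and automatic dropping at application exit are the proposed semantics, not shipped behavior, and `count()` is used loosely, as in the thread's other examples:)

```
Table t = rawData.filter("valid").groupBy("k").select("k, v.sum as total");
Table cachedT = t.cache();      // proposed API: only a hint, no job is triggered yet

long n = cachedT.count();       // first submission runs the full pipeline and fills the cache
Table top = cachedT.filter("total > 100");  // later jobs in the same session reuse the cache

// On application exit all caches are dropped; a re-run of the batch
// program would recreate them on its own first submission.
```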
> For me the more important issue is of not having the `void cache()` with side effects.

This is indeed the key point. The argument around whether cache() should return something already indicates that cache() and materialize() address different issues. Can you explain a bit more on what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case?

> I don't know, probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that a user cannot write to views or materialised views in SQL, or that a user currently cannot write to a Table.

I don't think anyone should insert something into a cache. By definition, the cache should only be updated when the corresponding original table is updated. What I am wondering is that, given the following two facts:

1. If and only if a table is mutable (with something like insert()), a CachedTable may have implicit behavior.
2. A CachedTable extends a Table.

we can come to the conclusion that a CachedTable is mutable, and users can insert into the CachedTable directly. This is what I find confusing.

Thanks,

Jiangjie (Becket) Qin
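(To make the subtyping concern concrete, a hypothetical sketch of two shapes a read-only cache handle could take; none of these types or methods are settled API, and `insert()` is the imagined future mutation method from fact 1:)

```
// Shape A: CachedTable extends Table. It inherits every current and future
// Table method, so a hypothetical insert() can only be rejected at runtime:
class CachedTable extends Table {
    @Override
    public void insert(Row values) {
        throw new UnsupportedOperationException("CachedTable is read-only");
    }
}

// Shape B: a separate handle that exposes only the read side plus explicit
// lifecycle, avoiding the inheritance question entirely:
interface ReadOnlyCachedTable {
    Table filter(String predicate);
    Table select(String fields);
    void drop();   // manual cache release (possible future extension)
}
```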
On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi all,

Regarding naming, `cache()` vs `materialize()`: one more explanation of why `materialize()` is more natural to me is that I think of all "Table"s in the Table API as views. They behave the same way as SQL views; the only difference for me is that their life scope is short - the current session - which is limited by a different execution model. That is why "caching" a view, for me, is just materialising it.

However, I see and understand your point of view. Coming from DataSet/DataStream and, generally speaking, the non-SQL world, `cache()` is more natural. But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching. Naming is one issue, though, and not that critical to me, especially since, once we implement proper materialised views, we can always deprecate/rename `cache()` if we deem so.

For me the more important issue is not having a `void cache()` with side effects, exactly for the reasons that you have mentioned. True: results might be non-deterministic if the underlying source tables are changing. The problem is that `void cache()` implicitly changes the semantics of subsequent uses of the cached/materialized Table. It can cause a "wtf" moment for a user if he inserts a "b.cache()" call in some place in his code and suddenly some other random places behave differently. If `materialize()` or `cache()` returns a Table handle, we force the user to use the cache explicitly, which removes the "random" part from "suddenly some other random places are behaving differently".

This argument, and the others I have raised (greater flexibility / allowing the user to explicitly bypass the cache), are independent of the `cache()` vs `materialize()` discussion.

> Does that mean one can also insert into the CachedTable? This sounds pretty confusing.

I don't know; probably initially we should make CachedTable read-only. I don't find it more confusing than the fact that a user cannot write to views or materialised views in SQL, or that a user currently cannot write to a Table.

Piotrek
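(A sketch of the explicit-use / explicit-bypass point, under the proposed handle-returning API; whether the optimiser may still choose to skip the cache for reads through the handle is deliberately left open in the thread:)

```
Table b = ...;                   // some expensive derived table
Table cachedB = b.cache();       // proposed: returns a hinted handle, `b` unchanged

long fromCache  = cachedB.count();   // explicitly read through the cache
long recomputed = b.count();         // explicitly bypass it: recompute from the sources
```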
On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

I agree with @Becket that `cache()` and `materialize()` should be considered two different methods, where the latter is more sophisticated.

In my understanding, the initial idea is just to introduce a simple cache or persist mechanism, but since the Table API is a high-level API, it is natural for us to think in a SQL way.

Maybe we could add the `cache()` method to the DataSet API and force users to translate a Table to a DataSet before caching it. The users would then manually register the cached dataset as a table again (we may need some table replacement mechanism for datasets with an identical schema but different contents here). After all, it is the dataset, rather than the dynamic table, that needs to be cached, right?

Best,
Xingcan
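(A sketch of what this alternative might look like in user code; `DataSet#cache()` does not exist - it is exactly the addition being proposed - while `toDataSet`, `registerDataSet` and `scan` are the batch TableEnvironment methods of that era:)

```
// Hypothetical: cache at the DataSet level, then re-register as a table.
DataSet<Row> rows = tableEnv.toDataSet(t1, Row.class);
DataSet<Row> cachedRows = rows.cache();              // proposed DataSet API, not available today
tableEnv.registerDataSet("t1_cached", cachedRows);   // replaces/shadows the original table
Table t1Cached = tableEnv.scan("t1_cached");
```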
On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek and Jark,

Thanks for the feedback and explanation. Those are good arguments, but I think they are mostly about materialized views. Let me try to explain why I believe cache() and materialize() are different.

cache() and materialize() have quite different implications. An analogy I can think of is save() vs publish(). When users call cache(), it is just like saving an intermediate result as a draft of their work; this intermediate result may not have any realistic meaning, and calling cache() does not mean users want to publish the cached table in any manner. But when users call materialize(), it means "I have something meaningful to be reused by others"; now users need to think about validation, update and versioning, the lifecycle of the result, and so on.

Piotrek's suggestions on variations of the materialize() method are very useful, and it would be great if Flink had them. But the concept of a materialized view is actually a pretty big feature, not to mention the related machinery such as the triggers/hooks you mentioned earlier. I think materialized views should be discussed in a more thorough and systematic manner, and I find that discussion to be orthogonal to, and way beyond, the interactive programming experience.

The example you gave was interesting. I still have some questions, though.

> Table source = … // some source that scans files from the directory “/foo/bar/“
> Table t1 = source.groupBy(…).select(…).where(…)…;
> Table t2 = t1.materialize() // (or `cache()`)
>
> t2.count() // initialise the cache (if it’s lazily initialised)
> int a1 = t1.count()
> int b1 = t2.count()
> // something in the background (or we trigger it) writes new files to /foo/bar
> int a2 = t1.count()
> int b2 = t2.count()
> t2.refresh() // possible future extension, not to be implemented in the initial version

What if someone else added some more files to /foo/bar at this point? In that case a3 won't equal b3, and the result becomes non-deterministic, right?

> int a3 = t1.count()
> int b3 = t2.count()
> t2.drop() // another possible future extension, manual “cache” dropping

When we talk about interactive programming, in most cases we are talking about batch applications. A fundamental assumption in that case is that the source data is complete before the data processing begins, and that the data will not change during the processing. IMO, if additional rows need to be added to some source during the processing, it should be done in ways like unioning the source with another table containing the rows to be added.
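(A sketch of the "union in the new rows" pattern suggested above, using the Table API's existing union operation; the table names are illustrative:)

```
// Instead of letting new files silently appear under /foo/bar during a run,
// model late additions as a second, explicit source and union them in:
Table original = tableEnv.scan("foo_bar_snapshot");   // frozen view of the directory
Table lateRows = tableEnv.scan("foo_bar_additions");  // rows that arrived afterwards
Table complete = original.unionAll(lateRows);         // input is explicit and deterministic
```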
There are a few cases where computations are executed repeatedly on a changing data source.

For example, people may run an ML training job every hour with the samples newly added in the past hour. In that case the source data will indeed change between runs, but it remains unchanged within one run. And usually in that case the result needs versioning, i.e. for a given result it tells you that the result was derived from the source data as of a certain timestamp.

Another example is something like a data warehouse. In this case there are a few sources of original/raw data, and on top of those sources many materialized views / queries / reports / dashboards are created to generate derived data. That derived data needs to be updated whenever the underlying original data changes, so the processing logic that derives it needs to be executed repeatedly to refresh those reports/views. Again, all of that derived data also needs to have versioning.
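(For the versioning point, a minimal illustration of one common approach - tagging each run's output with the snapshot timestamp it was computed from; the sink naming scheme and the `insertInto` usage are only indicative:)

```
// Each hourly run reads a fixed snapshot of the samples and writes a result
// labeled with the snapshot timestamp it was derived from.
long snapshotTs = System.currentTimeMillis();
Table hourly = samples
    .filter("ts <= " + snapshotTs)
    .groupBy("label")
    .select("label, feature.avg as avgFeature");
hourly.insertInto("training_result_" + snapshotTs);  // result tied to a source version
```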