Hi Till, That is a good example. Just a minor correction: in this case, b, c and d will all consume from a non-cached a. This is because the cache will only be created on the very first job submission that generates the table to be cached.
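The lazy-creation behaviour described above can be sketched in a toy model (plain Python, all names hypothetical; this is not Flink's actual API, only an illustration of the semantics under discussion):

```python
# Toy model of lazy cache creation (hypothetical names, not Flink's API).
# cache() only marks the table; the cache is materialized on the first
# job submission, so every consumer defined before that submission reads
# the original, non-cached source.

class Table:
    def __init__(self, name):
        self.name = name
        self.cache_requested = False   # set by cache()
        self.cache_ready = False       # set by the first job submission
        self.reads_from = None

    def cache(self):
        # lazily evaluated: only a flag is set, no job is triggered here
        self.cache_requested = True

    def map(self, fn_desc):
        # record whether this consumer would read the cache or the source
        child = Table(f"{self.name}.map({fn_desc})")
        child.reads_from = (
            "cache" if self.cache_requested and self.cache_ready else "source"
        )
        return child

    def submit_job(self):
        # the first job submission is what actually creates the cache
        if self.cache_requested:
            self.cache_ready = True

a = Table("a")
b = a.map("f")   # defined before cache(): reads the source
c = a.map("g")
a.cache()        # lazy: nothing happens yet
d = a.map("h")   # cache requested but not yet created: still the source
a.submit_job()   # the cache is materialized here
e = a.map("k")   # only now do new consumers read the cache
print(b.reads_from, c.reads_from, d.reads_from, e.reads_from)
# source source source cache
```

Under eager evaluation, cache() itself would play the role of submit_job() above, so consumers defined after the cache() call (d in Till's example) would read from the cache.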
If I understand correctly, this example is about whether the .cache() method should be eagerly evaluated or lazily evaluated. In other words, if the cache() method actually triggers a job that creates the cache, there will be no such confusion. Is that right? In the example, although d will not consume from the cached Table while it looks like it should, from a correctness perspective the code will still return the correct result, assuming that tables are immutable. Personally I feel it is OK because users probably won't really worry about whether the table is cached or not. And lazy cache could avoid some unnecessary caching if a cached table is never created in the user application. But I am not opposed to doing eager evaluation of cache. Thanks, Jiangjie (Becket) Qin On Mon, Dec 3, 2018 at 10:01 PM Till Rohrmann <trohrm...@apache.org> wrote: > Another argument for Piotr's point is that lazily changing properties of a node affects all downstream consumers but does not necessarily have to happen before these consumers are defined. From a user's perspective this can be quite confusing: > > b = a.map(...) > c = a.map(...) > > a.cache() > d = a.map(...) > > now b, c and d will consume from a cached operator. In this case, the user would most likely expect that only d reads from a cached result. > > Cheers, > Till > > On Mon, Dec 3, 2018 at 11:32 AM Piotr Nowojski <pi...@data-artisans.com> wrote: > > > Hey Shaoxuan and Becket, > > > > > Can you explain a bit more on what the side effects are? So far my understanding is that such side effects only exist if a table is mutable. Is that the case? > > > > Not only that. There are also performance implications, and those are another implicit side effect of using `void cache()`. As I wrote before, reading from the cache might not always be desirable, thus it can cause performance degradation and I’m fine with that - user's or optimiser’s choice.
What I do not like is that this implicit side effect can manifest in a completely different part of the code, one that wasn’t touched by the user while he was adding the `void cache()` call somewhere else. And even if caching improves performance, it’s still a side effect of `void cache()`. Almost by definition, `void` methods have only side effects. As I wrote before, there are a couple of scenarios where this might be undesirable and/or unexpected, for example: > > > > 1. > > Table b = …; > > b.cache() > > x = b.join(…) > > y = b.count() > > // ... > > // one > > // hundred > > // lines > > // of > > // code > > // later > > z = b.filter(…).groupBy(…) // this might even be hidden in a different method/file/package/dependency > > > > 2. > > > > Table b = ... > > if (some_condition) { > > foo(b) > > } > > else { > > bar(b) > > } > > z = b.filter(…).groupBy(…) > > > > void foo(Table b) { > > b.cache() > > // do something with b > > } > > > > In both of the above examples, `b.cache()` will implicitly affect `z = b.filter(…).groupBy(…)` (both the semantics of the program, in case of mutable sources, and its performance), which might be far from obvious. > > > > On top of that, there is still this argument of mine that having a `MaterializedTable` or `CachedTable` handle is more flexible for us in the future and for the user (as a manual option to bypass cache reads). > > > > > But Jiangjie is correct, the source table in batching should be immutable. It is the user’s responsibility to ensure it, otherwise even a regular failover may lead to inconsistent results. > > > > Yes, I agree that’s what a perfect world/good deployment should be. But it often isn’t, and while I’m not trying to fix this (since the proper fix is to support transactions), I’m just trying to minimise confusion for the users that are not fully aware of what’s going on and operate in a less than perfect setup.
And if something bites them after adding a `b.cache()` call, I want to make sure that they at least know all of the places that adding this line can affect. > > > > Thanks, Piotrek > > > > > On 1 Dec 2018, at 15:39, Becket Qin <becket....@gmail.com> wrote: > > > > > > Hi Piotrek, > > > > > > Thanks again for the clarification. Some more replies follow. > > > > > > But keep in mind that `.cache()` will/might not only be used in interactive > > >> programming and not only in batching. > > > > > > It is true. Actually in stream processing, cache() has the same semantics as in batch processing. The semantics are the following: > > > For a table created via a series of computations, save that table for later reference to avoid running the computation logic to regenerate the table. > > > Once the application exits, drop all the cache. > > > These semantics are the same for both batch and stream processing. The difference is that stream applications will only run once, as they are long running. And the batch applications may be run multiple times, hence the cache may be created and dropped each time the application runs. > > > Admittedly, there will probably be some resource management requirements for the streaming cached table, such as time based / size based retention, to address the infinite data issue. But such requirements do not change the semantics. > > > You are right that interactive programming is just one use case of cache(). It is not the only use case. > > > > > > For me the more important issue is that of not having the `void cache()` with > > >> side effects. > > > > > > This is indeed the key point. The argument around whether cache() should return something already indicates that cache() and materialize() address different issues. > > > Can you explain a bit more on what the side effects are?
So far my understanding is that such side effects only exist if a table is mutable. Is that the case? > > > > > > I don’t know, probably initially we should make CachedTable read-only. I > > >> don’t find it more confusing than the fact that a user cannot write to views or materialised views in SQL, or that a user currently cannot write to a Table. > > > > > > I don't think anyone should insert something into a cache. By definition, the cache should only be updated when the corresponding original table is updated. What I am wondering is that given the following two facts: > > > 1. If and only if a table is mutable (with something like insert()), a CachedTable may have implicit behavior. > > > 2. A CachedTable extends a Table. > > > We can come to the conclusion that a CachedTable is mutable and users can insert into the CachedTable directly. This is where I found it confusing. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Sat, Dec 1, 2018 at 2:45 AM Piotr Nowojski <pi...@data-artisans.com> > > > wrote: > > > > > >> Hi all, > > >> > > >> Regarding naming `cache()` vs `materialize()`. One more explanation why I think `materialize()` is more natural to me is that I think of all “Table”s in Table-API as views. They behave the same way as SQL views; the only difference for me is that their life scope is short - the current session, which is limited by a different execution model. That’s why “caching” a view for me is just materialising it. > > >> > > >> However I see and I understand your point of view. Coming from DataSet/DataStream and, generally speaking, the non-SQL world, `cache()` is more natural. But keep in mind that `.cache()` will/might not only be used in interactive programming and not only in batching. But naming is one issue, and not that critical to me. Especially that once we implement proper
Especially that once we implement proper > > >> materialised views, we can always deprecate/rename `cache()` if we > deem > > so. > > >> > > >> > > >> For me the more important issue is of not having the `void cache()` > with > > >> side effects. Exactly for the reasons that you have mentioned. True: > > >> results might be non deterministic if underlying source table are > > changing. > > >> Problem is that `void cache()` implicitly changes the semantic of > > >> subsequent uses of the cached/materialized Table. It can cause “wtf” > > moment > > >> for a user if he inserts “b.cache()” call in some place in his code > and > > >> suddenly some other random places are behaving differently. If > > >> `materialize()` or `cache()` returns a Table handle, we force user to > > >> explicitly use the cache which removes the “random” part from the > > "suddenly > > >> some other random places are behaving differently”. > > >> > > >> This argument and others that I’ve raised (greater > flexibility/allowing > > >> user to explicitly bypass the cache) are independent of `cache()` vs > > >> `materialize()` discussion. > > >> > > >>> Does that mean one can also insert into the CachedTable? This sounds > > >> pretty confusing. > > >> > > >> I don’t know, probably initially we should make CachedTable > read-only. I > > >> don’t find it more confusing than the fact that user can not write to > > views > > >> or materialised views in SQL or that user currently can not write to a > > >> Table. > > >> > > >> Piotrek > > >> > > >>> On 30 Nov 2018, at 17:38, Xingcan Cui <xingc...@gmail.com> wrote: > > >>> > > >>> Hi all, > > >>> > > >>> I agree with @Becket that `cache()` and `materialize()` should be > > >> considered as two different methods where the later one is more > > >> sophisticated. 
> > >>> > > >>> According to my understanding, the initial idea is just to introduce a simple cache or persist mechanism, but as the TableAPI is a high-level API, it’s natural for us to think in a SQL way. > >>> > > >>> Maybe we can add the `cache()` method to the DataSet API and force users to translate a Table to a Dataset before caching it. Then the users should manually register the cached dataset as a table again (we may need some table replacement mechanisms for datasets with an identical schema but different contents here). After all, it’s the dataset rather than the dynamic table that needs to be cached, right? > >>> > > >>> Best, > >>> Xingcan > >>> > > >>>> On Nov 30, 2018, at 10:57 AM, Becket Qin <becket....@gmail.com> wrote: > >>>> > > >>>> Hi Piotrek and Jark, > >>>> > > >>>> Thanks for the feedback and explanation. Those are good arguments. But I think those arguments are mostly about materialized views. Let me try to explain the reason I believe cache() and materialize() are different. > >>>> > > >>>> I think cache() and materialize() have quite different implications. An analogy I can think of is save()/publish(). When users call cache(), it is just like they are saving an intermediate result as a draft of their work; this intermediate result may not have any realistic meaning. Calling cache() does not mean users want to publish the cached table in any manner. But when users call materialize(), that means "I have something meaningful to be reused by others", and now users need to think about the validation, update & versioning, lifecycle of the result, etc. > >>>> > > >>>> Piotrek's suggestions on variations of the materialize() methods are very useful. It would be great if Flink had them.
The concept of materialized views is actually a pretty big feature, not to mention the related stuff like triggers/hooks you mentioned earlier. I think the materialized view itself should be discussed in a more thorough and systematic manner. And I find that discussion kind of orthogonal to, and way beyond, the interactive programming experience. > >>>> > > >>>> The example you gave was interesting. I still have some questions, though. > >>>> > > >>>>> Table source = … // some source that scans files from a directory > >>>>> “/foo/bar/“ > >>>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > >>>>> Table t2 = t1.materialize() // (or `cache()`) > >>>>> t2.count() // initialise cache (if it’s lazily initialised) > >>>>> int a1 = t1.count() > >>>>> int b1 = t2.count() > >>>>> // something in the background (or we trigger it) writes new files to > >>>>> /foo/bar > >>>>> int a2 = t1.count() > >>>>> int b2 = t2.count() > >>>>> t2.refresh() // possible future extension, not to be implemented in the > >>>>> initial version > >>>> > > >>>> What if someone else added some more files to /foo/bar at this point? In that case, a3 won't equal b3, and the result becomes non-deterministic, right? > >>>> > > >>>>> int a3 = t1.count() > >>>>> int b3 = t2.count() > >>>>> t2.drop() // another possible future extension, manual “cache” dropping > >>>> > > >>>> When we talk about interactive programming, in most cases we are talking about batch applications. A fundamental assumption of such a case is that the source data is complete before the data processing begins, and the data will not change during the data processing.
IMO, if additional rows need to be added to some source during the processing, it should be done in ways like unioning the source with another table containing the rows to be added. > >>>> > > >>>> There are a few cases where computations are executed repeatedly on a changing data source. > >>>> > > >>>> For example, people may run a ML training job every hour with the samples newly added in the past hour. In that case, the source data between runs will indeed change. But still, the data remains unchanged within one run. And usually in that case, the result will need versioning, i.e. for a given result, it tells that the result was derived from the source data as of a certain timestamp. > >>>> > > >>>> Another example is something like a data warehouse. In this case, there are a few sources of original/raw data. On top of those sources, many materialized views / queries / reports / dashboards can be created to generate derived data. That derived data needs to be updated when the underlying original data changes. In that case, the processing logic that derives the data needs to be executed repeatedly to update those reports/views. Again, all of that derived data also needs to have version management, such as a timestamp. > >>>> > > >>>> In either of the above two cases, during a single run of the processing logic, the data cannot change. Otherwise the behavior of the processing logic may be undefined. In the above two examples, when writing the processing logic, users can use .cache() to hint to Flink that those results should be saved to avoid repeated computation.
And then, for the result of my application logic, I'll call materialize(), so that these results can be managed by the system with versioning, metadata management, lifecycle management, ACLs, etc. > >>>> > > >>>> It is true we can use materialize() to do the cache() job, but I am really reluctant to shoehorn cache() into materialize() and force users to worry about a bunch of implications that they shouldn't have to. I am absolutely on your side that redundant APIs are bad. But it is equally frustrating, if not more so, that the same API does different things. > >>>> > > >>>> Thanks, > >>>> > > >>>> Jiangjie (Becket) Qin > >>>> > > >>>> On Fri, Nov 30, 2018 at 10:34 PM Shaoxuan Wang <wshaox...@gmail.com> > >> wrote: > >>>> > > >>>>> Thanks Piotrek, > >>>>> You provided a very good example; it explains all the confusion I had. It is clear that there is something we have not considered in the initial proposal. We intend to force the user to reuse the cached/materialized table if its cache() method is executed. We did not expect that a user may want to re-execute the plan from the source table. Let me re-think about it and get back to you later. > >>>>> > > >>>>> In the meanwhile, this example/observation also implies that we cannot fully involve the optimizer in deciding the plan if a cache/materialize is explicitly used, because whether to reuse the cached data or re-execute the query from the source data may lead to different results. (But I guess the optimizer can still help in some cases ---- as long as it does not re-execute from the changed source, we should be safe).
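Piotr's file-scan example, and Shaoxuan's observation that reusing the cache and re-executing from the source can give different results, can be simulated in a few lines (plain Python, a toy stand-in for the behaviour under discussion, not Flink itself):

```python
# t1 re-executes a full scan on every count(), while t2 serves a cached
# snapshot, so their results diverge once new files appear in the source.

files = ["f1", "f2"]          # stands in for the /foo/bar directory

class Query:
    def count(self):
        return len(files)     # re-executed full scan each time

class CachedQuery:
    def __init__(self, query):
        self.snapshot = query.count()   # cache initialised once
    def count(self):
        return self.snapshot            # always served from the cache
    def refresh(self):                  # possible future extension
        self.snapshot = len(files)

t1 = Query()
t2 = CachedQuery(t1)

a1, b1 = t1.count(), t2.count()   # same result: the cache is fresh
files.append("f3")                # something writes new files
a2, b2 = t1.count(), t2.count()   # t1 sees the new data, the cache doesn't
t2.refresh()
a3, b3 = t1.count(), t2.count()   # consistent again after the refresh

assert a1 == b1
assert a2 > b2                    # the diverging, possibly surprising case
assert a3 == b3
```

This is exactly why the optimizer cannot freely swap a cache read for a re-execution (or vice versa) once the source has changed: the two plans are no longer equivalent.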
> > >>>>> > > >>>>> Regards, > > >>>>> Shaoxuan > > >>>>> > > >>>>> > > >>>>> > > >>>>> On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski < > > >> pi...@data-artisans.com> > > >>>>> wrote: > > >>>>> > > >>>>>> Hi Shaoxuan, > > >>>>>> > > >>>>>> Re 2: > > >>>>>> > > >>>>>>> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> > > t1’ > > >>>>>> > > >>>>>> What do you mean that “ t1 is modified to-> t1’ ” ? That > > >>>>>> `methodThatAppliesOperators()` method has changed it’s plan? > > >>>>>> > > >>>>>> I was thinking more about something like this: > > >>>>>> > > >>>>>> Table source = … // some source that scans files from a directory > > >>>>>> “/foo/bar/“ > > >>>>>> Table t1 = source.groupBy(…).select(…).where(…) ….; > > >>>>>> Table t2 = t1.materialize() // (or `cache()`) > > >>>>>> > > >>>>>> t2.count() // initialise cache (if it’s lazily initialised) > > >>>>>> > > >>>>>> int a1 = t1.count() > > >>>>>> int b1 = t2.count() > > >>>>>> > > >>>>>> // something in the background (or we trigger it) writes new files > > to > > >>>>>> /foo/bar > > >>>>>> > > >>>>>> int a2 = t1.count() > > >>>>>> int b2 = t2.count() > > >>>>>> > > >>>>>> t2.refresh() // possible future extension, not to be implemented > in > > >> the > > >>>>>> initial version > > >>>>>> > > >>>>>> int a3 = t1.count() > > >>>>>> int b3 = t2.count() > > >>>>>> > > >>>>>> t2.drop() // another possible future extension, manual “cache” > > >> dropping > > >>>>>> > > >>>>>> assertTrue(a1 == b1) // same results, but b1 comes from the > “cache" > > >>>>>> assertTrue(b1 == b2) // both values come from the same cache > > >>>>>> assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed full > > table > > >>>>> scan > > >>>>>> and has more data > > >>>>>> assertTrue(b3 > b2) // b3 comes from refreshed cache > > >>>>>> assertTrue(b3 == a2 == a3) > > >>>>>> > > >>>>>> Piotrek > > >>>>>> > > >>>>>>> On 30 Nov 2018, at 10:22, Jark Wu <imj...@gmail.com> wrote: > > >>>>>>> > > >>>>>>> Hi, > > >>>>>>> > > 
>>>>>>> It is a very interesting and useful design! > >>>>>>> > >>>>>>> Here I want to share some of my thoughts: > >>>>>>> > >>>>>>> 1. Agree that the cache() method should return some Table to avoid some unexpected problems because of the mutable object. All the existing methods of Table return a new Table instance. > >>>>>>> > >>>>>>> 2. I think materialize() would be more consistent with SQL; this makes it possible to support the same feature for SQL (materialized views) and keep the same API for users in the future. But I'm also fine if we choose cache(). > >>>>>>> > >>>>>>> 3. In the proposal, a TableService (or FlinkService?) is used to cache the result of the (intermediate) table. But the name TableService may be a bit too general and might not be understood correctly at first glance (a metastore for tables?). Maybe a more specific name would be better, such as TableCacheService or TableMaterializeService or something else. > >>>>>>> > >>>>>>> Best, > >>>>>>> Jark > >>>>>>> > >>>>>>> On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhue...@gmail.com> wrote: > >>>>>>> > >>>>>>>> Hi, > >>>>>>>> > >>>>>>>> Thanks for the clarification Becket! > >>>>>>>> > >>>>>>>> I have a few thoughts to share / questions: > >>>>>>>> > >>>>>>>> 1) I'd like to know how you plan to implement the feature on a plan / planner level. > >>>>>>>> > >>>>>>>> I would imagine the following to happen when Table.cache() is called: > >>>>>>>> > >>>>>>>> 1) immediately optimize the Table and internally convert it into a DataSet/DataStream. This is necessary to avoid operators of later queries on top of the Table being pushed down.
2) register the DataSet/DataStream as a DataSet/DataStream-backed Table X > >>>>>>>> 3) add a sink to the DataSet/DataStream. This is the materialization of the Table X > >>>>>>>> > >>>>>>>> Based on your proposal, the following would happen: > >>>>>>>> > >>>>>>>> Table t1 = .... > >>>>>>>> t1.cache(); // cache() returns void. The logical plan of t1 is replaced by a scan of X. There is also a reference to the materialization of X. > >>>>>>>> > >>>>>>>> t1.count(); // this executes the program, including the DataSet/DataStream that backs X and the sink that writes the materialization of X > >>>>>>>> t1.count(); // this executes the program, but reads X from the materialization. > >>>>>>>> > >>>>>>>> My question is, how do you determine when the scan of t1 should go against the DataSet/DataStream program and when against the materialization? > >>>>>>>> AFAIK, there is no hook that will tell you that a part of the program was executed. Flipping a switch during optimization or plan generation is not sufficient, as there is no guarantee that the plan is also executed. > >>>>>>>> > >>>>>>>> Overall, this behavior is somewhat similar to what I proposed in FLINK-8950, which does not include persisting the table, but just optimizing and reregistering it as a DataSet/DataStream scan. > >>>>>>>> > >>>>>>>> 2) I think Piotr has a point about the implicit behavior and side effects of the cache() method if it does not return anything. Consider the following example: > >>>>>>>> > >>>>>>>> Table t1 = ???
> >>>>>>>> Table t2 = methodThatAppliesOperators(t1); > >>>>>>>> Table t3 = methodThatAppliesOtherOperators(t1); > >>>>>>>> > >>>>>>>> In this case, the behavior/performance of the plan that results from the second method call depends on whether t1 was modified by the first method or not. This is the classic issue of mutable vs. immutable objects. > >>>>>>>> Also, as Piotr pointed out, it might also be good to have the original plan of t1, because in some cases it is possible to push filters down such that evaluating the query from scratch might be more efficient than accessing the cache. > >>>>>>>> Moreover, a CachedTable could extend Table and offer a method refresh(). This sounds quite useful in an interactive session mode. > >>>>>>>> > >>>>>>>> 3) Regarding the name, I can see both arguments. IMO, materialize() seems to be more future-proof. > >>>>>>>> > >>>>>>>> Best, Fabian > >>>>>>>> > >>>>>>>> On Thu, 29 Nov 2018 at 12:56, Shaoxuan Wang <wshaox...@gmail.com> wrote: > >>>>>>>> > >>>>>>>>> Hi Piotr, > >>>>>>>>> > >>>>>>>>> Thanks for sharing your ideas on the method naming. We will think about your suggestions. But I don't understand why we need to change the return type of cache(). > >>>>>>>>> > >>>>>>>>> Cache() is a physical operation; it does not change the logic of the `Table`. On the tableAPI layer, we should not introduce a new table type unless the logic of the table has been changed. If we introduce a new table type `CachedTable`, we need to create the same set of methods of `Table` for it. I don't think it is worth doing this.
Or can you please elaborate more on what could be the "implicit behaviours/side effects" you are thinking about? > >>>>>>>>> > >>>>>>>>> Regards, > >>>>>>>>> Shaoxuan > >>>>>>>>> > >>>>>>>>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski <pi...@data-artisans.com> wrote: > >>>>>>>>> > >>>>>>>>>> Hi Becket, > >>>>>>>>>> > >>>>>>>>>> Thanks for the response. > >>>>>>>>>> > >>>>>>>>>> 1. I wasn’t saying that a materialised view must be mutable or not. The same thing applies to caches as well. On the contrary, I would expect more consistency and updates from something that is called a “cache” vs something that’s a “materialised view”. In other words, IMO most caches do not serve you invalid/outdated data and they handle updates on their own. > >>>>>>>>>> > >>>>>>>>>> 2. I don’t think that having in the future two very similar concepts of `materialized` view and `cache` is a good idea. It would be confusing for the users. I think it could be handled by variations/overloading of the materialised view concept.
We could start with: > >>>>>>>>>> > >>>>>>>>>> `MaterializedTable materialize()` - immutable, session life scope (basically the same semantics as you are proposing). > >>>>>>>>>> > >>>>>>>>>> And then in the future (if ever) build on top of that/expand it with: > >>>>>>>>>> > >>>>>>>>>> `MaterializedTable materialize(refreshTime=…)` or `MaterializedTable materialize(refreshHook=…)` > >>>>>>>>>> > >>>>>>>>>> Or with cross-session support: > >>>>>>>>>> > >>>>>>>>>> `MaterializedTable materializeInto(connector=…)` or `MaterializedTable materializeInto(tableFactory=…)` > >>>>>>>>>> > >>>>>>>>>> I’m not saying that we should implement cross-session support/refreshing now or even in the near future. I’m just arguing that naming the current immutable, session-scoped method `materialize()` is more future-proof and more consistent with SQL (on which, after all, the Table API is heavily based). > >>>>>>>>>> > >>>>>>>>>> 3. Even if we agree on naming it `cache()`, I would still insist on `cache()` returning a `CachedTable` handle to avoid implicit behaviours/side effects and to give both us & users more flexibility. > >>>>>>>>>> > >>>>>>>>>> Piotrek > >>>>>>>>>> > >>>>>>>>>>> On 29 Nov 2018, at 06:20, Becket Qin <becket....@gmail.com> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Just to add a little bit, the materialized view is probably more similar to the persistent() brought up earlier in the thread. So it is usually cross-session and could be used in a larger scope. For example, a materialized view created by user A may be visible to user B. It is
It is > probably > > >>>>>>>>> something > > >>>>>>>>>>> we want to have in the future. I'll put it in the future work > > >>>>>>>> section. > > >>>>>>>>>>> > > >>>>>>>>>>> Thanks, > > >>>>>>>>>>> > > >>>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>>> > > >>>>>>>>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin < > > becket....@gmail.com > > >>> > > >>>>>>>>> wrote: > > >>>>>>>>>>> > > >>>>>>>>>>>> Hi Piotrek, > > >>>>>>>>>>>> > > >>>>>>>>>>>> Thanks for the explanation. > > >>>>>>>>>>>> > > >>>>>>>>>>>> Right now we are mostly thinking of the cached table as > > >>>>> immutable. I > > >>>>>>>>> can > > >>>>>>>>>>>> see the Materialized view would be useful in the future. > That > > >>>>> said, > > >>>>>>>> I > > >>>>>>>>>> think > > >>>>>>>>>>>> a simple cache mechanism is probably still needed. So to me, > > >>>>> cache() > > >>>>>>>>> and > > >>>>>>>>>>>> materialize() should be two separate method as they address > > >>>>>>>> different > > >>>>>>>>>>>> needs. Materialize() is a higher level concept usually > > implying > > >>>>>>>>>> periodical > > >>>>>>>>>>>> update, while cache() has much simpler semantic. For > example, > > >> one > > >>>>>>>> may > > >>>>>>>>>>>> create a materialized view and use cache() method in the > > >>>>>>>> materialized > > >>>>>>>>>> view > > >>>>>>>>>>>> creation logic. So that during the materialized view update, > > >> they > > >>>>> do > > >>>>>>>>> not > > >>>>>>>>>>>> need to worry about the case that the cached table is also > > >>>>> changed. > > >>>>>>>>>> Maybe > > >>>>>>>>>>>> under the hood, materialized() and cache() could share some > > >>>>>>>> mechanism, > > >>>>>>>>>> but > > >>>>>>>>>>>> I think a simple cache() method would be handy in a lot of > > >> cases. 
> > >>>>>>>>>>>> > > >>>>>>>>>>>> Thanks, > > >>>>>>>>>>>> > > >>>>>>>>>>>> Jiangjie (Becket) Qin > > >>>>>>>>>>>> > > >>>>>>>>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski < > > >>>>>>>>> pi...@data-artisans.com > > >>>>>>>>>>> > > >>>>>>>>>>>> wrote: > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Hi Becket, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Is there any extra thing user can do on a > MaterializedTable > > >> that > > >>>>>>>>> they > > >>>>>>>>>>>>> cannot do on a Table? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Maybe not in the initial implementation, but various DBs > > offer > > >>>>>>>>>> different > > >>>>>>>>>>>>> ways to “refresh” the materialised view. Hooks, triggers, > > >> timers, > > >>>>>>>>>> manually > > >>>>>>>>>>>>> etc. Having `MaterializedTable` would help us to handle > that > > in > > >>>>> the > > >>>>>>>>>> future. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> After users call *table.cache(), *users can just use that > > >> table > > >>>>>>>> and > > >>>>>>>>> do > > >>>>>>>>>>>>> anything that is supported on a Table, including SQL. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> This is some implicit behaviour with side effects. Imagine > if > > >>>>> user > > >>>>>>>>> has > > >>>>>>>>>> a > > >>>>>>>>>>>>> long and complicated program, that touches table `b` > multiple > > >>>>>>>> times, > > >>>>>>>>>> maybe > > >>>>>>>>>>>>> scattered around different methods. If he modifies his > > program > > >> by > > >>>>>>>>>> inserting > > >>>>>>>>>>>>> in one place > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> b.cache() > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> This implicitly alters the semantic and behaviour of his > code > > >> all > > >>>>>>>>> over > > >>>>>>>>>>>>> the place, maybe in a ways that might cause problems. For > > >> example > > >>>>>>>>> what > > >>>>>>>>>> if > > >>>>>>>>>>>>> underlying data is changing? 
Having invisible side effects is also not very clean. For example, think about something like this (but more complicated):

Table b = ...;

if (some_condition) {
    processTable1(b);
} else {
    processTable2(b);
}

// do more stuff with b

Now the user adds a `b.cache()` call to only one of the `processTable1` or `processTable2` methods.

On the other hand,

Table materialisedB = b.materialize();

avoids (at least some of) the side-effect issues and forces the user to explicitly use `materialisedB` where it's appropriate, and to think about what it actually means. And if something doesn't work in the end, the user will know what he changed instead of blaming Flink for some "magic" underneath. In the above example, after materialising b in only one of the methods, he should realise the issue when handling that method's `MaterializedTable` return value.

I guess it comes down to personal preference whether you like things to be implicit or not. The more of a power user someone is, the more likely he is to like/understand implicit behaviour.
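The contrast being debated can be shown with a tiny toy model. This is not Flink's actual API; the `Table` class, `cache()`, and `materialize()` below are stand-ins that only illustrate how a void cache() mutating a shared plan node affects every consumer, while a returned handle keeps the change local:

```java
// Toy model, NOT Flink's actual API: it only illustrates the semantics
// being debated. A void cache() mutates the shared table node, so every
// consumer (even ones defined earlier) is affected; materialize() returns
// a new handle, so only code that uses the handle changes behaviour.
public class CacheSemantics {

    public static class Table {
        private boolean cached = false;

        // Implicit variant: flips a flag on the shared node.
        public void cache() {
            cached = true;
        }

        // Explicit variant: leaves this node untouched, returns a handle.
        public Table materialize() {
            Table handle = new Table();
            handle.cached = true;
            return handle;
        }

        public boolean readsFromCache() {
            return cached;
        }
    }

    public static void main(String[] args) {
        Table a = new Table();
        Table b = a;                 // consumer defined BEFORE cache()
        a.cache();
        Table d = a;                 // consumer defined AFTER cache()
        // Side effect: b is affected although it was defined first.
        System.out.println(b.readsFromCache()); // true
        System.out.println(d.readsFromCache()); // true

        Table x = new Table();
        Table mx = x.materialize();
        // Explicit handle: x itself is unchanged.
        System.out.println(x.readsFromCache());  // false
        System.out.println(mx.readsFromCache()); // true
    }
}
```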
And we as Table API designers are the most power users out there, so I would proceed with caution (so that we do not end up in the crazy Perl realm with its lovely implicit method arguments ;) <https://stackoverflow.com/a/14922656/8149051>).

> Given that we are enhancing the Table API to also support non-relational processing cases, cache() might be slightly better.

I think even such an extended Table API could benefit from sticking to / being consistent with SQL, where both SQL and the Table API are basically the same.

One more thing: `MaterializedTable materialize()` could be more powerful/flexible, allowing the user to operate on both the materialised and the non-materialised view at the same time, for whatever reason (underlying data changing, better optimisation opportunities after pushing down more filters, etc.). For example:

Table b = …;

MaterializedTable mb = b.materialize();

val min = mb.min();
val max = mb.max();

val user42 = b.filter('userId = 42);

This could be more efficient compared to `b.cache()` if `filter('userId = 42)` allows for much more aggressive optimisations.
Piotrek

On 26 Nov 2018, at 12:14, Fabian Hueske <fhue...@gmail.com> wrote:

I'm not suggesting to add support for Ignite. This was just an example. Plasma and Arrow sound interesting, too.
For the sake of this proposal, it would be up to the user to implement a TableFactory and corresponding TableSource / TableSink classes to persist and read the data.

On Mon, Nov 26, 2018 at 12:06 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

What about also adding Apache Plasma + Arrow as an alternative to Apache Ignite?
[1] https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/

On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske <fhue...@gmail.com> wrote:

Hi,

Thanks for the proposal!

To summarize, you propose a new method Table.cache(): Table that will trigger a job and write the result into some temporary storage, as defined by a TableFactory.
The cache() call blocks while the job is running and eventually returns a Table object that represents a scan of the temporary table.
When the "session" is closed (closing to be defined?), the temporary tables are all dropped.

I think this behavior makes sense and is a good first step towards more interactive workloads.
However, its performance suffers from writing to and reading from external systems.
I think this is OK for now. Changes that would significantly improve the situation (i.e., pinning data in memory across jobs) would have large impacts on many components of Flink.
Users could use in-memory filesystems or storage grids (Apache Ignite) to mitigate some of the performance effects.

Best, Fabian

On Mon, Nov 26, 2018 at 3:38 AM, Becket Qin <becket....@gmail.com> wrote:

Thanks for the explanation, Piotrek.

Is there any extra thing a user can do on a MaterializedTable that they cannot do on a Table? After users call table.cache(), they can just use that table and do anything that is supported on a Table, including SQL.

Naming-wise, either cache() or materialize() sounds fine to me.
cache() is a bit more general than materialize(). Given that we are enhancing the Table API to also support non-relational processing cases, cache() might be slightly better.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi Becket,

Oops, sorry, I didn't notice that you intend to reuse the existing `TableFactory`. I don't know why, but I assumed that you wanted to provide an alternate way of writing the data.

Now that I hopefully understand the proposal, maybe we could rename `cache()` to

void materialize()

or, going a step further,

MaterializedTable materialize()
MaterializedTable createMaterializedView()

?

The second option, returning a handle, is I think more flexible and could provide features such as "refresh"/"delete" or, generally speaking, managing the view.
In the future we could also think about adding hooks to automatically refresh the view, etc. It is also more explicit - materialization returning a new table handle will not have the same implicit side effects as adding a simple line of code like `b.cache()` would have.

It would also be more SQL-like, making it more intuitive for users already familiar with SQL.

Piotrek

On 23 Nov 2018, at 14:53, Becket Qin <becket....@gmail.com> wrote:

Hi Piotrek,

For the cache() method itself, yes, it is equivalent to creating a BUILT-IN materialized view with a lifecycle. That functionality is missing today, though. Not sure if I understand your question. Do you mean we already have the functionality and just need syntactic sugar?

What's more interesting in the proposal is: do we want to stop at creating the materialized view? Or do we want to extend that in the future to a more useful unified data store distributed with Flink?
And do we want to have a mechanism to allow more flexible user job patterns with their own user-defined services? These considerations are much more architectural.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <pi...@data-artisans.com> wrote:

Hi,

Interesting idea. I'm trying to understand the problem. Isn't the `cache()` call equivalent to writing data to a sink and later reading from it, where this sink has a limited scope/lifetime? And the sink could be implemented as an in-memory or a file sink?

If so, what's the problem with creating a materialised view from a table "b" (from your document's example) and reusing this materialised view later? Maybe we are lacking mechanisms to clean up materialised views (for example when the current session finishes)? Maybe we need some syntactic sugar on top of it?
Piotrek

On 23 Nov 2018, at 07:21, Becket Qin <becket....@gmail.com> wrote:

Thanks for the suggestion, Jincheng.

Yes, I think it makes sense to have a persist() with a lifecycle/defined scope. I just added a section in the future work for this.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie,

Thank you for the explanation about the name of `cache()`; I understand why you designed it this way!

Another idea is whether we can specify a lifecycle for data persistence. For example, persist(LifeCycle.SESSION), so that the user is not worried about data loss and clearly specifies how long the data is kept.
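A minimal sketch of what such a lifecycle-scoped persist() could look like. All of the names below (`LifeCycle`, `Session`, `persist`) are hypothetical - Flink has no such API - and the sketch only tracks which tables a closing session must drop:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of persist(LifeCycle.SESSION); none of these names
// exist in Flink. A session records its session-scoped tables so the
// backing data can be dropped when the session closes.
public class ScopedPersist {

    public enum LifeCycle { SESSION, SESSION_GROUP }

    public static class Session {
        private final Set<String> sessionScoped = new HashSet<>();

        public void persist(String tableName, LifeCycle scope) {
            if (scope == LifeCycle.SESSION) {
                sessionScoped.add(tableName); // dropped on close()
            }
            // SESSION_GROUP would instead register with a registry shared
            // by the group, so other sessions could reuse the table.
        }

        // Returns the tables whose backing data the caller should delete.
        public Set<String> close() {
            Set<String> toDrop = new HashSet<>(sessionScoped);
            sessionScoped.clear();
            return toDrop;
        }
    }

    public static void main(String[] args) {
        Session s = new Session();
        s.persist("cached_b", LifeCycle.SESSION);
        System.out.println(s.close()); // [cached_b]
    }
}
```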
At the same time, if we want to expand, we could also share within a certain group of sessions, for example: LifeCycle.SESSION_GROUP(...). I am not sure - just an immature suggestion, for reference only!

Bests,
Jincheng

On Fri, Nov 23, 2018 at 1:33 PM, Becket Qin <becket....@gmail.com> wrote:

Re: Jincheng,

Thanks for the feedback. Regarding cache() vs. persist(), personally I find cache() to describe the behavior more accurately, i.e. the Table is cached for the session but will be deleted after the session is closed. persist() seems a little misleading, as people might think the table will still be there even after the session is gone.

Great point about mixing batch and stream processing in the same job. We should absolutely move towards that goal.
I imagine that would be a huge change across the board, including sources, operators and optimizations, to name some. Likely we will need several separate in-depth discussions.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <xingc...@gmail.com> wrote:

Hi all,

@Shaoxuan, I think the lifecycle and the access domain are both orthogonal to the cache problem. Essentially, this may be the first time we plan to introduce a storage mechanism other than the state. Maybe it's better to first draw a big picture and then concentrate on a specific part?

@Becket, yes, actually I am more concerned with the underlying service. This seems to be quite a major change to the existing codebase.
As you claimed, the service should be extensible to support other components, and we'd better discuss it in another thread.

All in all, I am also eager to enjoy a more interactive Table API, given a general and flexible enough service mechanism.

Best,
Xingcan

On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:

Relying on a callback to clean up the temp table is not very reliable. There is no guarantee that it will be executed successfully, and we risk leaks when that happens. I think it's safer to have an association between temp tables and a session id, so we can always clean up temp tables which are no longer associated with any active sessions.
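The session-id association suggested here can be sketched as a registry keyed by session id (all names hypothetical, not Flink code): a periodic janitor compares registered sessions against the set of sessions known to be alive and drops the rest, so cleanup no longer depends on a client-side callback running successfully.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of session-id based cleanup (hypothetical names): temp tables
// are registered under their session id, and a janitor drops every table
// whose session is no longer active.
public class TempTableRegistry {

    private final Map<String, Set<String>> tablesBySession = new HashMap<>();

    public void register(String sessionId, String tableName) {
        tablesBySession
            .computeIfAbsent(sessionId, k -> new HashSet<>())
            .add(tableName);
    }

    // Called periodically with the sessions known to be alive; returns the
    // temp tables that should be dropped.
    public List<String> collectGarbage(Set<String> activeSessions) {
        List<String> toDrop = new ArrayList<>();
        Iterator<Map.Entry<String, Set<String>>> it =
            tablesBySession.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> entry = it.next();
            if (!activeSessions.contains(entry.getKey())) {
                toDrop.addAll(entry.getValue());
                it.remove();
            }
        }
        return toDrop;
    }

    public static void main(String[] args) {
        TempTableRegistry registry = new TempTableRegistry();
        registry.register("session-1", "tmp_table_a");
        registry.register("session-2", "tmp_table_b");
        // Only session-2 is still alive: tmp_table_a gets dropped.
        Set<String> alive = new HashSet<>();
        alive.add("session-2");
        System.out.println(registry.collectGarbage(alive)); // [tmp_table_a]
    }
}
```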
Regards,
Xiaowei

On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Jiangjie & Shaoxuan,

Thanks for initiating this great proposal!

Interactive programming is very useful and user-friendly in the case of your examples. Moreover, especially when a business has to be executed in several stages with dependencies, such as the pipeline of Flink ML, in order to utilize the intermediate calculation results we have to submit a job via env.execute().

About `cache()`, I think it is better to name it `persist()`, and let the Flink framework determine whether to cache internally in memory or persist to the storage system - maybe saving the data into a state backend (MemoryStateBackend or RocksDBStateBackend, etc.).
BTW, from my point of view, support for switching between streaming and batch mode in the same job will also benefit "Interactive Programming" in the future. I am looking forward to your JIRAs and FLIP!

Best,
Jincheng

On Tue, Nov 20, 2018 at 9:56 PM, Becket Qin <becket....@gmail.com> wrote:

Hi all,

As a few recent email threads have pointed out, it is a promising opportunity to enhance the Flink Table API in various aspects, including functionality and ease of use, among others. One of the scenarios where we feel Flink could improve is interactive programming. To explain the issues and facilitate the discussion on the solution, we put together the following document with our proposal.
https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing

Feedback and comments are very welcome!

Thanks,

Jiangjie (Becket) Qin