Thanks Piotrek. You provided a very good example; it clears up all the confusion I had. It is clear that there is something we have not considered in the initial proposal. We intended to force the user to reuse the cached/materialized table once its cache() method has been executed, and we did not expect that the user may want to re-execute the plan from the source table. Let me rethink this and get back to you later.
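To make sure we are talking about the same thing, here is a minimal sketch of the semantics the initial proposal had in mind (hypothetical code, reusing the names from your example and the void-returning cache() of the current proposal):

Table t1 = source.groupBy(…).select(…).where(…); // source scans files from “/foo/bar/“
t1.cache(); // as currently proposed: returns void, t1 is now backed by the cached data

int a1 = t1.count(); // triggers the job and populates the cache
// something in the background writes new files to /foo/bar
int a2 = t1.count(); // under the proposal this reads the cache, so a1 == a2; re-executing t1 from the source (your expectation) could instead give a2 > a1

This is exactly the divergence you pointed out, and we need to decide which behavior t1 should have after cache() is called.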
In the meantime, this example/observation also suggests that we cannot fully involve the optimizer in deciding the plan if a cache/materialize is explicitly used, because whether we reuse the cached data or re-execute the query from the source data may lead to different results. (But I guess the optimizer can still help in some cases ---- as long as it does not re-execute from the changed source, we should be safe). Regards, Shaoxuan On Fri, Nov 30, 2018 at 9:13 PM Piotr Nowojski <pi...@data-artisans.com> wrote: > Hi Shaoxuan, > > Re 2: > > > Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> t1’ > > What do you mean that “ t1 is modified to-> t1’ ” ? That > `methodThatAppliesOperators()` method has changed it’s plan? > > I was thinking more about something like this: > > Table source = … // some source that scans files from a directory > “/foo/bar/“ > Table t1 = source.groupBy(…).select(…).where(…) ….; > Table t2 = t1.materialize() // (or `cache()`) > > t2.count() // initialise cache (if it’s lazily initialised) > > int a1 = t1.count() > int b1 = t2.count() > > // something in the background (or we trigger it) writes new files to > /foo/bar > > int a2 = t1.count() > int b2 = t2.count() > > t2.refresh() // possible future extension, not to be implemented in the > initial version > > int a3 = t1.count() > int b3 = t2.count() > > t2.drop() // another possible future extension, manual “cache” dropping > > assertTrue(a1 == b1) // same results, but b1 comes from the “cache" > assertTrue(b1 == b2) // both values come from the same cache > assertTrue(a2 > b2) // b2 comes from cache, a2 re-executed full table scan > and has more data > assertTrue(b3 > b2) // b3 comes from refreshed cache > assertTrue(b3 == a2 == a3) > > Piotrek > > > On 30 Nov 2018, at 10:22, Jark Wu <imj...@gmail.com> wrote: > > > > Hi, > > > > It is an very interesting and useful design! > > > > Here I want to share some of my thoughts: > > > > 1. Agree with that cache() method should return some Table to avoid some > > unexpected problems because of the mutable object. > > All the existing methods of Table are returning a new Table instance. > > > > 2. I think materialize() would be more consistent with SQL, this makes it > > possible to support the same feature for SQL (materialize view) and keep > > the same API for users in the future. > > But I'm also fine if we choose cache(). > > > > 3. In the proposal, a TableService (or FlinkService?) is used to cache > the > > result of the (intermediate) table. > > But the name of TableService may be a bit general which is not quite > > understanding correctly in the first glance (a metastore for tables?). > > Maybe a more specific name would be better, such as TableCacheSerive or > > TableMaterializeSerivce or something else. > > > > Best, > > Jark > > > > > > On Thu, 29 Nov 2018 at 21:16, Fabian Hueske <fhue...@gmail.com> wrote: > > > >> Hi, > >> > >> Thanks for the clarification Becket! > >> > >> I have a few thoughts to share / questions: > >> > >> 1) I'd like to know how you plan to implement the feature on a plan / > >> planner level. > >> > >> I would imaging the following to happen when Table.cache() is called: > >> > >> 1) immediately optimize the Table and internally convert it into a > >> DataSet/DataStream. This is necessary, to avoid that operators of later > >> queries on top of the Table are pushed down. > >> 2) register the DataSet/DataStream as a DataSet/DataStream-backed Table > X > >> 3) add a sink to the DataSet/DataStream.
This is the materialization of > the > >> Table X > >> > >> Based on your proposal the following would happen: > >> > >> Table t1 = .... > >> t1.cache(); // cache() returns void. The logical plan of t1 is replaced > by > >> a scan of X. There is also a reference to the materialization of X. > >> > >> t1.count(); // this executes the program, including the > DataSet/DataStream > >> that backs X and the sink that writes the materialization of X > >> t1.count(); // this executes the program, but reads X from the > >> materialization. > >> > >> My question is, how do you determine when whether the scan of t1 should > go > >> against the DataSet/DataStream program and when against the > >> materialization? > >> AFAIK, there is no hook that will tell you that a part of the program > was > >> executed. Flipping a switch during optimization or plan generation is > not > >> sufficient as there is no guarantee that the plan is also executed. > >> > >> Overall, this behavior is somewhat similar to what I proposed in > >> FLINK-8950, which does not include persisting the table, but just > >> optimizing and reregistering it as DataSet/DataStream scan. > >> > >> 2) I think Piotr has a point about the implicit behavior and side > effects > >> of the cache() method if it does not return anything. > >> Consider the following example: > >> > >> Table t1 = ??? > >> Table t2 = methodThatAppliesOperators(t1); > >> Table t3 = methodThatAppliesOtherOperators(t1); > >> > >> In this case, the behavior/performance of the plan that results from the > >> second method call depends on whether t1 was modified by the first > method > >> or not. > >> This is the classic issue of mutable vs. immutable objects. > >> Also, as Piotr pointed out, it might also be good to have the original > plan > >> of t1, because in some cases it is possible to push filters down such > that > >> evaluating the query from scratch might be more efficient than accessing > >> the cache. > >> Moreover, a CachedTable could extend Table() and offer a method > refresh(). > >> This sounds quite useful in an interactive session mode. > >> > >> 3) Regarding the name, I can see both arguments. IMO, materialize() > seems > >> to be more future proof. > >> > >> Best, Fabian > >> > >> Am Do., 29. Nov. 2018 um 12:56 Uhr schrieb Shaoxuan Wang < > >> wshaox...@gmail.com>: > >> > >>> Hi Piotr, > >>> > >>> Thanks for sharing your ideas on the method naming. We will think about > >>> your suggestions. But I don't understand why we need to change the > return > >>> type of cache(). > >>> > >>> Cache() is a physical operation, it does not change the logic of > >>> the `Table`. On the tableAPI layer, we should not introduce a new table > >>> type unless the logic of table has been changed. If we introduce a new > >>> table type `CachedTable`, we need create the same set of methods of > >> `Table` > >>> for it. I don't think it is worth doing this. Or can you please > elaborate > >>> more on what could be the "implicit behaviours/side effects" you are > >>> thinking about? > >>> > >>> Regards, > >>> Shaoxuan > >>> > >>> > >>> > >>> On Thu, Nov 29, 2018 at 7:05 PM Piotr Nowojski < > pi...@data-artisans.com> > >>> wrote: > >>> > >>>> Hi Becket, > >>>> > >>>> Thanks for the response. > >>>> > >>>> 1. I wasn’t saying that materialised view must be mutable or not. The > >>> same > >>>> thing applies to caches as well. 
To the contrary, I would expect more > >>>> consistency and updates from something that is called “cache” vs > >>> something > >>>> that’s a “materialised view”. In other words, IMO most caches do not > >>> serve > >>>> you invalid/outdated data and they handle updates on their own. > >>>> > >>>> 2. I don’t think that having in the future two very similar concepts > of > >>>> `materialized` view and `cache` is a good idea. It would be confusing > >> for > >>>> the users. I think it could be handled by variations/overloading of > >>>> materialised view concept. We could start with: > >>>> > >>>> `MaterializedTable materialize()` - immutable, session life scope > >>>> (basically the same semantic as you are proposing > >>>> > >>>> And then in the future (if ever) build on top of that/expand it with: > >>>> > >>>> `MaterializedTable materialize(refreshTime=…)` or `MaterializedTable > >>>> materialize(refreshHook=…)` > >>>> > >>>> Or with cross session support: > >>>> > >>>> `MaterializedTable materializeInto(connector=…)` or `MaterializedTable > >>>> materializeInto(tableFactory=…)` > >>>> > >>>> I’m not saying that we should implement cross session/refreshing now > or > >>>> even in the near future. I’m just arguing that naming current > immutable > >>>> session life scope method `materialize()` is more future proof and > more > >>>> consistent with SQL (on which after all table-api is heavily basing > >> on). > >>>> > >>>> 3. Even if we agree on naming it `cache()`, I would still insist on > >>>> `cache()` returning `CachedTable` handle to avoid implicit > >>> behaviours/side > >>>> effects and to give both us & users more flexibility. > >>>> > >>>> Piotrek > >>>> > >>>>> On 29 Nov 2018, at 06:20, Becket Qin <becket....@gmail.com> wrote: > >>>>> > >>>>> Just to add a little bit, the materialized view is probably more > >>> similar > >>>> to > >>>>> the persistent() brought up earlier in the thread. So it is usually > >>> cross > >>>>> session and could be used in a larger scope. For example, a > >>> materialized > >>>>> view created by user A may be visible to user B. It is probably > >>> something > >>>>> we want to have in the future. I'll put it in the future work > >> section. > >>>>> > >>>>> Thanks, > >>>>> > >>>>> Jiangjie (Becket) Qin > >>>>> > >>>>> On Thu, Nov 29, 2018 at 9:47 AM Becket Qin <becket....@gmail.com> > >>> wrote: > >>>>> > >>>>>> Hi Piotrek, > >>>>>> > >>>>>> Thanks for the explanation. > >>>>>> > >>>>>> Right now we are mostly thinking of the cached table as immutable. I > >>> can > >>>>>> see the Materialized view would be useful in the future. That said, > >> I > >>>> think > >>>>>> a simple cache mechanism is probably still needed. So to me, cache() > >>> and > >>>>>> materialize() should be two separate method as they address > >> different > >>>>>> needs. Materialize() is a higher level concept usually implying > >>>> periodical > >>>>>> update, while cache() has much simpler semantic. For example, one > >> may > >>>>>> create a materialized view and use cache() method in the > >> materialized > >>>> view > >>>>>> creation logic. So that during the materialized view update, they do > >>> not > >>>>>> need to worry about the case that the cached table is also changed. > >>>> Maybe > >>>>>> under the hood, materialized() and cache() could share some > >> mechanism, > >>>> but > >>>>>> I think a simple cache() method would be handy in a lot of cases. 
> >>>>>> > >>>>>> Thanks, > >>>>>> > >>>>>> Jiangjie (Becket) Qin > >>>>>> > >>>>>> On Mon, Nov 26, 2018 at 9:38 PM Piotr Nowojski < > >>> pi...@data-artisans.com > >>>>> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Becket, > >>>>>>> > >>>>>>>> Is there any extra thing user can do on a MaterializedTable that > >>> they > >>>>>>> cannot do on a Table? > >>>>>>> > >>>>>>> Maybe not in the initial implementation, but various DBs offer > >>>> different > >>>>>>> ways to “refresh” the materialised view. Hooks, triggers, timers, > >>>> manually > >>>>>>> etc. Having `MaterializedTable` would help us to handle that in the > >>>> future. > >>>>>>> > >>>>>>>> After users call *table.cache(), *users can just use that table > >> and > >>> do > >>>>>>> anything that is supported on a Table, including SQL. > >>>>>>> > >>>>>>> This is some implicit behaviour with side effects. Imagine if user > >>> has > >>>> a > >>>>>>> long and complicated program, that touches table `b` multiple > >> times, > >>>> maybe > >>>>>>> scattered around different methods. If he modifies his program by > >>>> inserting > >>>>>>> in one place > >>>>>>> > >>>>>>> b.cache() > >>>>>>> > >>>>>>> This implicitly alters the semantic and behaviour of his code all > >>> over > >>>>>>> the place, maybe in a ways that might cause problems. For example > >>> what > >>>> if > >>>>>>> underlying data is changing? > >>>>>>> > >>>>>>> Having invisible side effects is also not very clean, for example > >>> think > >>>>>>> about something like this (but more complicated): > >>>>>>> > >>>>>>> Table b = ...; > >>>>>>> > >>>>>>> If (some_condition) { > >>>>>>> processTable1(b) > >>>>>>> } > >>>>>>> else { > >>>>>>> processTable2(b) > >>>>>>> } > >>>>>>> > >>>>>>> // do more stuff with b > >>>>>>> > >>>>>>> And user adds `b.cache()` call to only one of the `processTable1` > >> or > >>>>>>> `processTable2` methods. > >>>>>>> > >>>>>>> On the other hand > >>>>>>> > >>>>>>> Table materialisedB = b.materialize() > >>>>>>> > >>>>>>> Avoids (at least some of) the side effect issues and forces user to > >>>>>>> explicitly use `materialisedB` where it’s appropriate and forces > >> user > >>>> to > >>>>>>> think what does it actually mean. And if something doesn’t work in > >>> the > >>>> end > >>>>>>> for the user, he will know what has he changed instead of blaming > >>>> Flink for > >>>>>>> some “magic” underneath. In the above example, after materialising > >> b > >>> in > >>>>>>> only one of the methods, he should/would realise about the issue > >> when > >>>>>>> handling the return value `MaterializedTable` of that method. > >>>>>>> > >>>>>>> I guess it comes down to personal preferences if you like things to > >>> be > >>>>>>> implicit or not. The more power is the user, probably the more > >> likely > >>>> he is > >>>>>>> to like/understand implicit behaviour. And we as Table API > >> designers > >>>> are > >>>>>>> the most power users out there, so I would proceed with caution (so > >>>> that we > >>>>>>> do not end up in the crazy perl realm with it’s lovely implicit > >>> method > >>>>>>> arguments ;) <https://stackoverflow.com/a/14922656/8149051>) > >>>>>>> > >>>>>>>> Table API to also support non-relational processing cases, cache() > >>>>>>> might be slightly better. > >>>>>>> > >>>>>>> I think even such extended Table API could benefit from sticking > >>>> to/being > >>>>>>> consistent with SQL where both SQL and Table API are basically the > >>>> same. > >>>>>>> > >>>>>>> One more thing. 
`MaterializedTable materialize()` could be more > >>>>>>> powerful/flexible allowing the user to operate both on materialised > >>>> and not > >>>>>>> materialised view at the same time for whatever reasons (underlying > >>>> data > >>>>>>> changing/better optimisation opportunities after pushing down more > >>>> filters > >>>>>>> etc). For example: > >>>>>>> > >>>>>>> Table b = …; > >>>>>>> > >>>>>>> MaterlizedTable mb = b.materialize(); > >>>>>>> > >>>>>>> Val min = mb.min(); > >>>>>>> Val max = mb.max(); > >>>>>>> > >>>>>>> Val user42 = b.filter(‘userId = 42); > >>>>>>> > >>>>>>> Could be more efficient compared to `b.cache()` if `filter(‘userId > >> = > >>>>>>> 42);` allows for much more aggressive optimisations. > >>>>>>> > >>>>>>> Piotrek > >>>>>>> > >>>>>>>> On 26 Nov 2018, at 12:14, Fabian Hueske <fhue...@gmail.com> > >> wrote: > >>>>>>>> > >>>>>>>> I'm not suggesting to add support for Ignite. This was just an > >>>> example. > >>>>>>>> Plasma and Arrow sound interesting, too. > >>>>>>>> For the sake of this proposal, it would be up to the user to > >>>> implement a > >>>>>>>> TableFactory and corresponding TableSource / TableSink classes to > >>>>>>> persist > >>>>>>>> and read the data. > >>>>>>>> > >>>>>>>> Am Mo., 26. Nov. 2018 um 12:06 Uhr schrieb Flavio Pompermaier < > >>>>>>>> pomperma...@okkam.it>: > >>>>>>>> > >>>>>>>>> What about to add also Apache Plasma + Arrow as an alternative to > >>>>>>> Apache > >>>>>>>>> Ignite? > >>>>>>>>> [1] > >>>>>>>>> > >>>>>>> > >>>> > >> https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/ > >>>>>>>>> > >>>>>>>>> On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske < > >> fhue...@gmail.com> > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Hi, > >>>>>>>>>> > >>>>>>>>>> Thanks for the proposal! > >>>>>>>>>> > >>>>>>>>>> To summarize, you propose a new method Table.cache(): Table that > >>>> will > >>>>>>>>>> trigger a job and write the result into some temporary storage > >> as > >>>>>>> defined > >>>>>>>>>> by a TableFactory. > >>>>>>>>>> The cache() call blocks while the job is running and eventually > >>>>>>> returns a > >>>>>>>>>> Table object that represents a scan of the temporary table. > >>>>>>>>>> When the "session" is closed (closing to be defined?), the > >>> temporary > >>>>>>>>> tables > >>>>>>>>>> are all dropped. > >>>>>>>>>> > >>>>>>>>>> I think this behavior makes sense and is a good first step > >> towards > >>>>>>> more > >>>>>>>>>> interactive workloads. > >>>>>>>>>> However, its performance suffers from writing to and reading > >> from > >>>>>>>>> external > >>>>>>>>>> systems. > >>>>>>>>>> I think this is OK for now. Changes that would significantly > >>> improve > >>>>>>> the > >>>>>>>>>> situation (i.e., pinning data in-memory across jobs) would have > >>>> large > >>>>>>>>>> impacts on many components of Flink. > >>>>>>>>>> Users could use in-memory filesystems or storage grids (Apache > >>>>>>> Ignite) to > >>>>>>>>>> mitigate some of the performance effects. > >>>>>>>>>> > >>>>>>>>>> Best, Fabian > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Am Mo., 26. Nov. 2018 um 03:38 Uhr schrieb Becket Qin < > >>>>>>>>>> becket....@gmail.com > >>>>>>>>>>> : > >>>>>>>>>> > >>>>>>>>>>> Thanks for the explanation, Piotrek. > >>>>>>>>>>> > >>>>>>>>>>> Is there any extra thing user can do on a MaterializedTable > >> that > >>>> they > >>>>>>>>>>> cannot do on a Table? 
After users call *table.cache(), *users > >> can > >>>>>>> just > >>>>>>>>>> use > >>>>>>>>>>> that table and do anything that is supported on a Table, > >>> including > >>>>>>> SQL. > >>>>>>>>>>> > >>>>>>>>>>> Naming wise, either cache() or materialize() sounds fine to me. > >>>>>>> cache() > >>>>>>>>>> is > >>>>>>>>>>> a bit more general than materialize(). Given that we are > >>> enhancing > >>>>>>> the > >>>>>>>>>>> Table API to also support non-relational processing cases, > >>> cache() > >>>>>>>>> might > >>>>>>>>>> be > >>>>>>>>>>> slightly better. > >>>>>>>>>>> > >>>>>>>>>>> Thanks, > >>>>>>>>>>> > >>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> On Fri, Nov 23, 2018 at 11:25 PM Piotr Nowojski < > >>>>>>>>> pi...@data-artisans.com > >>>>>>>>>>> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hi Becket, > >>>>>>>>>>>> > >>>>>>>>>>>> Ops, sorry I didn’t notice that you intend to reuse existing > >>>>>>>>>>>> `TableFactory`. I don’t know why, but I assumed that you want > >> to > >>>>>>>>>> provide > >>>>>>>>>>> an > >>>>>>>>>>>> alternate way of writing the data. > >>>>>>>>>>>> > >>>>>>>>>>>> Now that I hopefully understand the proposal, maybe we could > >>>> rename > >>>>>>>>>>>> `cache()` to > >>>>>>>>>>>> > >>>>>>>>>>>> void materialize() > >>>>>>>>>>>> > >>>>>>>>>>>> or going step further > >>>>>>>>>>>> > >>>>>>>>>>>> MaterializedTable materialize() > >>>>>>>>>>>> MaterializedTable createMaterializedView() > >>>>>>>>>>>> > >>>>>>>>>>>> ? > >>>>>>>>>>>> > >>>>>>>>>>>> The second option with returning a handle I think is more > >>> flexible > >>>>>>>>> and > >>>>>>>>>>>> could provide features such as “refresh”/“delete” or generally > >>>>>>>>> speaking > >>>>>>>>>>>> manage the the view. In the future we could also think about > >>>> adding > >>>>>>>>>> hooks > >>>>>>>>>>>> to automatically refresh view etc. It is also more explicit - > >>>>>>>>>>>> materialization returning a new table handle will not have the > >>>> same > >>>>>>>>>>>> implicit side effects as adding a simple line of code like > >>>>>>>>> `b.cache()` > >>>>>>>>>>>> would have. > >>>>>>>>>>>> > >>>>>>>>>>>> It would also be more SQL like, making it more intuitive for > >>> users > >>>>>>>>>>> already > >>>>>>>>>>>> familiar with the SQL. > >>>>>>>>>>>> > >>>>>>>>>>>> Piotrek > >>>>>>>>>>>> > >>>>>>>>>>>>> On 23 Nov 2018, at 14:53, Becket Qin <becket....@gmail.com> > >>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi Piotrek, > >>>>>>>>>>>>> > >>>>>>>>>>>>> For the cache() method itself, yes, it is equivalent to > >>> creating > >>>> a > >>>>>>>>>>>> BUILT-IN > >>>>>>>>>>>>> materialized view with a lifecycle. That functionality is > >>> missing > >>>>>>>>>>> today, > >>>>>>>>>>>>> though. Not sure if I understand your question. Do you mean > >> we > >>>>>>>>>> already > >>>>>>>>>>>> have > >>>>>>>>>>>>> the functionality and just need a syntax sugar? > >>>>>>>>>>>>> > >>>>>>>>>>>>> What's more interesting in the proposal is do we want to stop > >>> at > >>>>>>>>>>> creating > >>>>>>>>>>>>> the materialized view? Or do we want to extend that in the > >>> future > >>>>>>>>> to > >>>>>>>>>> a > >>>>>>>>>>>> more > >>>>>>>>>>>>> useful unified data store distributed with Flink? And do we > >>> want > >>>> to > >>>>>>>>>>> have > >>>>>>>>>>>> a > >>>>>>>>>>>>> mechanism allow more flexible user job pattern with their own > >>>> user > >>>>>>>>>>>> defined > >>>>>>>>>>>>> services. These considerations are much more architectural. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski < > >>>>>>>>>>> pi...@data-artisans.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Interesting idea. I’m trying to understand the problem. > >> Isn’t > >>>> the > >>>>>>>>>>>>>> `cache()` call an equivalent of writing data to a sink and > >>> later > >>>>>>>>>>> reading > >>>>>>>>>>>>>> from it? Where this sink has a limited live scope/live time? > >>> And > >>>>>>>>> the > >>>>>>>>>>>> sink > >>>>>>>>>>>>>> could be implemented as in memory or a file sink? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> If so, what’s the problem with creating a materialised view > >>>> from a > >>>>>>>>>>> table > >>>>>>>>>>>>>> “b” (from your document’s example) and reusing this > >>> materialised > >>>>>>>>>> view > >>>>>>>>>>>>>> later? Maybe we are lacking mechanisms to clean up > >>> materialised > >>>>>>>>>> views > >>>>>>>>>>>> (for > >>>>>>>>>>>>>> example when current session finishes)? Maybe we need some > >>>>>>>>> syntactic > >>>>>>>>>>>> sugar > >>>>>>>>>>>>>> on top of it? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Piotrek > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 23 Nov 2018, at 07:21, Becket Qin <becket....@gmail.com > >>> > >>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the suggestion, Jincheng. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Yes, I think it makes sense to have a persist() with > >>>>>>>>>>> lifecycle/defined > >>>>>>>>>>>>>>> scope. I just added a section in the future work for this. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun < > >>>>>>>>>>> sunjincheng...@gmail.com > >>>>>>>>>>>>> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Jiangjie, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thank you for the explanation about the name of > >> `cache()`, I > >>>>>>>>>>>> understand > >>>>>>>>>>>>>> why > >>>>>>>>>>>>>>>> you designed this way! > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Another idea is whether we can specify a lifecycle for > >> data > >>>>>>>>>>>> persistence? > >>>>>>>>>>>>>>>> For example, persist (LifeCycle.SESSION), so that the user > >>> is > >>>>>>>>> not > >>>>>>>>>>>>>> worried > >>>>>>>>>>>>>>>> about data loss, and will clearly specify the time range > >> for > >>>>>>>>>> keeping > >>>>>>>>>>>>>> time. > >>>>>>>>>>>>>>>> At the same time, if we want to expand, we can also share > >>> in a > >>>>>>>>>>> certain > >>>>>>>>>>>>>>>> group of session, for example: > >>> LifeCycle.SESSION_GROUP(...), I > >>>>>>>>> am > >>>>>>>>>>> not > >>>>>>>>>>>>>> sure, > >>>>>>>>>>>>>>>> just an immature suggestion, for reference only! > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Bests, > >>>>>>>>>>>>>>>> Jincheng > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Becket Qin <becket....@gmail.com> 于2018年11月23日周五 > >> 下午1:33写道: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Re: Jincheng, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks for the feedback. Regarding cache() v.s. > >> persist(), > >>>>>>>>>>>> personally I > >>>>>>>>>>>>>>>>> find cache() to be more accurately describing the > >> behavior, > >>>>>>>>> i.e. > >>>>>>>>>>> the > >>>>>>>>>>>>>>>> Table > >>>>>>>>>>>>>>>>> is cached for the session, but will be deleted after the > >>>>>>>>> session > >>>>>>>>>> is > >>>>>>>>>>>>>>>> closed. 
> >>>>>>>>>>>>>>>>> persist() seems a little misleading as people might think > >>> the > >>>>>>>>>> table > >>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>> still be there even after the session is gone. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Great point about mixing the batch and stream processing > >> in > >>>> the > >>>>>>>>>>> same > >>>>>>>>>>>>>> job. > >>>>>>>>>>>>>>>>> We should absolutely move towards that goal. I imagine > >> that > >>>>>>>>> would > >>>>>>>>>>> be > >>>>>>>>>>>> a > >>>>>>>>>>>>>>>> huge > >>>>>>>>>>>>>>>>> change across the board, including sources, operators and > >>>>>>>>>>>>>> optimizations, > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> name some. Likely we will need several separate in-depth > >>>>>>>>>>> discussions. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui < > >>>>>>>>> xingc...@gmail.com> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> @Shaoxuan, I think the lifecycle or access domain are > >> both > >>>>>>>>>>>> orthogonal > >>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> the cache problem. Essentially, this may be the first > >> time > >>>> we > >>>>>>>>>> plan > >>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> introduce another storage mechanism other than the > >> state. > >>>>>>>>> Maybe > >>>>>>>>>>> it’s > >>>>>>>>>>>>>>>>> better > >>>>>>>>>>>>>>>>>> to first draw a big picture and then concentrate on a > >>>> specific > >>>>>>>>>>> part? > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> @Becket, yes, actually I am more concerned with the > >>>> underlying > >>>>>>>>>>>>>> service. > >>>>>>>>>>>>>>>>>> This seems to be quite a major change to the existing > >>>>>>>>> codebase. > >>>>>>>>>> As > >>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>> claimed, the service should be extendible to support > >> other > >>>>>>>>>>>> components > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> we’d better discussed it in another thread. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> All in all, I also eager to enjoy the more interactive > >>> Table > >>>>>>>>>> API, > >>>>>>>>>>> in > >>>>>>>>>>>>>>>> case > >>>>>>>>>>>>>>>>>> of a general and flexible enough service mechanism. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>> Xingcan > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang < > >>>>>>>>>> xiaow...@gmail.com> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Relying on a callback for the temp table for clean up > >> is > >>>> not > >>>>>>>>>> very > >>>>>>>>>>>>>>>>>> reliable. > >>>>>>>>>>>>>>>>>>> There is no guarantee that it will be executed > >>>> successfully. > >>>>>>>>> We > >>>>>>>>>>> may > >>>>>>>>>>>>>>>>> risk > >>>>>>>>>>>>>>>>>>> leaks when that happens. I think that it's safer to > >> have > >>> an > >>>>>>>>>>>>>>>> association > >>>>>>>>>>>>>>>>>>> between temp table and session id. So we can always > >> clean > >>>> up > >>>>>>>>>> temp > >>>>>>>>>>>>>>>>> tables > >>>>>>>>>>>>>>>>>>> which are no longer associated with any active > >> sessions. 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>> Xiaowei > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun < > >>>>>>>>>>>>>>>>> sunjincheng...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Jiangjie&Shaoxuan, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Thanks for initiating this great proposal! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Interactive Programming is very useful and user > >> friendly > >>>> in > >>>>>>>>>> case > >>>>>>>>>>>> of > >>>>>>>>>>>>>>>>> your > >>>>>>>>>>>>>>>>>>>> examples. > >>>>>>>>>>>>>>>>>>>> Moreover, especially when a business has to be > >> executed > >>> in > >>>>>>>>>>> several > >>>>>>>>>>>>>>>>>> stages > >>>>>>>>>>>>>>>>>>>> with dependencies,such as the pipeline of Flink ML, in > >>>> order > >>>>>>>>>> to > >>>>>>>>>>>>>>>>> utilize > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> intermediate calculation results we have to submit a > >> job > >>>> by > >>>>>>>>>>>>>>>>>> env.execute(). > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> About the `cache()` , I think is better to named > >>>>>>>>> `persist()`, > >>>>>>>>>>> And > >>>>>>>>>>>>>>>> The > >>>>>>>>>>>>>>>>>>>> Flink framework determines whether we internally cache > >>> in > >>>>>>>>>> memory > >>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>> persist > >>>>>>>>>>>>>>>>>>>> to the storage system,Maybe save the data into state > >>>> backend > >>>>>>>>>>>>>>>>>>>> (MemoryStateBackend or RocksDBStateBackend etc.) > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> BTW, from the points of my view in the future, support > >>> for > >>>>>>>>>>>> streaming > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>>> batch mode switching in the same job will also benefit > >>> in > >>>>>>>>>>>>>>>> "Interactive > >>>>>>>>>>>>>>>>>>>> Programming", I am looking forward to your JIRAs and > >>>> FLIP! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>>> Jincheng > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Becket Qin <becket....@gmail.com> 于2018年11月20日周二 > >>>> 下午9:56写道: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Hi all, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> As a few recent email threads have pointed out, it > >> is a > >>>>>>>>>>> promising > >>>>>>>>>>>>>>>>>>>>> opportunity to enhance Flink Table API in various > >>>> aspects, > >>>>>>>>>>>>>>>> including > >>>>>>>>>>>>>>>>>>>>> functionality and ease of use among others. One of > >> the > >>>>>>>>>>> scenarios > >>>>>>>>>>>>>>>>> where > >>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>>> feel Flink could improve is interactive programming. > >> To > >>>>>>>>>> explain > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> issues > >>>>>>>>>>>>>>>>>>>>> and facilitate the discussion on the solution, we put > >>>>>>>>>> together > >>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>> following document with our proposal. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>> > >>> > >> > https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Feedback and comments are very welcome! 
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>> > >>>>>>> > >>>> > >>>> > >>> > >> > >