Hi folks,

It seems that all the raised concerns so far have been resolved. I plan to 
start a voting thread for FLIP-36 early next week if there are no comments.

Thanks,
Xuannan
On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <suxuanna...@gmail.com>, wrote:
> Hi Kurt,
>
> Thanks for the comments.
>
> You are right that the FLIP lacks a proper discussion about the impact of the 
> optimizer. I have added the section to talk about how the cache table works 
> with the optimizer. I hope this could resolve your concern. Please let me 
> know if you have any further comments.
>
> Best,
> Xuannan
> On Jul 22, 2020, 4:36 PM +0800, Kurt Young <ykt...@gmail.com>, wrote:
> > Thanks for the reply, I have one more comment about the optimizer
> > affection. Even if you are
> > trying to make the cached table be as orthogonal to the optimizer as
> > possible by introducing
> > a special sink, it is still not clear why this approach is safe. Maybe you
> > can add some process
> > introduction from API to JobGraph, otherwise I can't make sure everyone
> > reviewing the design
> > doc will have the same imagination about this. And I'm also quite sure some
> > of the existing
> > mechanism will be affected by this special sink, e.g. multi sink
> > optimization.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> > > Hi Kurt,
> > >
> > > Thanks for the comments.
> > >
> > > 1. How do you identify the CachedTable?
> > > For the current design proposed in FLIP-36, we are using the first
> > > approach you mentioned, where the key of the map is the Cached Table java
> > > object. I think it is fine not to be able to identify another table
> > > representing the same DAG and not using the cached intermediate result
> > > because we want to make the caching table explicit. As mentioned in the
> > > FLIP, the cache API will return a Table object. And the user has to use 
> > > the
> > > returned Table object to make use of the cached table. The rationale is
> > > that if the user builds the same DAG from scratch with some
> > > TableEnvironment instead of using the cached table object, the user
> > > probably doesn't want to use the cache.
> > >
> > > 2. How does the CachedTable affect the optimizer?
> > > We try to make the logic dealing with the cached table be as orthogonal to
> > > the optimizer as possible. That's why we introduce a special sink when we
> > > are going to cache a table and a special source when we are going to use a
> > > cached table. This way, we can let the optimizer does it works, and the
> > > logic of modifying the job graph can happen in the job graph generator. We
> > > can recognize the cached node with the special sink and source.
> > >
> > > 3. What's the effect of calling TableEnvironment.close()?
> > > We introduce the close method to prevent leaking of the cached table when
> > > the user is done with the table environment. Therefore, it makes more 
> > > sense
> > > that the table environment, including all of its functionality, should not
> > > be used after closing. Otherwise, we should rename the close method to
> > > clearAllCache or something similar.
> > >
> > > And thanks for pointing out the use of not existing API used in the given
> > > examples. I have updated the examples in the FLIP accordingly.
> > >
> > > Best,
> > > Xuannan
> > > On Jul 16, 2020, 4:15 PM +0800, Kurt Young <ykt...@gmail.com>, wrote:
> > > > Hi Xuanna,
> > > >
> > > > Thanks for the detailed design doc, it described clearly how the API
> > > looks
> > > > and how to interact with Flink runtime.
> > > > However, the part which relates to SQL's optimizer is kind of blurry. To
> > > be
> > > > more precise, I have following questions:
> > > >
> > > > 1. How do you identify the CachedTable? I can imagine there would be map
> > > > representing the cache, how do you
> > > > compare the keys of the map? One approach is they will be compared by
> > > java
> > > > objects, which is simple but has
> > > > limited scope. For example, users created another table using some
> > > > interfaces of TableEnvironment, and the table
> > > > is exactly the same as the cached one, you won't be able to identify it.
> > > > Another choice is calculating the "signature" or
> > > > "diest" of the cached table, which involves string representation of the
> > > > whole sub tree represented by the cached table.
> > > > I don't think Flink currently provides such a mechanism around Table
> > > > though.
> > > >
> > > > 2. How does the CachedTable affect the optimizer? Specifically, will you
> > > > have a dedicated QueryOperation for it, will you have
> > > > a dedicated logical & physical RelNode for it? And I also don't see a
> > > > description about how to work with current optimize phases,
> > > > from Operation to Calcite rel node, and then to Flink's logical and
> > > > physical node, which will be at last translated to Flink's exec node.
> > > > There also exists other optimizations such as dead lock breaker, as well
> > > as
> > > > sub plan reuse inside the optimizer, I'm not sure whether
> > > > the logic dealing with cached tables can be orthogonal to all of these.
> > > > Hence I expect you could have a more detailed description here.
> > > >
> > > > 3. What's the effect of calling TableEnvironment.close()? You already
> > > > explained this would drop all caches this table env has,
> > > > could you also explain where other functionality still works for this
> > > table
> > > > env? Like can use still create/drop tables/databases/function
> > > > through this table env? What happens to the catalog and all temporary
> > > > objects of this table env?
> > > >
> > > > One minor comment: I noticed you used some not existing API in the
> > > examples
> > > > you gave, like table.collect(), which is a little
> > > > misleading.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I'd like to revive the discussion about FLIP-36 Support Interactive
> > > > > Programming in Flink Table API
> > > > >
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > >
> > > > > The FLIP proposes to add support for interactive programming in Flink
> > > > > Table API. Specifically, it let users cache the intermediate
> > > > > results(tables) and use them in the later jobs to avoid recomputing 
> > > > > the
> > > > > intermediate result(tables).
> > > > >
> > > > > I am looking forward to any opinions and suggestions from the
> > > community.
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > > On May 7, 2020, 5:40 PM +0800, Xuannan Su <suxuanna...@gmail.com>,
> > > wrote:
> > > > > > Hi,
> > > > > >
> > > > > > There are some feedbacks from @Timo and @Kurt in the voting thread
> > > for
> > > > > FLIP-36 and I want to share my thoughts here.
> > > > > >
> > > > > > 1. How would the FLIP-36 look like after FLIP-84?
> > > > > > I don't think FLIP-84 will affect FLIP-36 from the public API
> > > > > perspective. Users can call .cache on a table object and the cached
> > > table
> > > > > will be generated whenever the table job is triggered to execute,
> > > either by
> > > > > Table#executeInsert or StatementSet#execute. I think that FLIP-36
> > > should
> > > > > aware of the changes made by FLIP-84, but it shouldn't be a problem.
> > > At the
> > > > > end of the day, FLIP-36 only requires the ability to add a sink to a
> > > node,
> > > > > submit a table job with multiple sinks, and replace the cached table
> > > with a
> > > > > source.
> > > > > >
> > > > > > 2. How can we support cache in a multi-statement SQL file?
> > > > > > The most intuitive way to support cache in a multi-statement SQL
> > > file is
> > > > > by using a view, where the view is corresponding to a cached table.
> > > > > >
> > > > > > 3. Unifying the cached table and materialized views
> > > > > > It is true that the cached table and the materialized view are
> > > similar
> > > > > in some way. However, I think the materialized view is a more complex
> > > > > concept. First, a materialized view requires some kind of a refresh
> > > > > mechanism to synchronize with the table. Secondly, the life cycle of a
> > > > > materialized view is longer. The materialized view should be 
> > > > > accessible
> > > > > even after the application exits and should be accessible by another
> > > > > application, while the cached table is only accessible in the
> > > application
> > > > > where it is created. The cached table is introduced to avoid
> > > recomputation
> > > > > of an intermediate table to support interactive programming in Flink
> > > Table
> > > > > API. And I think the materialized view needs more discussion and
> > > certainly
> > > > > deserves a whole new FLIP.
> > > > > >
> > > > > > Please let me know your thought.
> > > > > >
> > > > > > Best,
> > > > > > Xuannan
> > > > > >
> > > > > On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <suxuanna...@gmail.com>
> > > wrote:
> > > > > > Hi folks,
> > > > > >
> > > > > > The FLIP-36 is updated according to the discussion with Becket. In
> > > the
> > > > > meantime, any comments are very welcome.
> > > > > >
> > > > > > If there are no further comments, I would like to start the voting
> > > > > > thread by tomorrow.
> > > > > >
> > > > > > Thanks,
> > > > > > Xuannan
> > > > > >
> > > > > >
> > > > > > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <suxuanna...@gmail.com>
> > > > > wrote:
> > > > > > > > Hi Becket,
> > > > > > > >
> > > > > > > > You are right. It makes sense to treat retry of job 2 as an
> > > ordinary
> > > > > job. And the config does introduce some unnecessary confusion. Thank
> > > you
> > > > > for you comment. I will update the FLIP.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > > On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
> > > becket....@gmail.com>
> > > > > wrote:
> > > > > > > > > > Hi Xuannan,
> > > > > > > > > >
> > > > > > > > > > If user submits Job 1 and generated a cached intermediate
> > > > > result. And later
> > > > > > > > > > on, user submitted job 2 which should ideally use the
> > > > > intermediate result.
> > > > > > > > > > In that case, if job 2 failed due to missing the 
> > > > > > > > > > intermediate
> > > > > result, Job 2
> > > > > > > > > > should be retried with its full DAG. After that when Job 2
> > > runs,
> > > > > it will
> > > > > > > > > > also re-generate the cache. However, once job 2 has fell
> > > back to
> > > > > the
> > > > > > > > > > original DAG, should it just be treated as an ordinary job
> > > that
> > > > > follow the
> > > > > > > > > > recovery strategy? Having a separate configuration seems a
> > > little
> > > > > > > > > > confusing. In another word, re-generating the cache is just 
> > > > > > > > > > a
> > > > > byproduct of
> > > > > > > > > > running the full DAG of job 2, but is not the main purpose.
> > > It
> > > > > is just like
> > > > > > > > > > when job 1 runs to generate cache, it does not have a
> > > separate
> > > > > config of
> > > > > > > > > > retry to make sure the cache is generated. If it fails, it
> > > just
> > > > > fail like
> > > > > > > > > > an ordinary job.
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > > On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
> > > > > suxuanna...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Becket,
> > > > > > > > > > >
> > > > > > > > > > > The intermediate result will indeed be automatically
> > > > > re-generated by
> > > > > > > > > > > resubmitting the original DAG. And that job could fail as
> > > > > well. In that
> > > > > > > > > > > case, we need to decide if we should resubmit the original
> > > DAG
> > > > > to
> > > > > > > > > > > re-generate the intermediate result or give up and throw 
> > > > > > > > > > > an
> > > > > exception to
> > > > > > > > > > > the user. And the config is to indicate how many resubmit
> > > > > should happen
> > > > > > > > > > > before giving up.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Xuannan
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
> > > > > becket....@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > >
> > > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > mentioned. The
> > > > > > > > > > > users
> > > > > > > > > > > > > can use the cached table object returned by the
> > > .cache()
> > > > > method in
> > > > > > > > > > > other
> > > > > > > > > > > > > job and it should read the intermediate result. The
> > > > > intermediate result
> > > > > > > > > > > > can
> > > > > > > > > > > > > gone in the following three cases: 1. the user
> > > explicitly
> > > > > call the
> > > > > > > > > > > > > invalidateCache() method 2. the TableEnvironment is
> > > closed
> > > > > 3. failure
> > > > > > > > > > > > > happens on the TM. When that happens, the intermeidate
> > > > > result will not
> > > > > > > > > > > be
> > > > > > > > > > > > > available unless it is re-generated.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > What confused me was that why do we need to have a
> > > > > *cache.retries.max
> > > > > > > > > > > > *config?
> > > > > > > > > > > > Shouldn't the missing intermediate result always be
> > > > > automatically
> > > > > > > > > > > > re-generated if it is gone?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
> > > > > suxuanna...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Becket,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the comments.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
> > > > > becket....@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for picking up the FLIP. It looks good to me
> > > > > overall. Some
> > > > > > > > > > > quick
> > > > > > > > > > > > > > comments / questions below:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. Do we also need changes in the Java API?
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yes, the public interface of Table and 
> > > > > > > > > > > > > TableEnvironment
> > > > > should be made
> > > > > > > > > > > in
> > > > > > > > > > > > > the Java API.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. What are the cases that users may want to retry
> > > > > reading the
> > > > > > > > > > > > > intermediate
> > > > > > > > > > > > > > result? It seems that once the intermediate result
> > > has
> > > > > gone, it will
> > > > > > > > > > > > not
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > available later without being generated again, 
> > > > > > > > > > > > > > right?
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > mentioned. The
> > > > > > > > > > > > users
> > > > > > > > > > > > > can use the cached table object returned by the
> > > .cache()
> > > > > method in
> > > > > > > > > > > other
> > > > > > > > > > > > > job and it should read the intermediate result. The
> > > > > intermediate result
> > > > > > > > > > > > can
> > > > > > > > > > > > > gone in the following three cases: 1. the user
> > > explicitly
> > > > > call the
> > > > > > > > > > > > > invalidateCache() method 2. the TableEnvironment is
> > > closed
> > > > > 3. failure
> > > > > > > > > > > > > happens on the TM. When that happens, the intermeidate
> > > > > result will not
> > > > > > > > > > > be
> > > > > > > > > > > > > available unless it is re-generated.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3. In the "semantic of cache() method" section, the
> > > > > description "The
> > > > > > > > > > > > > > semantic of the *cache() *method is a little
> > > different
> > > > > depending on
> > > > > > > > > > > > > whether
> > > > > > > > > > > > > > auto caching is enabled or not." seems not 
> > > > > > > > > > > > > > explained.
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > This line is actually outdated and should be removed,
> > > as
> > > > > we are not
> > > > > > > > > > > > adding
> > > > > > > > > > > > > the auto caching functionality in this FLIP. Auto
> > > caching
> > > > > will be added
> > > > > > > > > > > > in
> > > > > > > > > > > > > the future, and the semantic of cache() when auto
> > > caching
> > > > > is enabled
> > > > > > > > > > > will
> > > > > > > > > > > > > be discussed in detail by a new FLIP. I will remove 
> > > > > > > > > > > > > the
> > > > > descriptor to
> > > > > > > > > > > > avoid
> > > > > > > > > > > > > further confusion.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
> > > > > suxuanna...@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi folks,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I'd like to start the discussion about FLIP-36
> > > Support
> > > > > Interactive
> > > > > > > > > > > > > > > Programming in Flink Table API
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The FLIP proposes to add support for interactive
> > > > > programming in
> > > > > > > > > > > Flink
> > > > > > > > > > > > > > Table
> > > > > > > > > > > > > > > API. Specifically, it let users cache the
> > > intermediate
> > > > > > > > > > > > results(tables)
> > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > use them in the later jobs.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Even though the FLIP has been discussed in the
> > > > > past[1], the FLIP
> > > > > > > > > > > > hasn't
> > > > > > > > > > > > > > > formally passed the vote yet. And some of the
> > > design
> > > > > and
> > > > > > > > > > > > implementation
> > > > > > > > > > > > > > > detail have to change to incorporates the cluster
> > > > > partition
> > > > > > > > > > > proposed
> > > > > > > > > > > > in
> > > > > > > > > > > > > > > FLIP-67[2].
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Xuannan
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
> > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > >
> > > https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > >
> > >

Reply via email to