Hi Xuannan,

Thanks for the detailed design doc. It clearly describes what the API looks
like and how it interacts with the Flink runtime.
However, the part that relates to the SQL optimizer is still unclear. To be
more precise, I have the following questions:

1. How do you identify the CachedTable? I imagine there would be a map
representing the cache; how do you compare the keys of that map? One
approach is to compare them as Java objects, which is simple but has
limited scope. For example, if a user creates another table through some
TableEnvironment interfaces and that table is exactly the same as the
cached one, you won't be able to identify it. Another choice is to
calculate a "signature" or "digest" of the cached table, which involves a
string representation of the whole subtree represented by the cached table.
I don't think Flink currently provides such a mechanism for Table, though.
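
To make the two options concrete, here is a minimal Java sketch. It does not
use any Flink API: PlanNode, digest() and CacheLookupSketch are hypothetical
names made up for illustration, and I am assuming that "compared as Java
objects" means reference identity. The digest is simply the string form of
the whole subtree.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class CacheLookupSketch {

    /** Hypothetical stand-in for a node of a table's operation tree (not a Flink class). */
    static final class PlanNode {
        final String description;   // e.g. "Filter(a > 1)"
        final List<PlanNode> inputs;

        PlanNode(String description, PlanNode... inputs) {
            this.description = description;
            this.inputs = Arrays.asList(inputs);
        }

        /** String representation of the whole subtree rooted at this node. */
        String digest() {
            StringBuilder sb = new StringBuilder(description);
            if (!inputs.isEmpty()) {
                sb.append('[');
                for (int i = 0; i < inputs.size(); i++) {
                    if (i > 0) {
                        sb.append(',');
                    }
                    sb.append(inputs.get(i).digest());
                }
                sb.append(']');
            }
            return sb.toString();
        }
    }

    /** Builds the same logical query each time, but as a fresh object tree. */
    static PlanNode buildQuery() {
        return new PlanNode("Filter(a > 1)", new PlanNode("Scan(orders)"));
    }

    public static void main(String[] args) {
        PlanNode cached = buildQuery();
        PlanNode rebuilt = buildQuery();   // structurally identical, different objects

        // Option 1: keys compared by object identity.
        Map<PlanNode, String> byIdentity = new IdentityHashMap<>();
        byIdentity.put(cached, "cache-1");

        // Option 2: keys compared by subtree digest.
        Map<String, String> byDigest = new HashMap<>();
        byDigest.put(cached.digest(), "cache-1");

        System.out.println(byIdentity.containsKey(rebuilt));        // false
        System.out.println(byDigest.containsKey(rebuilt.digest())); // true
    }
}
```

The identity map misses the rebuilt table even though it is the same query,
while the digest map finds it. A real digest would also have to normalize
things like field names and expression ordering, which this sketch ignores.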

2. How does the CachedTable affect the optimizer? Specifically, will you
have a dedicated QueryOperation for it, and will you have dedicated logical
and physical RelNodes for it? I also don't see a description of how it
works with the current optimization phases: from Operation to Calcite rel
node, then to Flink's logical and physical nodes, which are finally
translated to Flink's exec nodes. There are also other optimizations inside
the optimizer, such as the deadlock breaker and sub-plan reuse, and I'm not
sure whether the logic dealing with cached tables can be orthogonal to all
of these. Hence I would expect a more detailed description here.

3. What's the effect of calling TableEnvironment.close()? You already
explained that this would drop all caches this table env has. Could you
also explain whether other functionality still works for this table env?
For example, can users still create/drop tables/databases/functions through
this table env? What happens to the catalog and all temporary objects of
this table env?

One minor comment: I noticed that the examples you gave use some APIs that
do not exist, like table.collect(), which is a little misleading.

Best,
Kurt


On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi folks,
>
> I'd like to revive the discussion about FLIP-36 Support Interactive
> Programming in Flink Table API
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>
> The FLIP proposes to add support for interactive programming in Flink
> Table API. Specifically, it lets users cache the intermediate
> results (tables) and use them in later jobs to avoid recomputing the
> intermediate results (tables).
>
> I am looking forward to any opinions and suggestions from the community.
>
> Best,
> Xuannan
> On May 7, 2020, 5:40 PM +0800, Xuannan Su <suxuanna...@gmail.com>, wrote:
> > Hi,
> >
> > There is some feedback from @Timo and @Kurt in the voting thread for
> > FLIP-36 and I want to share my thoughts here.
> >
> > 1. What would FLIP-36 look like after FLIP-84?
> > I don't think FLIP-84 will affect FLIP-36 from the public API
> > perspective. Users can call .cache on a table object and the cached table
> > will be generated whenever the table job is triggered to execute, either by
> > Table#executeInsert or StatementSet#execute. I think that FLIP-36 should
> > be aware of the changes made by FLIP-84, but it shouldn't be a problem. At
> > the end of the day, FLIP-36 only requires the ability to add a sink to a
> > node, submit a table job with multiple sinks, and replace the cached table
> > with a source.
> >
> > 2. How can we support cache in a multi-statement SQL file?
> > The most intuitive way to support cache in a multi-statement SQL file is
> > by using a view, where the view corresponds to a cached table.
> >
> > 3. Unifying the cached table and materialized views
> > It is true that the cached table and the materialized view are similar
> in some way. However, I think the materialized view is a more complex
> concept. First, a materialized view requires some kind of a refresh
> mechanism to synchronize with the table. Secondly, the life cycle of a
> materialized view is longer. The materialized view should be accessible
> even after the application exits and should be accessible by another
> application, while the cached table is only accessible in the application
> where it is created. The cached table is introduced to avoid recomputation
> of an intermediate table to support interactive programming in Flink Table
> API. And I think the materialized view needs more discussion and certainly
> deserves a whole new FLIP.
> >
> > Please let me know your thoughts.
> >
> > Best,
> > Xuannan
> >
> On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > Hi folks,
> >
> > The FLIP-36 is updated according to the discussion with Becket. In the
> meantime, any comments are very welcome.
> >
> > If there are no further comments, I would like to start the voting
> > thread by tomorrow.
> >
> > Thanks,
> > Xuannan
> >
> >
> > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <suxuanna...@gmail.com>
> wrote:
> > > > Hi Becket,
> > > >
> > > > You are right. It makes sense to treat the retry of job 2 as an
> > > > ordinary job. And the config does introduce some unnecessary confusion.
> > > > Thank you for your comment. I will update the FLIP.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > > > On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <becket....@gmail.com>
> wrote:
> > > > > > Hi Xuannan,
> > > > > >
> > > > > > If user submits Job 1 and generated a cached intermediate
> result. And later
> > > > > > on, user submitted job 2 which should ideally use the
> intermediate result.
> > > > > > In that case, if job 2 failed due to missing the intermediate
> result, Job 2
> > > > > > should be retried with its full DAG. After that when Job 2 runs,
> it will
> > > > > > > also re-generate the cache. However, once job 2 has fallen back to
> the
> > > > > > original DAG, should it just be treated as an ordinary job that
> follow the
> > > > > > recovery strategy? Having a separate configuration seems a little
> > > > > > > confusing. In other words, re-generating the cache is just a
> byproduct of
> > > > > > running the full DAG of job 2, but is not the main purpose. It
> is just like
> > > > > > when job 1 runs to generate cache, it does not have a separate
> config of
> > > > > > retry to make sure the cache is generated. If it fails, it just
> fail like
> > > > > > an ordinary job.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
> suxuanna...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Becket,
> > > > > > >
> > > > > > > The intermediate result will indeed be automatically
> re-generated by
> > > > > > > resubmitting the original DAG. And that job could fail as
> well. In that
> > > > > > > case, we need to decide if we should resubmit the original DAG
> to
> > > > > > > re-generate the intermediate result or give up and throw an
> exception to
> > > > > > > the user. And the config is to indicate how many resubmit
> should happen
> > > > > > > before giving up.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Xuannan
> > > > > > >
> > > > > > > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
> becket....@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Xuannan,
> > > > > > > >
> > > > > > > >  I am not entirely sure if I understand the cases you
> mentioned. The
> > > > > > > users
> > > > > > > > > can use the cached table object returned by the .cache()
> method in
> > > > > > > other
> > > > > > > > > job and it should read the intermediate result. The
> intermediate result
> > > > > > > > can
> > > > > > > > > gone in the following three cases: 1. the user explicitly
> call the
> > > > > > > > > invalidateCache() method 2. the TableEnvironment is closed
> 3. failure
> > > > > > > > > > happens on the TM. When that happens, the intermediate
> result will not
> > > > > > > be
> > > > > > > > > available unless it is re-generated.
> > > > > > > >
> > > > > > > >
> > > > > > > > What confused me was that why do we need to have a
> *cache.retries.max
> > > > > > > > *config?
> > > > > > > > Shouldn't the missing intermediate result always be
> automatically
> > > > > > > > re-generated if it is gone?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
> suxuanna...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Becket,
> > > > > > > > >
> > > > > > > > > Thanks for the comments.
> > > > > > > > >
> > > > > > > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
> becket....@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Xuannan,
> > > > > > > > > >
> > > > > > > > > > Thanks for picking up the FLIP. It looks good to me
> overall. Some
> > > > > > > quick
> > > > > > > > > > comments / questions below:
> > > > > > > > > >
> > > > > > > > > > 1. Do we also need changes in the Java API?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, the public interface of Table and TableEnvironment
> should be made
> > > > > > > in
> > > > > > > > > the Java API.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 2. What are the cases that users may want to retry
> reading the
> > > > > > > > > intermediate
> > > > > > > > > > result? It seems that once the intermediate result has
> gone, it will
> > > > > > > > not
> > > > > > > > > be
> > > > > > > > > > available later without being generated again, right?
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >  I am not entirely sure if I understand the cases you
> mentioned. The
> > > > > > > > users
> > > > > > > > > can use the cached table object returned by the .cache()
> method in
> > > > > > > other
> > > > > > > > > job and it should read the intermediate result. The
> intermediate result
> > > > > > > > can
> > > > > > > > > gone in the following three cases: 1. the user explicitly
> call the
> > > > > > > > > invalidateCache() method 2. the TableEnvironment is closed
> 3. failure
> > > > > > > > > > happens on the TM. When that happens, the intermediate
> result will not
> > > > > > > be
> > > > > > > > > available unless it is re-generated.
> > > > > > > > >
> > > > > > > > > 3. In the "semantic of cache() method" section, the
> description "The
> > > > > > > > > > semantic of the *cache() *method is a little different
> depending on
> > > > > > > > > whether
> > > > > > > > > > auto caching is enabled or not." seems not explained.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > This line is actually outdated and should be removed, as
> we are not
> > > > > > > > adding
> > > > > > > > > the auto caching functionality in this FLIP. Auto caching
> will be added
> > > > > > > > in
> > > > > > > > > the future, and the semantic of cache() when auto caching
> is enabled
> > > > > > > will
> > > > > > > > > be discussed in detail by a new FLIP. I will remove the
> descriptor to
> > > > > > > > avoid
> > > > > > > > > further confusion.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
> suxuanna...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi folks,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start the discussion about FLIP-36 Support
> Interactive
> > > > > > > > > > > Programming in Flink Table API
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > > > >
> > > > > > > > > > > The FLIP proposes to add support for interactive
> programming in
> > > > > > > Flink
> > > > > > > > > > Table
> > > > > > > > > > > API. Specifically, it let users cache the intermediate
> > > > > > > > results(tables)
> > > > > > > > > > and
> > > > > > > > > > > use them in the later jobs.
> > > > > > > > > > >
> > > > > > > > > > > Even though the FLIP has been discussed in the
> past[1], the FLIP
> > > > > > > > hasn't
> > > > > > > > > > > formally passed the vote yet. And some of the design
> and
> > > > > > > > implementation
> > > > > > > > > > > detail have to change to incorporates the cluster
> partition
> > > > > > > proposed
> > > > > > > > in
> > > > > > > > > > > FLIP-67[2].
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Xuannan
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
> > > > > > > > > > > [2]
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
>
