Hey Becket,
+1 from my side
Piotrek
> On 14 Jan 2019, at 14:43, Becket Qin wrote:
Hi Seth,
Thanks for the feedback. Re-caching makes sense to me. Piotr and I had some
offline discussion and we generally reached consensus on the following API:
{
/**
* Cache this table to the built-in table service or the specified customized
table service.
*
* This method provides a hi
I spoke to Piotr a little bit offline and I wanted to comment with a summary of
our discussion and what I believe is most intuitive cache model from a users
perspective.
(I am making up some class names here, not looking to bike-shed; feel free to
change the names however you see fit).
A cac
Hi Piotr,
1. `env.getCacheService().releaseCacheFor(cachedT);` vs
`cachedT.releaseCache();`
It doesn't matter which signature we provide. To those who write the
function, "releasing the cache" is not a "side effect", it is exactly what
they wanted. Even if they know that they may be releasing some
Hi,
I know that it still can have side effects and that’s why I wrote:
> Something like this might be a better (not perfect, but just a bit better):
My point was that this:
void foo(Table t) {
val cachedT = t.cache();
...
env.getCacheService().releaseCacheFor(cachedT);
}
Should communicate
Just to clarify, when I say foo() like below, I assume that foo() must have
a way to release its own cache, so it must have access to table env.
void foo(Table t) {
...
t.cache(); // create cache for t
...
env.getCacheService().releaseCacheFor(t); // release cache for t
}
Thanks,
Jiangji
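To make the concern above concrete, here is a minimal, self-contained sketch of the pattern under discussion (the class and method names are invented for illustration, not Flink's actual API): a function that caches a table for its own use must also hold a reference to the environment's cache service in order to release that cache again.

```java
import java.util.HashSet;
import java.util.Set;

// Invented stand-in for the table environment's cache service.
class CachedTableEnv {
    private final Set<Object> cached = new HashSet<>();
    void cacheFor(Object t) { cached.add(t); }
    void releaseCacheFor(Object t) { cached.remove(t); }
    boolean isCached(Object t) { return cached.contains(t); }
}

public class CacheScopeSketch {
    static final CachedTableEnv env = new CachedTableEnv();

    // foo() caches t for its own intermediate reuse; to release the cache
    // before returning, it needs access to the (effectively global) env.
    static void foo(Object t) {
        env.cacheFor(t);          // create cache for t
        // ... run several queries against the cached t ...
        env.releaseCacheFor(t);   // release cache for t
    }

    public static void main(String[] args) {
        Object t = new Object(); // stand-in for a Table
        foo(t);
        System.out.println(env.isCached(t)); // prints "false": foo cleaned up
    }
}
```

The coupling between `foo()` and `env` is exactly what the thread is debating: the release call works, but every library function needs the environment in scope.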
Hi Piotr,
I don't think it is feasible to ask every third-party library to have a
method signature with CacheService as an argument.
And even that signature does not really solve the problem. Imagine function
foo() looks like the following:
void foo(Table t) {
...
t.cache(); // create cache for t
Hi,
I think that introducing ref counting could be confusing and error-prone,
since Flink-table’s users are not used to closing/releasing resources. I
was more objecting to placing the `uncache()`/`dropCache()`/`releaseCache()`
(releaseCache sounds best to me) as a method in the “Table”
Hi Piotr,
You are right. There might be two intuitive meanings when users call
'a.uncache()', namely:
1. release the resource
2. Do not use cache for the next operation.
Case (1) would likely be the dominant use case. So I would suggest we
dedicate uncache() method to case (1), i.e. for resource
Hi Becket,
With `uncache` there are probably two features that we can think about:
a)
Physically dropping the cached table from the storage, freeing up the resources
b)
Hinting the optimizer to not cache the reads for the next query/table
a) Has the issue as I wrote before, that it seemed to
Hi Piotr,
Thanks for the proposal and detailed explanation. I like the idea of
returning a new hinted Table without modifying the original table. This
also leaves room for users to benefit from future implicit caching.
Just to make sure I get the full picture. In your proposal, there will also
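As an illustration of the hinted-Table idea (names invented, not Flink's API): hint methods return a new immutable instance carrying the hint, leaving the original table untouched for the optimizer and for future implicit caching.

```java
// A minimal sketch of returning a new hinted table instead of mutating
// the original. All names here are made up for illustration.
class HintedTable {
    final boolean useCache; // optimizer hint carried by this instance

    HintedTable(boolean useCache) { this.useCache = useCache; }
    HintedTable() { this(false); }

    // Returns a *new* table that hints the optimizer to read from the cache.
    HintedTable cache() { return new HintedTable(true); }

    // Returns a *new* table that hints the optimizer to skip the cache.
    HintedTable ignoreCache() { return new HintedTable(false); }
}

public class HintSketch {
    public static void main(String[] args) {
        HintedTable a = new HintedTable();
        HintedTable cachedA = a.cache();
        System.out.println(cachedA.useCache); // prints "true": hint travels with the new instance
        System.out.println(a.useCache);       // prints "false": the original is unmodified
    }
}
```

Because `a` itself never changes, code that was written against `a` before the `cache()` call keeps its original semantics.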
Hi Becket!
After further thinking I tend to agree that my previous proposal (*Option 2*)
indeed might not be ideal if we would in the future introduce automatic caching.
However, I would like to propose a slightly modified version of it:
*Option 4*
Adding `cache()` method with following signature:
Tabl
Happy New Year, everybody!
I would like to resume this discussion thread. At this point, we have
agreed on the first-step goal of interactive programming. The open
discussion is the exact API. More specifically, what the *cache()*
method should return and what its semantics should be. There are three options:
Hi Piotrek,
1. Regarding optimization.
Sure, there are many cases where the decision is hard to make. But that does
not make it any easier for the users to make those decisions. I imagine 99%
of the users would just naively use cache. I am not saying we can optimize
in all the cases. But as long as
Hi,
Thanks for the quick answer :)
Re 1.
I generally agree with you; however, a couple of points:
a) the problem with automatic caching is bigger, because you will have to
decide how to compare IO vs CPU costs, and if you pick wrong, the additional IO
costs might be enormous or even can cr
Hi Piotrek,
Not sure if you noticed, in my last email, I was proposing `CacheHandle
cache()` to avoid the potential side effect due to function calls.
Let's look at the disagreement in your reply one by one.
1. Optimization chances
Optimization is never a trivial work. This is exactly why we s
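For illustration, a minimal sketch of how a `CacheHandle cache()` could avoid the side-effect problem, assuming simple reference counting (all names are invented, not the actual proposal's code): each cache() call returns its own handle, and the physical cache is only dropped once every handle has been released.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Invented stand-in for a table whose cache is reference-counted.
class RefCountedTable {
    private final AtomicInteger refCount = new AtomicInteger(0);
    private boolean physicallyCached = false;

    // Every cache() call returns its own handle; the first call materializes.
    CacheHandle cache() {
        refCount.incrementAndGet();
        physicallyCached = true;
        return new CacheHandle(this);
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            physicallyCached = false; // last handle released: drop the cache
        }
    }

    boolean isCached() { return physicallyCached; }
}

class CacheHandle implements AutoCloseable {
    private final RefCountedTable table;
    CacheHandle(RefCountedTable table) { this.table = table; }
    @Override public void close() { table.release(); }
}

public class CacheHandleSketch {
    public static void main(String[] args) {
        RefCountedTable a = new RefCountedTable();
        CacheHandle h1 = a.cache();  // caller 1 (e.g. the main program)
        CacheHandle h2 = a.cache();  // caller 2 (e.g. a library function)
        h2.close();                  // the library releases its own handle...
        System.out.println(a.isCached()); // prints "true": h1 still holds it
        h1.close();
        System.out.println(a.isCached()); // prints "false": cache dropped
    }
}
```

A library function that caches and releases through its own handle can no longer invalidate a cache its caller still depends on.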
Hi Becket,
> Regarding the chance of optimization, it might not be that rare. Some very
> simple statistics could already help in many cases. For example, simply
> maintaining max and min of each field can already eliminate some
> unnecessary table scan (potentially scanning the cached table) if
Hi Becket,
Introducing CacheHandle seems too complicated. That means users have to
maintain the handle properly.
And since cache is just a hint for optimizer, why not just return Table
itself for cache method. This hint info should be kept in Table I believe.
So how about adding method cache and un
Hi Till and Piotrek,
Thanks for the clarification. That resolves a lot of confusion. My
understanding of how cache works is the same as what Till described, i.e.
cache() is a hint to Flink, but it is not guaranteed that the cache always
exists and it might be recomputed from its lineage.
Is this the core
Hi Becket,
I was aiming at semantics similar to 1. I actually thought that `cache()`
would tell the system to materialize the intermediate result so that
subsequent queries don't need to reprocess it. This means that the usage of
the cached table in this example
{
val cachedTable = a.cache()
va
Hi Becket,
> {
> val cachedTable = a.cache()
> val b = cachedTable.select(...)
> val c = a.select(...)
> }
>
> Semantic 1. b uses cachedTable as user demanded so. c uses original DAG as
> user demanded so. In this case, the optimizer has no chance to optimize.
> Semantic 2. b uses cachedTable
Another potential concern for semantic 3 is that, in the future, we may add
automatic caching to Flink, e.g. caching the intermediate results at the
shuffle boundary. If our semantic is that reference to the original table
means skipping cache, those users may not be able to benefit from the
implicit
Hi Piotrek,
Thanks for the reply. Thinking about it again, I might have misunderstood
your proposal in earlier emails. Returning a CachedTable might not be a bad
idea.
I was more concerned about the semantics and their intuitiveness when a
CachedTable is returned, i.e., if cache() returns CachedTable
Hi Becket,
Sorry for not responding long time.
Regarding case1.
There wouldn’t be an “a.unCache()” method; I would expect only
`cachedTableA1.dropCache()`. Dropping `cachedTableA1` wouldn’t affect
`cachedTableA2`. Just as in any other database, dropping/modifying one
independent table/mate
Hi Till,
It is true that after the first job submission, there will be no ambiguity
in terms of whether a cached table is used or not. That is the same for the
cache() without returning a CachedTable.
> Conceptually one could think of cache() as introducing a caching operator
> from which you need
Hi,
All the recent discussions are focused on whether there is a problem if
cache() does not return a Table.
It seems that returning a Table explicitly is more clear (and safe?).
So are there any problems if cache() returns a Table? @Becket
Best,
Jark
On Tue, 4 Dec 2018 at 22:27, Till Rohrm
It's true that b, c, d and e will all read from the original DAG that
generates a. But all subsequent operators (when running multiple queries)
which reference cachedTableA should not need to reproduce `a` but directly
consume the intermediate result.
Conceptually one could think of cache() as int
Hi Till,
Thanks for the clarification. I am still a little confused.
If cache() returns a CachedTable, the example might become:
b = a.map(...)
c = a.map(...)
cachedTableA = a.cache()
d = cachedTableA.map(...)
e = a.map()
In the above case, if cache() is lazily evaluated, b, c, d and e are all
Yes you are right Becket that it still depends on the actual execution of
the job whether a consumer reads from a cached result or not.
My point was actually about the properties of a (cached vs. non-cached) and
not about the execution. I would not make cache trigger the execution of
the job becau
Hi Till,
That is a good example. Just a minor correction: in this case, b, c and d
will all consume from a non-cached a. This is because the cache will only be
created on the very first job submission that generates the table to be
cached.
If I understand correctly, this example is about whether .
Another argument for Piotr's point is that lazily changing properties of a
node affects all downstream consumers but does not necessarily have to
happen before these consumers are defined. From a user's perspective this
can be quite confusing:
b = a.map(...)
c = a.map(...)
a.cache()
d = a.map(..
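A minimal sketch of the confusion described above, assuming cache() mutates the node in place (class names invented for illustration): consumers defined before the cache() call are affected by it just as much as those defined after.

```java
// Toy plan node: cache() flips a flag on the node itself, so every
// downstream consumer reads the changed property, no matter when it
// was defined relative to the cache() call.
class PlanNode {
    boolean cached = false;
    final PlanNode parent;

    PlanNode(PlanNode parent) { this.parent = parent; }
    PlanNode map() { return new PlanNode(this); }
    void cache() { this.cached = true; } // in-place property change
    boolean readsFromCache() { return parent != null && parent.cached; }
}

public class LazyPropertySketch {
    public static void main(String[] args) {
        PlanNode a = new PlanNode(null);
        PlanNode b = a.map();
        PlanNode c = a.map();
        a.cache();            // property change *after* b and c were defined
        PlanNode d = a.map();
        // b and c were written before the cache() call, yet they are
        // affected exactly like d:
        System.out.println(b.readsFromCache()); // prints "true"
        System.out.println(c.readsFromCache()); // prints "true"
        System.out.println(d.readsFromCache()); // prints "true"
    }
}
```

This is the surprise being discussed: the user's mental model of program order does not match what the mutated plan actually does.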
Hey Shaoxuan and Becket,
> Can you explain a bit more on what the side effects are? So far my
> understanding is that such side effects only exist if a table is mutable.
> Is that the case?
Not only that. There are also performance implications, and those are another
implicit side effect of usi
Hi Piotrek,
Thanks again for the clarification. Some more replies are following.
> But keep in mind that `.cache()` will/might not only be used in interactive
> programming and not only in batching.
It is true. Actually, in stream processing, cache() has the same semantics as
batch processing. The s
Hi Piotrek,
Cache() should not affect semantics and business logic, and thus it will
not lead to random behavior/results. The underlying design should ensure
this. I took your example as a valid anti-case. But Jiangjie is correct,
the source table in batching should be immutable. It is the user
Hi all,
Regarding naming `cache()` vs `materialize()`. One more explanation why I think
`materialize()` is more natural to me is that I think of all “Table”s in
Table-API as views. They behave the same way as SQL views, the only difference
for me is that their live scope is short - current sess
Hi all,
I agree with @Becket that `cache()` and `materialize()` should be considered as
two different methods where the latter one is more sophisticated.
According to my understanding, the initial idea is just to introduce a simple
cache or persist mechanism, but as the TableAPI is a high-level
Hi Fabian,
Thanks for sharing the feedback!
Re: 1)
Good question about the implementation. In fact, Alibaba has modified the
query planning a little bit to add something called LogicalNodeBlock.
Basically, a given DAG could be divided into a few LogicalNodeBlocks, and
the optimization will be don
Hi Piotrek and Jark,
Thanks for the feedback and explanation. Those are good arguments. But I
think those arguments are mostly about materialized view. Let me try to
explain the reason I believe cache() and materialize() are different.
I think cache() and materialize() have quite different implic
Thanks Piotrek,
You provided a very good example, it explains all the confusions I have.
It is clear that there is something we have not considered in the initial
proposal. We intend to force the user to reuse the cached/materialized
table, if its cache() method is executed. We did not expect that
Hi Shaoxuan,
Re 2:
> Table t3 = methodThatAppliesOperators(t1) // t1 is modified to-> t1’
What do you mean by “t1 is modified to-> t1’”? That the
`methodThatAppliesOperators()` method has changed its plan?
I was thinking more about something like this:
Table source = … // some source that
Hi,
It is a very interesting and useful design!
Here I want to share some of my thoughts:
1. Agree that the cache() method should return some Table, to avoid
unexpected problems because of mutable objects.
All the existing methods of Table return a new Table instance.
2. I th
Hi Fabian and Piotr,
Thanks for the feedback. I think I now understand you a little better.
1. “Materialize" and “cache" are two different scenarios IMO. "Materialize"
is a complex feature that allows the user to really create a
materializedView/table, and the materialized table will be timely up
Hi,
Thanks for the clarification Becket!
I have a few thoughts to share / questions:
1) I'd like to know how you plan to implement the feature on a plan /
planner level.
I would imagine the following to happen when Table.cache() is called:
1) immediately optimize the Table and internally conve
Hi Piotr,
Thanks for sharing your ideas on the method naming. We will think about
your suggestions. But I don't understand why we need to change the return
type of cache().
Cache() is a physical operation, it does not change the logic of
the `Table`. On the tableAPI layer, we should not introduce
Hi Becket,
Thanks for the response.
1. I wasn’t saying that materialised view must be mutable or not. The same
thing applies to caches as well. To the contrary, I would expect more
consistency and updates from something that is called “cache” vs something
that’s a “materialised view”. In other
Just to add a little bit, the materialized view is probably more similar to
the persistent() brought up earlier in the thread. So it is usually cross
session and could be used in a larger scope. For example, a materialized
view created by user A may be visible to user B. It is probably something
we
Hi Piotrek,
Thanks for the explanation.
Right now we are mostly thinking of the cached table as immutable. I can
see the Materialized view would be useful in the future. That said, I think
a simple cache mechanism is probably still needed. So to me, cache() and
materialize() should be two separat
Thanks for the feedback, Fabian.
As you mentioned, cache() method itself does not imply any implementation
detail. In fact, we plan to implement a default table service which is
locality aware, so the default table service hopefully will be satisfactory
in most cases. We could also explore more me
Hi Becket,
> Is there any extra thing a user can do on a MaterializedTable that they cannot
> do on a Table?
Maybe not in the initial implementation, but various DBs offer different ways
to “refresh” the materialised view. Hooks, triggers, timers, manually etc.
Having `MaterializedTable` would
I'm not suggesting to add support for Ignite. This was just an example.
Plasma and Arrow sound interesting, too.
For the sake of this proposal, it would be up to the user to implement a
TableFactory and corresponding TableSource / TableSink classes to persist
and read the data.
On Mon., 26 Nov. 20
What about to add also Apache Plasma + Arrow as an alternative to Apache
Ignite?
[1] https://arrow.apache.org/blog/2017/08/08/plasma-in-memory-object-store/
On Mon, Nov 26, 2018 at 11:56 AM Fabian Hueske wrote:
> Hi,
>
> Thanks for the proposal!
>
> To summarize, you propose a new method Table.c
Hi,
Thanks for the proposal!
To summarize, you propose a new method Table.cache(): Table that will
trigger a job and write the result into some temporary storage as defined
by a TableFactory.
The cache() call blocks while the job is running and eventually returns a
Table object that represents a
Thanks for the explanation, Piotrek.
Is there any extra thing a user can do on a MaterializedTable that they
cannot do on a Table? After users call *table.cache()*, users can just use
that table and do anything that is supported on a Table, including SQL.
Naming wise, either cache() or materialize(
Hi Becket,
Oops, sorry, I didn’t notice that you intend to reuse the existing `TableFactory`. I
don’t know why, but I assumed that you wanted to provide an alternate way of
writing the data.
Now that I hopefully understand the proposal, maybe we could rename `cache()`
to
void materialize()
or going
Hi Piotrek,
For the cache() method itself, yes, it is equivalent to creating a BUILT-IN
materialized view with a lifecycle. That functionality is missing today,
though. Not sure if I understand your question. Do you mean we already have
the functionality and just need a syntax sugar?
What's more
Hi,
Interesting idea. I’m trying to understand the problem. Isn’t the `cache()`
call equivalent to writing data to a sink and later reading from it? Where
this sink has a limited live scope/live time? And the sink could be implemented
as an in-memory or a file sink?
If so, what’s the problem w
Thanks for the suggestion, Jincheng.
Yes, I think it makes sense to have a persist() with lifecycle/defined
scope. I just added a section in the future work for this.
Thanks,
Jiangjie (Becket) Qin
On Fri, Nov 23, 2018 at 1:55 PM jincheng sun
wrote:
> Hi Jiangjie,
>
> Thank you for the explan
Hi Xiaowei,
Thanks for the comment. That is a valid point.
The callback is not only associated with a particular temp table. It is a
clean up logic provided by the user. The temp table to session ID mapping
is tracked internally. We also need to associate the callback with the
session lifecycle a
Hi Jiangjie,
Thank you for the explanation about the name of `cache()`; I understand why
you designed it this way!
Another idea is whether we can specify a lifecycle for data persistence.
For example, persist(LifeCycle.SESSION), so that the user is not worried
about data loss, and will clearly spec
Re: Jincheng,
Thanks for the feedback. Regarding cache() v.s. persist(), personally I
find cache() to be more accurately describing the behavior, i.e. the Table
is cached for the session, but will be deleted after the session is closed.
persist() seems a little misleading as people might think the
Hi all,
@Shaoxuan, I think the lifecycle or access domain are both orthogonal to the
cache problem. Essentially, this may be the first time we plan to introduce
another storage mechanism other than the state. Maybe it’s better to first draw
a big picture and then concentrate on a specific part?
Relying on a callback for the temp table for clean up is not very reliable.
There is no guarantee that it will be executed successfully. We may risk
leaks when that happens. I think that it's safer to have an association
between temp table and session id. So we can always clean up temp tables
which
Hi Jiangjie&Shaoxuan,
Thanks for initiating this great proposal!
Interactive programming is very useful and user-friendly, as shown in your
examples.
Moreover, especially when a business has to be executed in several stages
with dependencies, such as the pipeline of Flink ML, in order to utilize the
Hi Xingcan,
These are great points. We are on the same page regarding the potential
capabilities of the proposed changes. There are actually two main parts in
the proposal, the API and the underlying service. Both parts can be
extended in the future.
We made a few design choices when drafting the doc to r
Hi Xingcan,
I think you probably misunderstood our proposal. The proposed “cache()” API
basically implies that the data is only available within its session, not
forever available for other sessions to access. It will be cleaned up when the
session exits. “cache” does not imply the underlying implementati
Hi all,
Thanks for the replies.
@Becket I think whether putting the persist/cache methods in a separated util
class or inside the DataSet/Table depends on what we want to introduce. The
former one sounds more like a data storage component where users may even
somehow get a stored DataSet/Table
Hi Becket,
I think the Flink Service is a good abstraction, with which we can easily
build Interactive Programming or some other features.
We might bring in the concept of a 'Session'; then we can think of Flink
Services as system processes and user jobs as user processes, so the
management of life cycl
Hi Weihua,
Thanks for the comments. These are great questions!
To answer question 1, I think it depends on what we want from the cache
service. At this point, it is not quite clear to me whether Flink needs
different caching levels. For example, in Spark, the memory-level caching
is mostly us
Hi Xingcan,
Thanks for the comments. Yes, "cache/persist the intermediate data" is
useful. It can bring benefits to many scenarios. But different scenarios may
have different ways to solve it. For instance, as I replied to
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Em
Hi Xingcan,
Thanks for the feedback.
Adding the cache to DataSet is useful. In fact, the current proposal does
not assume the "PersistService" can only be used by the Table. We can
always add DataSet.cache() and let it benefit from the underlying
persistency support. So it seems more of a wording
Hi Becket,
The design is quite interesting and useful.
I have several questions about your design:
1. Shall we add some persistence-level hints to the cache() function for
data of different temperatures? E.g. IN_MEM, IN_DISK, etc., or HOTTEST, HOT,
WARM, COLD?
2. When will the corresponding cached data be
Hi Becket,
Thanks for bringing this up! For a long time, the intermediate cache problem
has always been a pain point of the Flink streaming model. As far as I know,
it’s quite a blocker for iterative operations in batch-related libs such as Gelly
and FlinkML.
Actually, there’s an old JIRA[1], aim
Hi all,
As a few recent email threads have pointed out, it is a promising
opportunity to enhance Flink Table API in various aspects, including
functionality and ease of use among others. One of the scenarios where we
feel Flink could improve is interactive programming. To explain the issues
and fa