Hi Gen, Thanks for your feedback.
I think you are talking about how we are going to store the caching data. The first option is to write the data with a sink to an external file system, much like the file store of the Dynamic Table. If I understand correctly, it requires a distributed file system, e.g HDSF, s3, etc. In my opinion, it is too heavyweight to use a distributed file system for caching. As you said, using the shuffle service for caching is quite natural as we need to produce the intermediate result anyway. For Table/SQL API, the table operations are translated to transformations, where we can reuse the CacheTransformation. It should not be unfriendly for Table/SQL API. For setting the parallelism of the CacheTransformation. With the current design, the parallelism of the cache intermediate result is determined by the parallelism of the transformation that produces the intermediate result to cache. Thus, the parallelism of the caching transformation is set by the parallelism of the transformation to be cached. I think the overhead should not be critical as the cache-producing job suffers from the same overhead anyway. For CacheTransformation with large parallelism but the result dataset is relatively small, I think we should reduce the parallelism of the transformation to cache. Best, Xuannan On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <luogen...@gmail.com> wrote: > > Hi Xuannan, > > Thanks for the reply. > > I do agree that dynamic tables and cached partitions are similar features > aiming different cases. In my opinion, the main difference of the > implementations is to cache only the data or the whole result partition. > > To cache only the data, we can translate the CacheTransformation to a Sink > node for writing, and Source node for consuming. Most of the things are > just the same as this FLIP, except for the storage, which is an external > one (or a built-in one if we can use the dynamic table storage), instead of > the BLOCKING_PERSISTED type ResultPartition in the shuffle service. This > can make caching independent from a specific shuffle service, and make it > possible to share data between different jobs / Per-Job mode jobs. > > Caching the whole partition is natural in DataStream API, since the > partition is a low-level concept, and data storage is already provided by > the default shuffle service. So if we want to choose a solution only to > support cache in DataStream API, caching the whole partition can be a good > choice. But this may be not as friendly to Table/SQL API as to > DataStream, since users are announcing to cache a logical Table (view), > rather than a physical partition. If we want a unified solution for both > APIs, this may need to be considered. > > > And here's another suggestion to this FLIP. Maybe we should support > "setParallelism" in CacheTransformation, for both caching and consuming. > > In some cases, the input parallelism of the CacheTransformation is large > but the result dataset is relatively small. We may need too many resources > to consume the result partition if the source parallelism has to be the > same with the producer. > > On the other hand, serving a large number of partitions may also have more > overhead. Though maybe it's not a big burban, we can try to reduce the > actual cached partition count if necessary, for example by adding a > pass-through vertex with the specific parallelism between the producer and > the cache vertices. > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipe...@gmail.com> > wrote: > > > Hi Xuannnan, > > > > Thanks for the reply. > > > > Regarding whether and how to support cache sideoutput, I agree that the > > second option might be better if there do exist a use case that users need > > to cache only some certain side outputs. > > > > > > Xuannan Su <suxuanna...@gmail.com> 于2022年1月4日周二 15:50写道: > > > > > Hi Zhipeng and Gen, > > > > > > Thanks for joining the discussion. > > > > > > For Zhipeng: > > > > > > - Can we support side output > > > Caching the side output is indeed a valid use case. However, with the > > > current API, it is not straightforward to cache the side output. You > > > can apply an identity map function to the DataStream returned by the > > > getSideOutput method and then cache the result of the map > > > transformation. In my opinion, it is not user-friendly. Therefore, we > > > should think of a way to better support the use case. > > > As you say, we can introduce a new class > > > `CachedSingleOutputStreamOperator`, and overwrite the `getSideOutput` > > > method to return a `CachedDatastream`. With this approach, the cache > > > method implies that both output and the side output of the > > > `SingleOutputStreamOperatior` are cached. The problem with this > > > approach is that the user has no control over which side output should > > > be cached. > > > Another option would be to let the `getSideOuput` method return the > > > `SingleOutputStreamOperator`. This way, users can decide which side > > > output to cache. As the `getSideOutput` method returns a > > > `SingleOutputStreamOperator`. Users can set properties of the > > > transformation that produce the side output, e.g. parallelism, buffer > > > timeout, etc. If users try to set different values of the same > > > property of a transformation, an exception will be thrown. What do you > > > think? > > > > > > - Can we support Stream Mode > > > Running a job in stream mode doesn't guarantee the job will finish, > > > while in batch mode, it does. This is the main reason that prevents > > > us from supporting cache in stream mode. The cache cannot be used > > > unless the job can finish. > > > If I understand correctly, by "run batch jobs in Stream Mode", you > > > mean that you have a job with all bounded sources, but you want the > > > intermediate data to shuffle in pipelined mode instead of blocking > > > mode. If that is the case, the job can run in batch mode with > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1]. > > > And we can support caching in this case. > > > > > > - Change parallelism of CachedDataStream > > > CachedDataStream extends from DataStream, which doesn't have the > > > `setParallelism` method like the `SingleOutputStreamOperator`. Thus, > > > it should not be a problem with CachedDataStream. > > > > > > For Gen: > > > > > > - Relation between FLIP-205 and FLIP-188 > > > Although it feels like dynamic table and caching are similar in the > > > sense that they let user reuse come intermediate result, they target > > > different use cases. The dynamic table is targeting the use case where > > > users want to share a dynamic updating intermediate result across > > > multiple applications. It is some meaningful data that can be consumed > > > by different Flink applications and Flink jobs. While caching is > > > targeting the use case where users know that all the sources are > > > bounded and static, and caching is only used to avoid re-computing the > > > intermediate result. And the cached intermediate result is only > > > meaningful crossing jobs in the same application. > > > > > > Dynamic table and caching can be used together. For example, in a > > > machine learning scenario, we can have a Stream job that is generating > > > some training samples. And we can create a dynamic table for the > > > training sample. And we run a Flink application every hour to do some > > > data analysis on the training sample generated in the last hour. The > > > Flink application consists of multiple batch jobs and the batch jobs > > > share some intermediate results, so users can use cache to avoid > > > re-computation. The intermediate result is not meaningful outside of > > > the application. And the cache will be discarded after the application > > > is finished. > > > > > > [1] > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode > > > > > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com> wrote: > > > > > > > > Hi Xuannan, > > > > > > > > I found FLIP-188[1] that is aiming to introduce a built-in dynamic > > table > > > > storage, which provides a unified changelog & table representation. > > > Tables > > > > stored there can be used in further ad-hoc queries. To my > > understanding, > > > > it's quite like an implementation of caching in Table API, and the > > ad-hoc > > > > queries are somehow like further steps in an interactive program. > > > > > > > > As you replied, caching at Table/SQL API is the next step, as a part of > > > > interactive programming in Table API, which we all agree is the major > > > > scenario. What do you think about the relation between it and FLIP-188? > > > > > > > > [1] > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage > > > > > > > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> > > > wrote: > > > > > > > > > Hi David, > > > > > > > > > > Thanks for sharing your thoughts. > > > > > > > > > > You are right that most people tend to use high-level API for > > > > > interactive data exploration. Actually, there is > > > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I > > > > > know, it has been accepted but hasn’t been implemented. At the time > > > > > when it is drafted, DataStream did not support Batch mode but Table > > > > > API does. > > > > > > > > > > Now that the DataStream API does support batch processing, I think we > > > > > can focus on supporting cache at DataStream first. It is still > > > > > valuable for DataStream users and most of the work we do in this FLIP > > > > > can be reused. So I want to limit the scope of this FLIP. > > > > > > > > > > After caching is supported at DataStream, we can continue from where > > > > > FLIP-36 left off to support caching at Table/SQL API. We might have > > to > > > > > re-vote FLIP-36 or draft a new FLIP. What do you think? > > > > > > > > > > Best, > > > > > Xuannan > > > > > > > > > > [1] > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink > > > > > > > > > > > > > > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> > > wrote: > > > > > > > > > > > > Hi Xuannan, > > > > > > > > > > > > thanks for drafting this FLIP. > > > > > > > > > > > > One immediate thought, from what I've seen for interactive data > > > > > exploration > > > > > > with Spark, most people tend to use the higher level APIs, that > > > allow for > > > > > > faster prototyping (Table API in Flink's case). Should the Table > > API > > > also > > > > > > be covered by this FLIP? > > > > > > > > > > > > Best, > > > > > > D. > > > > > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com > > > > > > > > wrote: > > > > > > > > > > > > > Hi devs, > > > > > > > > > > > > > > I’d like to start a discussion about adding support to cache the > > > > > > > intermediate result at DataStream API for batch processing. > > > > > > > > > > > > > > As the DataStream API now supports batch execution mode, we see > > > users > > > > > > > using the DataStream API to run batch jobs. Interactive > > > programming is > > > > > > > an important use case of Flink batch processing. And the ability > > to > > > > > > > cache intermediate results of a DataStream is crucial to the > > > > > > > interactive programming experience. > > > > > > > > > > > > > > Therefore, we propose to support caching a DataStream in Batch > > > > > > > execution. We believe that users can benefit a lot from the > > change > > > and > > > > > > > encourage them to use DataStream API for their interactive batch > > > > > > > processing work. > > > > > > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply to this > > > email > > > > > > > thread. Looking forward to your feedback! > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing > > > > > > > > > > > > > > Best, > > > > > > > Xuannan > > > > > > > > > > > > > > > > > > > > > -- > > best, > > Zhipeng > >