Hi Xuannan,

I think this already looks really good. The whole discussion is pretty long, so I'll just try to summarize my current understanding of the outcome:
- This only aims at the DataStream API for now, but can be used as a building block for the higher-level abstractions (Table API).
- We're pushing caching down to the shuffle service (works with all implementations), storing the intermediate results. This should also naturally work with the current fail-over mechanism for batch (backtrack + recompute missing intermediate results [1]).

> For setting the parallelism of the CacheTransformation. With the current design, the parallelism of the cached intermediate result is determined by the parallelism of the transformation that produces the intermediate result to cache. Thus, the parallelism of the caching transformation is set by the parallelism of the transformation to be cached. I think the overhead should not be critical as the cache-producing job suffers from the same overhead anyway. For a CacheTransformation with large parallelism but a relatively small result dataset, I think we should reduce the parallelism of the transformation to cache.

Is the whole "choosing the right parallelism for caching" problem solved by the Adaptive Batch Job Scheduler [2]?

[1] https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
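Just to double-check that I read the user-facing side correctly, here is a rough sketch of the interactive usage I have in mind, written against the draft API (cache(), CachedDataStream and invalidate() are the names from the FLIP and may still change):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InteractiveCacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Caching is only meant to work in batch execution mode.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // An expensive bounded computation whose result we want to reuse
        // across several jobs of the same application.
        SingleOutputStreamOperator<Long> expensive =
                env.fromSequence(0, 1_000_000).map(x -> x * x);

        // Mark the intermediate result for caching; the first job that runs it
        // persists the result partitions in the shuffle service.
        CachedDataStream<Long> cached = expensive.cache();

        cached.filter(x -> x % 2 == 0).print();
        env.execute("first job: compute, cache and consume");

        // A later job of the same application reads the cached partitions
        // instead of recomputing `expensive`.
        cached.map(x -> x + 1).print();
        env.execute("second job: reuse the cache");

        // Release the cached result once it is no longer needed.
        cached.invalidate();
    }
}

If I got the semantics right, the second execute() only reads the persisted result partitions and never re-runs the expensive map.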
Best,
D.

On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi Gen,
>
> Thanks for your feedback.
>
> I think you are talking about how we are going to store the cached data. The first option is to write the data with a sink to an external file system, much like the file store of the Dynamic Table. If I understand correctly, it requires a distributed file system, e.g. HDFS, S3, etc. In my opinion, it is too heavyweight to use a distributed file system for caching.
>
> As you said, using the shuffle service for caching is quite natural as we need to produce the intermediate result anyway. For the Table/SQL API, the table operations are translated to transformations, where we can reuse the CacheTransformation. It should not be unfriendly to the Table/SQL API.
>
> For setting the parallelism of the CacheTransformation. With the current design, the parallelism of the cached intermediate result is determined by the parallelism of the transformation that produces the intermediate result to cache. Thus, the parallelism of the caching transformation is set by the parallelism of the transformation to be cached. I think the overhead should not be critical as the cache-producing job suffers from the same overhead anyway. For a CacheTransformation with large parallelism but a relatively small result dataset, I think we should reduce the parallelism of the transformation to cache.
>
> Best,
> Xuannan
>
> On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <luogen...@gmail.com> wrote:
> >
> > Hi Xuannan,
> >
> > Thanks for the reply.
> >
> > I do agree that dynamic tables and cached partitions are similar features aiming at different use cases. In my opinion, the main difference between the implementations is whether to cache only the data or the whole result partition.
> >
> > To cache only the data, we can translate the CacheTransformation to a Sink node for writing and a Source node for consuming. Most of the things are just the same as in this FLIP, except for the storage, which is an external one (or a built-in one if we can use the dynamic table storage), instead of the BLOCKING_PERSISTED type ResultPartition in the shuffle service. This can make caching independent from a specific shuffle service, and make it possible to share data between different jobs / Per-Job mode jobs.
> >
> > Caching the whole partition is natural in the DataStream API, since the partition is a low-level concept, and data storage is already provided by the default shuffle service. So if we want a solution only to support caching in the DataStream API, caching the whole partition can be a good choice. But this may not be as friendly to the Table/SQL API as to DataStream, since users are declaring that they want to cache a logical Table (view), rather than a physical partition. If we want a unified solution for both APIs, this may need to be considered.
> >
> > And here's another suggestion for this FLIP. Maybe we should support "setParallelism" in CacheTransformation, for both caching and consuming.
> >
> > In some cases, the input parallelism of the CacheTransformation is large but the result dataset is relatively small. We may need too many resources to consume the result partition if the source parallelism has to be the same as the producer's.
> >
> > On the other hand, serving a large number of partitions may also have more overhead. Though maybe it's not a big burden, we can try to reduce the actual cached partition count if necessary, for example by adding a pass-through vertex with a specific parallelism between the producer and the cache vertices.
> >
> > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipe...@gmail.com> wrote:
> >
> > > Hi Xuannan,
> > >
> > > Thanks for the reply.
> > >
> > > Regarding whether and how to support caching side outputs, I agree that the second option might be better if there does exist a use case where users need to cache only certain side outputs.
> > >
> > > Xuannan Su <suxuanna...@gmail.com> wrote on Tue, Jan 4, 2022 at 15:50:
> > > >
> > > > Hi Zhipeng and Gen,
> > > >
> > > > Thanks for joining the discussion.
> > > >
> > > > For Zhipeng:
> > > >
> > > > - Can we support side output
> > > > Caching the side output is indeed a valid use case. However, with the current API, it is not straightforward to cache the side output. You can apply an identity map function to the DataStream returned by the getSideOutput method and then cache the result of the map transformation. In my opinion, it is not user-friendly. Therefore, we should think of a way to better support the use case.
> > > > As you say, we can introduce a new class `CachedSingleOutputStreamOperator` and overwrite the `getSideOutput` method to return a `CachedDataStream`. With this approach, the cache method implies that both the output and the side outputs of the `SingleOutputStreamOperator` are cached. The problem with this approach is that the user has no control over which side output should be cached.
> > > > Another option would be to let the `getSideOutput` method return a `SingleOutputStreamOperator`. This way, users can decide which side output to cache. Since the `getSideOutput` method returns a `SingleOutputStreamOperator`, users can set properties of the transformation that produces the side output, e.g. parallelism, buffer timeout, etc. If users try to set different values for the same property of a transformation, an exception will be thrown. What do you think?
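To make the workaround above concrete (an identity map over the side output, which is then cached), here is a rough sketch against the draft API; the OutputTag, the splitting function, and cache()/CachedDataStream themselves are assumptions and may change:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputCacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // Hypothetical tag for the side output we want to cache.
        final OutputTag<Integer> sideTag = new OutputTag<Integer>("side") {};

        SingleOutputStreamOperator<Integer> mainStream =
                env.fromElements(1, 2, 3, 4, 5)
                        .process(new ProcessFunction<Integer, Integer>() {
                            @Override
                            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                                if (value % 2 == 0) {
                                    ctx.output(sideTag, value); // side output
                                } else {
                                    out.collect(value);         // main output
                                }
                            }
                        });

        // getSideOutput() returns a plain DataStream, which has no cache() in the
        // current proposal, so route it through an identity map to obtain a
        // SingleOutputStreamOperator that can be cached.
        DataStream<Integer> side = mainStream.getSideOutput(sideTag);
        CachedDataStream<Integer> cachedSide = side.map(v -> v).cache();

        cachedSide.print();
        env.execute("cache a side output via an identity map");
    }
}

Whichever of the two options above is eventually chosen, the identity map should no longer be needed.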
> > > > - Can we support Stream Mode
> > > > Running a job in stream mode doesn't guarantee the job will finish, while in batch mode, it does. This is the main reason that prevents us from supporting cache in stream mode. The cache cannot be used unless the job can finish.
> > > > If I understand correctly, by "run batch jobs in Stream Mode" you mean that you have a job with all bounded sources, but you want the intermediate data to shuffle in pipelined mode instead of blocking mode. If that is the case, the job can run in batch mode with "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1]. And we can support caching in this case.
> > > >
> > > > - Change parallelism of CachedDataStream
> > > > CachedDataStream extends from DataStream, which doesn't have the `setParallelism` method like the `SingleOutputStreamOperator` does. Thus, this should not be a problem for CachedDataStream.
> > > >
> > > > For Gen:
> > > >
> > > > - Relation between FLIP-205 and FLIP-188
> > > > Although it feels like the dynamic table and caching are similar in the sense that they both let users reuse some intermediate result, they target different use cases. The dynamic table targets the use case where users want to share a dynamically updating intermediate result across multiple applications. It is some meaningful data that can be consumed by different Flink applications and Flink jobs. Caching, on the other hand, targets the use case where users know that all the sources are bounded and static, and caching is only used to avoid re-computing the intermediate result. The cached intermediate result is only meaningful across jobs in the same application.
> > > >
> > > > Dynamic tables and caching can be used together. For example, in a machine learning scenario, we can have a streaming job that is generating training samples, and we can create a dynamic table for the training samples. Then we run a Flink application every hour to do some data analysis on the training samples generated in the last hour. The Flink application consists of multiple batch jobs and the batch jobs share some intermediate results, so users can use the cache to avoid re-computation. The intermediate result is not meaningful outside of the application, and the cache will be discarded after the application is finished.
> > > >
> > > > [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
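To make the batch-with-pipelined-shuffles setup above concrete, a minimal sketch using the config key documented in [1] (passing it programmatically is just one way and an assumption on my part; it can equally go into flink-conf.yaml, and the rest is an ordinary bounded job):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelinedBatchShuffleSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep the job in batch mode (so it is guaranteed to finish and caching
        // can apply), but let the data exchanges run pipelined instead of blocking.
        conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_PIPELINED");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromSequence(0, 1_000).map(x -> x * 2).print();
        env.execute("bounded job with pipelined shuffles");
    }
}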
> > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com> wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > I found FLIP-188 [1], which is aiming to introduce a built-in dynamic table storage that provides a unified changelog & table representation. Tables stored there can be used in further ad-hoc queries. To my understanding, it's quite like an implementation of caching in the Table API, and the ad-hoc queries are somewhat like further steps in an interactive program.
> > > > >
> > > > > As you replied, caching at the Table/SQL API is the next step, as a part of interactive programming in the Table API, which we all agree is the major scenario. What do you think about the relation between it and FLIP-188?
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > >
> > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > > > >
> > > > > > Hi David,
> > > > > >
> > > > > > Thanks for sharing your thoughts.
> > > > > >
> > > > > > You are right that most people tend to use the high-level APIs for interactive data exploration. Actually, there is FLIP-36 [1] covering the cache API at the Table/SQL API. As far as I know, it has been accepted but hasn't been implemented. At the time it was drafted, DataStream did not support batch mode, but the Table API did.
> > > > > >
> > > > > > Now that the DataStream API does support batch processing, I think we can focus on supporting cache at DataStream first. It is still valuable for DataStream users, and most of the work we do in this FLIP can be reused. So I want to limit the scope of this FLIP.
> > > > > >
> > > > > > After caching is supported at DataStream, we can continue from where FLIP-36 left off to support caching at the Table/SQL API. We might have to re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Xuannan
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > >
> > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > thanks for drafting this FLIP.
> > > > > > >
> > > > > > > One immediate thought: from what I've seen for interactive data exploration with Spark, most people tend to use the higher-level APIs that allow for faster prototyping (the Table API in Flink's case). Should the Table API also be covered by this FLIP?
> > > > > > >
> > > > > > > Best,
> > > > > > > D.
> > > > > > >
> > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > I'd like to start a discussion about adding support for caching the intermediate result at the DataStream API for batch processing.
> > > > > > > >
> > > > > > > > As the DataStream API now supports batch execution mode, we see users using the DataStream API to run batch jobs. Interactive programming is an important use case of Flink batch processing. And the ability to cache intermediate results of a DataStream is crucial to the interactive programming experience.
> > > > > > > >
> > > > > > > > Therefore, we propose to support caching a DataStream in batch execution. We believe that users can benefit a lot from the change and encourage them to use the DataStream API for their interactive batch processing work.
> > > > > > > >
> > > > > > > > Please check out FLIP-205 [1] and feel free to reply to this email thread. Looking forward to your feedback!
> > > > > > > >
> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > >
> > > --
> > > best,
> > > Zhipeng