Hi Xuannan,

Thanks for the reply.

Regarding whether and how to support caching side outputs, I agree that the
second option might be better if there are indeed use cases where users need
to cache only certain side outputs.
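To make sure we are on the same page, here is a rough sketch of what this
could look like from the user's side. This is only my reading of the
proposal, not working code: `cache()` is the method proposed in FLIP-205 and
does not exist yet, and the commented-out variant at the end assumes option
two, where `getSideOutput` returns a `SingleOutputStreamOperator`. The class
and names below are made up for illustration.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputCacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        final OutputTag<String> sideTag = new OutputTag<String>("side") {};

        SingleOutputStreamOperator<Integer> main = env
                .fromElements(1, 2, 3)
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(
                            Integer value, Context ctx, Collector<Integer> out) {
                        out.collect(value);
                        ctx.output(sideTag, "side-" + value);
                    }
                });

        // Today's workaround: getSideOutput returns a plain DataStream, so an
        // identity map is needed to obtain a transformation whose result can
        // be cached. `cache()` itself is the method proposed in FLIP-205.
        main.getSideOutput(sideTag)
                .map(x -> x)
                .returns(Types.STRING) // identity lambda needs explicit type info
                .cache();

        // Option 2 as I understand it: getSideOutput itself returns a
        // SingleOutputStreamOperator, so the identity map goes away and users
        // choose exactly which side outputs to cache:
        //
        //     main.getSideOutput(sideTag).cache();

        main.print();
        env.execute();
    }
}

With option two, the identity map disappears and users can cache and
configure each side output individually, which is exactly the fine-grained
control we want.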
On Tue, Jan 4, 2022 at 15:50, Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi Zhipeng and Gen,
>
> Thanks for joining the discussion.
>
> For Zhipeng:
>
> - Can we support side output
> Caching the side output is indeed a valid use case. However, with the
> current API, it is not straightforward to cache the side output. You
> can apply an identity map function to the DataStream returned by the
> getSideOutput method and then cache the result of the map
> transformation. In my opinion, that is not user-friendly. Therefore, we
> should think of a way to better support the use case.
> As you say, we can introduce a new class
> `CachedSingleOutputStreamOperator` and override the `getSideOutput`
> method to return a `CachedDataStream`. With this approach, the cache
> method implies that both the output and the side outputs of the
> `SingleOutputStreamOperator` are cached. The problem with this
> approach is that the user has no control over which side outputs
> should be cached.
> Another option would be to let the `getSideOutput` method return a
> `SingleOutputStreamOperator`. This way, users can decide which side
> output to cache. And since the `getSideOutput` method returns a
> `SingleOutputStreamOperator`, users can set properties of the
> transformation that produces the side output, e.g. parallelism,
> buffer timeout, etc. If users try to set different values for the
> same property of a transformation, an exception will be thrown. What
> do you think?
>
> - Can we support Stream Mode
> Running a job in stream mode doesn't guarantee the job will finish,
> while in batch mode it does. This is the main reason that prevents us
> from supporting cache in stream mode: the cache cannot be used unless
> the job can finish.
> If I understand correctly, by "run batch jobs in Stream Mode" you
> mean that you have a job with all bounded sources, but you want the
> intermediate data to be shuffled in pipelined mode instead of
> blocking mode. If that is the case, the job can run in batch mode
> with "execution.batch-shuffle-mode" set to
> "ALL_EXCHANGES_PIPELINED" [1], and we can support caching in this
> case.
>
> - Change parallelism of CachedDataStream
> CachedDataStream extends DataStream, which, unlike
> `SingleOutputStreamOperator`, doesn't have the `setParallelism`
> method. Thus, this should not be a problem for CachedDataStream.
>
> For Gen:
>
> - Relation between FLIP-205 and FLIP-188
> Although dynamic tables and caching may feel similar in the sense
> that they both let users reuse some intermediate result, they target
> different use cases. The dynamic table targets the use case where
> users want to share a dynamically updating intermediate result
> across multiple applications. It is meaningful data that can be
> consumed by different Flink applications and Flink jobs. Caching, on
> the other hand, targets the use case where users know that all the
> sources are bounded and static, and caching is only used to avoid
> re-computing the intermediate result. The cached intermediate result
> is only meaningful across jobs in the same application.
>
> Dynamic tables and caching can be used together. For example, in a
> machine learning scenario, we can have a stream job that generates
> training samples, and we can create a dynamic table for the training
> samples. Then we run a Flink application every hour to do some data
> analysis on the training samples generated in the last hour. The
> Flink application consists of multiple batch jobs, and the batch
> jobs share some intermediate results, so users can use cache to
> avoid re-computation. The intermediate result is not meaningful
> outside of the application, and the cache will be discarded after
> the application finishes.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
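Also, thanks for the pointer to "execution.batch-shuffle-mode" — that covers
the "batch job with pipelined shuffle" case I had in mind. If I read the 1.14
docs [1] correctly, the setup would look roughly like the following (just a
sketch; the class name is mine):

import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelinedBatchShuffleSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same as setting "execution.batch-shuffle-mode: ALL_EXCHANGES_PIPELINED".
        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE,
                BatchShuffleMode.ALL_EXCHANGES_PIPELINED);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Still batch mode, so the job finishes and the cache can be created;
        // only the exchanges between tasks are pipelined rather than blocking.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // ... build the bounded job as usual; caching would still apply here.
    }
}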
>
> On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com> wrote:
> >
> > Hi Xuannan,
> >
> > I found FLIP-188 [1], which aims to introduce built-in dynamic table
> > storage that provides a unified changelog & table representation.
> > Tables stored there can be used in further ad-hoc queries. To my
> > understanding, it's quite like an implementation of caching in the
> > Table API, and the ad-hoc queries are somewhat like further steps in
> > an interactive program.
> >
> > As you replied, caching at the Table/SQL API is the next step, as a
> > part of interactive programming in the Table API, which we all agree
> > is the major scenario. What do you think about the relation between
> > it and FLIP-188?
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >
> > > Hi David,
> > >
> > > Thanks for sharing your thoughts.
> > >
> > > You are right that most people tend to use high-level APIs for
> > > interactive data exploration. Actually, there is FLIP-36 [1],
> > > which covers the cache API at the Table/SQL API. As far as I
> > > know, it has been accepted but hasn't been implemented. At the
> > > time it was drafted, the DataStream API did not support batch
> > > mode, but the Table API did.
> > >
> > > Now that the DataStream API does support batch processing, I think
> > > we can focus on supporting cache at the DataStream API first. It
> > > is still valuable for DataStream users, and most of the work we do
> > > in this FLIP can be reused. So I want to limit the scope of this
> > > FLIP.
> > >
> > > After caching is supported at the DataStream API, we can continue
> > > from where FLIP-36 left off to support caching at the Table/SQL
> > > API. We might have to re-vote FLIP-36 or draft a new FLIP. What do
> > > you think?
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > >
> > >
> > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > thanks for drafting this FLIP.
> > > >
> > > > One immediate thought: from what I've seen of interactive data
> > > > exploration with Spark, most people tend to use the higher-level
> > > > APIs that allow for faster prototyping (the Table API in Flink's
> > > > case). Should the Table API also be covered by this FLIP?
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion about adding support for caching
> > > > > intermediate results in the DataStream API for batch processing.
> > > > >
> > > > > As the DataStream API now supports batch execution mode, we see
> > > > > users using the DataStream API to run batch jobs. Interactive
> > > > > programming is an important use case of Flink batch processing,
> > > > > and the ability to cache intermediate results of a DataStream is
> > > > > crucial to the interactive programming experience.
> > > > >
> > > > > Therefore, we propose to support caching a DataStream in batch
> > > > > execution. We believe that users can benefit a lot from the
> > > > > change and encourage them to use the DataStream API for their
> > > > > interactive batch processing work.
> > > > >
> > > > > Please check out FLIP-205 [1] and feel free to reply to this
> > > > > email thread. Looking forward to your feedback!
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > >
> > > > > Best,
> > > > > Xuannan
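To double-check my understanding of the proposal as a whole, an interactive
batch program with cache would then look roughly like this. This is only a
sketch: `cache()` and `CachedDataStream` are still proposals from FLIP-205
(the package and generic signature of `CachedDataStream` are my assumption),
so none of this compiles against current Flink:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
// Proposed in FLIP-205; class name from this thread, package and generics
// are my assumption.
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InteractiveCacheSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // An expensive bounded computation whose result is reused below.
        SingleOutputStreamOperator<Long> expensive = env
                .fromSequence(0, 1_000_000)
                .map(v -> v * v)
                .returns(Types.LONG);

        // Proposed API: mark the intermediate result for caching.
        CachedDataStream<Long> cached = expensive.cache();

        // Job 1: computes `expensive` and, per the proposal, populates the cache.
        cached.filter(v -> v % 2 == 0).print();
        env.execute("job 1");

        // Job 2, same application: per the proposal, this consumes the cached
        // intermediate result instead of recomputing fromSequence + map.
        cached.filter(v -> v % 2 == 1).print();
        env.execute("job 2");
    }
}

If that matches your intent, it looks like exactly what interactive batch
users need.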
-- 
best,
Zhipeng