Hi Xuannan,

Thanks for the reply.

Regarding whether and how to support caching side outputs, I agree that
the second option might be better if there does exist a use case where
users need to cache only certain side outputs.


On Tue, Jan 4, 2022 at 3:50 PM Xuannan Su <suxuanna...@gmail.com> wrote:

> Hi Zhipeng and Gen,
>
> Thanks for joining the discussion.
>
> For Zhipeng:
>
> - Can we support side output
> Caching the side output is indeed a valid use case. However, with the
> current API, it is not straightforward to cache the side output: you
> can apply an identity map function to the DataStream returned by the
> getSideOutput method and then cache the result of the map
> transformation, but in my opinion that is not user-friendly.
> Therefore, we should think of a way to better support this use case.
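>
> To illustrate the workaround (a sketch only; cache() is the method
> proposed in this FLIP, and mainOperator / sideTag are made-up names):
>
>     DataStream<String> side = mainOperator.getSideOutput(sideTag);
>     // The identity map gives the side output a transformation of its
>     // own, whose result can then be cached.
>     CachedDataStream<String> cachedSide = side.map(x -> x).cache();
>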
> As you say, we can introduce a new class
> `CachedSingleOutputStreamOperator` and override the `getSideOutput`
> method to return a `CachedDataStream`. With this approach, the cache
> method implies that both the output and the side outputs of the
> `SingleOutputStreamOperator` are cached. The problem with this
> approach is that the user has no control over which side output should
> be cached.
> Another option would be to let the `getSideOutput` method return a
> `SingleOutputStreamOperator`. This way, users can decide which side
> output to cache. Since the `getSideOutput` method returns a
> `SingleOutputStreamOperator`, users can also set properties of the
> transformation that produces the side output, e.g. parallelism, buffer
> timeout, etc. If users try to set different values for the same
> property of a transformation, an exception will be thrown. What do you
> think?
>
> - Can we support Stream Mode
> Running a job in stream mode doesn't guarantee the job will finish,
> while in batch mode it does. This is the main reason we cannot
> support caching in stream mode: the cache cannot be used unless the
> job can finish.
> If I understand correctly, by "run batch jobs in Stream Mode" you
> mean that you have a job with all bounded sources, but you want the
> intermediate data to be shuffled in pipelined mode instead of blocking
> mode. If that is the case, the job can run in batch mode with
> "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1],
> and we can support caching in this case.
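>
> For example (a sketch using the existing public configuration):
>
>     Configuration conf = new Configuration();
>     conf.setString(
>         "execution.batch-shuffle-mode", "ALL_EXCHANGES_PIPELINED");
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment(conf);
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);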
>
> - Change parallelism of CachedDataStream
> CachedDataStream extends DataStream, which, unlike
> `SingleOutputStreamOperator`, doesn't have the `setParallelism`
> method. Thus, changing the parallelism should not be a problem for
> CachedDataStream.
>
> For Gen:
>
> - Relation between FLIP-205 and FLIP-188
> Although it feels like the dynamic table and caching are similar in
> the sense that they both let users reuse some intermediate result,
> they target different use cases. The dynamic table targets the use
> case where users want to share a dynamically updating intermediate
> result across multiple applications: it is meaningful data that can
> be consumed by different Flink applications and jobs. Caching, on the
> other hand, targets the use case where users know that all the
> sources are bounded and static, and caching is only used to avoid
> re-computing the intermediate result. The cached intermediate result
> is only meaningful across jobs in the same application.
>
> The dynamic table and caching can be used together. For example, in a
> machine learning scenario, we can have a streaming job that generates
> training samples and writes them to a dynamic table. Every hour, we
> run a Flink application to do some data analysis on the training
> samples generated in the last hour. The application consists of
> multiple batch jobs that share some intermediate results, so users
> can use the cache to avoid re-computation. The intermediate result is
> not meaningful outside of the application, and the cache will be
> discarded after the application finishes.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
>
>
> On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com> wrote:
> >
> > Hi Xuannan,
> >
> > I found FLIP-188 [1], which aims to introduce built-in dynamic table
> > storage that provides a unified changelog & table representation.
> > Tables stored there can be used in further ad-hoc queries. To my
> > understanding, it's quite like an implementation of caching in the
> > Table API, and the ad-hoc queries are somewhat like further steps in
> > an interactive program.
> >
> > As you replied, caching at the Table/SQL API is the next step, as a
> > part of interactive programming in the Table API, which we all agree
> > is the major scenario. What do you think about the relation between
> > it and FLIP-188?
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> > > Hi David,
> > >
> > > Thanks for sharing your thoughts.
> > >
> > > You are right that most people tend to use high-level APIs for
> > > interactive data exploration. Actually, there is FLIP-36 [1],
> > > which covers the cache API at the Table/SQL API. As far as I know,
> > > it has been accepted but hasn't been implemented. At the time it
> > > was drafted, DataStream did not support batch mode, but the Table
> > > API did.
> > >
> > > Now that the DataStream API does support batch processing, I think
> > > we can focus on supporting caching at the DataStream API first. It
> > > is still valuable for DataStream users, and most of the work we do
> > > in this FLIP can be reused. So I want to limit the scope of this
> > > FLIP.
> > >
> > > After caching is supported at the DataStream API, we can continue
> > > from where FLIP-36 left off to support caching at the Table/SQL
> > > API. We might have to re-vote FLIP-36 or draft a new FLIP. What do
> > > you think?
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > >
> > >
> > >
> > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > thanks for drafting this FLIP.
> > > >
> > > > One immediate thought: from what I've seen of interactive data
> > > > exploration with Spark, most people tend to use the higher-level
> > > > APIs that allow for faster prototyping (the Table API in Flink's
> > > > case). Should the Table API also be covered by this FLIP?
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion about adding support for
> > > > > caching intermediate results at the DataStream API for batch
> > > > > processing.
> > > > >
> > > > > As the DataStream API now supports batch execution mode, we
> > > > > see users using the DataStream API to run batch jobs.
> > > > > Interactive programming is an important use case of Flink
> > > > > batch processing, and the ability to cache intermediate
> > > > > results of a DataStream is crucial to the interactive
> > > > > programming experience.
> > > > >
> > > > > Therefore, we propose to support caching a DataStream in batch
> > > > > execution. We believe that users can benefit a lot from this
> > > > > change, and we encourage them to use the DataStream API for
> > > > > their interactive batch processing work.
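> > > > >
> > > > > To give a feel for the intended usage, here is a rough sketch,
> > > > > assuming a StreamExecutionEnvironment env in batch mode (the
> > > > > exact API is described in the FLIP and still open for
> > > > > discussion):
> > > > >
> > > > >     CachedDataStream<Integer> cached =
> > > > >         env.fromElements(1, 2, 3, 4)
> > > > >             .filter(x -> x % 2 == 0)
> > > > >             .cache();
> > > > >     cached.print();
> > > > >     env.execute();  // first job computes and caches the result
> > > > >     cached.filter(x -> x > 2).print();
> > > > >     env.execute();  // second job reuses the cache, no re-computation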
> > > > >
> > > > > Please check out FLIP-205 [1] and feel free to reply to this
> > > > > email thread. Looking forward to your feedback!
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > >
> > >
>


-- 
best,
Zhipeng
