+1 Thanks Xuannan, no more objections from my side

On Mon, Jan 17, 2022 at 7:14 AM Yun Gao <yungao...@aliyun.com.invalid>
wrote:

> Hi Xuannan,
>
> Very thanks for the detailed explanation and sorry for the very
> late response.
>
> For cached result partition v.s. managed table, I also agree with
> the current conclusion that they could be differentiated at the moment:
> cached result partition could be viewed as an internal, lightweight data
> cache whose lifecycle is bound to the current application, and managed
> table could be viewed as an external service whose lifecycle could be
> across multiple applications.
>
> For the other issues, after more thoughts I currently have two remaining
> issues:
>
> 1. Regarding the api, I think it should work if we could execute multiple
> caches as a whole, but from the FLIP, currently in the example it seems
> we are calling execute_and_collect() on top of a single CachedDataStream?
> Also in the give API CachedDataStream does not seem to have a method
> execute_and_collect() ?
> 2. For re-submitting the job when the cached result partition is missing,
> would
> this happen in the client side or in the scheduler? If this happens in the
> client
> side, we need to bypass the give failover strategy (like attempting for N
> times)
> when we found the cache result partition is missed?
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Xuannan Su <suxuanna...@gmail.com>
> Send Time:2022 Jan. 17 (Mon.) 13:00
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch
> Processing
>
> Hi David,
>
> Thanks for pointing out the FLIP-187. After reading the FLIP, I think it
> can solve the problem of choosing the proper parallelism, and thus it
> should be fine to not provide the method to set the parallelism of the
> cache.
>
> And you understand of the outcome of this FLIP is correct.
>
> If there are no more feedback and objections, I would like to start a vote
> thread tomorrow.
>
> Best,
> Xuannan
>
> On Fri, Jan 14, 2022 at 5:34 PM David Morávek <d...@apache.org> wrote:
>
> > Hi Xuannan,
> >
> > I think this already looks really good. The whole discussions is pretty
> > long, so I'll just to summarize my current understanding of the outcome:
> >
> > - This only aims on the DataStream API for now, but can be used as a
> > building block for the higher level abstractions (Table API).
> > - We're pushing caching down to the shuffle service (works with all
> > implementations), storing the intermediate results. This should also
> > naturally work with current fail-over mechanisms for batch (backtrack +
> > recompute missing intermediate results [1]).
> >
> >
> > > For setting the parallelism of the CacheTransformation. With the
> > > current design, the parallelism of the cache intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical as the
> > > cache-producing job suffers from the same overhead anyway. For
> > > CacheTransformation with large parallelism but the result dataset is
> > > relatively small, I think we should reduce the parallelism of the
> > > transformation to cache.
> >
> >
> > Is the whole "choosing the right parallelism for caching" problem solved
> by
> > the Adaptive Batch Job Scheduler [2]?
> >
> > [1]
> >
> >
> https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> >
> > Best,
> > D.
> >
> > On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <suxuanna...@gmail.com>
> wrote:
> >
> > > Hi Gen,
> > >
> > > Thanks for your feedback.
> > >
> > > I think you are talking about how we are going to store the caching
> > > data. The first option is to write the data with a sink to an external
> > > file system, much like the file store of the Dynamic Table. If I
> > > understand correctly, it requires a distributed file system, e.g HDSF,
> > > s3, etc. In my opinion, it is too heavyweight to use a distributed
> > > file system for caching.
> > >
> > > As you said, using the shuffle service for caching is quite natural as
> > > we need to produce the intermediate result anyway. For Table/SQL API,
> > > the table operations are translated to transformations, where we can
> > > reuse the CacheTransformation. It should not be unfriendly for
> > > Table/SQL API.
> > >
> > > For setting the parallelism of the CacheTransformation. With the
> > > current design, the parallelism of the cache intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical as the
> > > cache-producing job suffers from the same overhead anyway. For
> > > CacheTransformation with large parallelism but the result dataset is
> > > relatively small, I think we should reduce the parallelism of the
> > > transformation to cache.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > >
> > > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <luogen...@gmail.com> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > I do agree that dynamic tables and cached partitions are similar
> > features
> > > > aiming different cases.  In my opinion, the main difference of the
> > > > implementations is to cache only the data or the whole result
> > partition.
> > > >
> > > > To cache only the data, we can translate the CacheTransformation to a
> > > Sink
> > > > node for writing, and Source node for consuming. Most of the things
> are
> > > > just the same as this FLIP, except for the storage, which is an
> > external
> > > > one (or a built-in one if we can use the dynamic table storage),
> > instead
> > > of
> > > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service.
> > This
> > > > can make caching independent from a specific shuffle service, and
> make
> > it
> > > > possible to share data between different jobs / Per-Job mode jobs.
> > > >
> > > > Caching the whole partition is natural in DataStream API, since the
> > > > partition is a low-level concept, and data storage is already
> provided
> > by
> > > > the default shuffle service. So if we want to choose a solution only
> to
> > > > support cache in DataStream API, caching the whole partition can be a
> > > good
> > > > choice. But this may be not as friendly to Table/SQL API as to
> > > > DataStream, since users are announcing to cache a logical Table
> (view),
> > > > rather than a physical partition. If we want a unified solution for
> > both
> > > > APIs, this may need to be considered.
> > > >
> > > >
> > > > And here's another suggestion to this FLIP. Maybe we should support
> > > > "setParallelism" in CacheTransformation, for both caching and
> > consuming.
> > > >
> > > > In some cases, the input parallelism of the CacheTransformation is
> > large
> > > > but the result dataset is relatively small. We may need too many
> > > resources
> > > > to consume the result partition if the source parallelism has to be
> the
> > > > same with the producer.
> > > >
> > > > On the other hand, serving a large number of partitions may also have
> > > more
> > > > overhead. Though maybe it's not a big burban, we can try to reduce
> the
> > > > actual cached partition count if necessary, for example by adding a
> > > > pass-through vertex with the specific parallelism between the
> producer
> > > and
> > > > the cache vertices.
> > > >
> > > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <
> zhangzhipe...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Xuannnan,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > Regarding whether and how to support cache sideoutput, I agree that
> > the
> > > > > second option might be better if there do exist a use case that
> users
> > > need
> > > > > to cache only some certain side outputs.
> > > > >
> > > > >
> > > > > Xuannan Su <suxuanna...@gmail.com> 于2022年1月4日周二 15:50写道:
> > > > >
> > > > > > Hi Zhipeng and Gen,
> > > > > >
> > > > > > Thanks for joining the discussion.
> > > > > >
> > > > > > For Zhipeng:
> > > > > >
> > > > > > - Can we support side output
> > > > > > Caching the side output is indeed a valid use case. However, with
> > the
> > > > > > current API, it is not straightforward to cache the side output.
> > You
> > > > > > can apply an identity map function to the DataStream returned by
> > the
> > > > > > getSideOutput method and then cache the result of the map
> > > > > > transformation. In my opinion, it is not user-friendly.
> Therefore,
> > we
> > > > > > should think of a way to better support the use case.
> > > > > > As you say, we can introduce a new class
> > > > > > `CachedSingleOutputStreamOperator`, and overwrite the
> > `getSideOutput`
> > > > > > method to return a `CachedDatastream`. With this approach, the
> > cache
> > > > > > method implies that both output and the side output of the
> > > > > > `SingleOutputStreamOperatior` are cached. The problem with this
> > > > > > approach is that the user has no control over which side output
> > > should
> > > > > > be cached.
> > > > > > Another option would be to let the `getSideOuput` method return
> the
> > > > > > `SingleOutputStreamOperator`. This way, users can decide which
> side
> > > > > > output to cache. As the `getSideOutput` method returns a
> > > > > > `SingleOutputStreamOperator`. Users can set properties of the
> > > > > > transformation that produce the side output, e.g. parallelism,
> > buffer
> > > > > > timeout, etc. If users try to set different values of the same
> > > > > > property of a transformation, an exception will be thrown. What
> do
> > > you
> > > > > > think?
> > > > > >
> > > > > > - Can we support Stream Mode
> > > > > > Running a job in stream mode doesn't guarantee the job will
> finish,
> > > > > > while in batch mode, it does.  This is the main reason that
> > prevents
> > > > > > us from supporting cache in stream mode. The cache cannot be used
> > > > > > unless the job can finish.
> > > > > > If I understand correctly, by "run batch jobs in Stream Mode",
> you
> > > > > > mean that you have a job with all bounded sources, but you want
> the
> > > > > > intermediate data to shuffle in pipelined mode instead of
> blocking
> > > > > > mode. If that is the case, the job can run in batch mode with
> > > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED"
> > [1].
> > > > > > And we can support caching in this case.
> > > > > >
> > > > > > - Change parallelism of CachedDataStream
> > > > > > CachedDataStream extends from DataStream, which doesn't have the
> > > > > > `setParallelism` method like the `SingleOutputStreamOperator`.
> > Thus,
> > > > > > it should not be a problem with CachedDataStream.
> > > > > >
> > > > > > For Gen:
> > > > > >
> > > > > > - Relation between FLIP-205 and FLIP-188
> > > > > > Although it feels like dynamic table and caching are similar in
> the
> > > > > > sense that they let user reuse come intermediate result, they
> > target
> > > > > > different use cases. The dynamic table is targeting the use case
> > > where
> > > > > > users want to share a dynamic updating intermediate result across
> > > > > > multiple applications. It is some meaningful data that can be
> > > consumed
> > > > > > by different Flink applications and Flink jobs. While caching is
> > > > > > targeting the use case where users know that all the sources are
> > > > > > bounded and static, and caching is only used to avoid
> re-computing
> > > the
> > > > > > intermediate result. And the cached intermediate result is only
> > > > > > meaningful crossing jobs in the same application.
> > > > > >
> > > > > > Dynamic table and caching can be used together. For example, in a
> > > > > > machine learning scenario, we can have a Stream job that is
> > > generating
> > > > > > some training samples. And we can create a dynamic table for the
> > > > > > training sample. And we run a Flink application every hour to do
> > some
> > > > > > data analysis on the training sample generated in the last hour.
> > The
> > > > > > Flink application consists of multiple batch jobs and the batch
> > jobs
> > > > > > share some intermediate results, so users can use cache to avoid
> > > > > > re-computation. The intermediate result is not meaningful outside
> > of
> > > > > > the application. And the cache will be discarded after the
> > > application
> > > > > > is finished.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > I found FLIP-188[1] that is aiming to introduce a built-in
> > dynamic
> > > > > table
> > > > > > > storage, which provides a unified changelog & table
> > representation.
> > > > > > Tables
> > > > > > > stored there can be used in further ad-hoc queries. To my
> > > > > understanding,
> > > > > > > it's quite like an implementation of caching in Table API, and
> > the
> > > > > ad-hoc
> > > > > > > queries are somehow like further steps in an interactive
> program.
> > > > > > >
> > > > > > > As you replied, caching at Table/SQL API is the next step, as a
> > > part of
> > > > > > > interactive programming in Table API, which we all agree is the
> > > major
> > > > > > > scenario. What do you think about the relation between it and
> > > FLIP-188?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <
> > suxuanna...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
> > > > > > > > Thanks for sharing your thoughts.
> > > > > > > >
> > > > > > > > You are right that most people tend to use high-level API for
> > > > > > > > interactive data exploration. Actually, there is
> > > > > > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As
> far
> > > as I
> > > > > > > > know, it has been accepted but hasn’t been implemented. At
> the
> > > time
> > > > > > > > when it is drafted, DataStream did not support Batch mode but
> > > Table
> > > > > > > > API does.
> > > > > > > >
> > > > > > > > Now that the DataStream API does support batch processing, I
> > > think we
> > > > > > > > can focus on supporting cache at DataStream first. It is
> still
> > > > > > > > valuable for DataStream users and most of the work we do in
> > this
> > > FLIP
> > > > > > > > can be reused. So I want to limit the scope of this FLIP.
> > > > > > > >
> > > > > > > > After caching is supported at DataStream, we can continue
> from
> > > where
> > > > > > > > FLIP-36 left off to support caching at Table/SQL API. We
> might
> > > have
> > > > > to
> > > > > > > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <
> d...@apache.org
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Xuannan,
> > > > > > > > >
> > > > > > > > > thanks for drafting this FLIP.
> > > > > > > > >
> > > > > > > > > One immediate thought, from what I've seen for interactive
> > data
> > > > > > > > exploration
> > > > > > > > > with Spark, most people tend to use the higher level APIs,
> > that
> > > > > > allow for
> > > > > > > > > faster prototyping (Table API in Flink's case). Should the
> > > Table
> > > > > API
> > > > > > also
> > > > > > > > > be covered by this FLIP?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > D.
> > > > > > > > >
> > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <
> > > suxuanna...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I’d like to start a discussion about adding support to
> > cache
> > > the
> > > > > > > > > > intermediate result at DataStream API for batch
> processing.
> > > > > > > > > >
> > > > > > > > > > As the DataStream API now supports batch execution mode,
> we
> > > see
> > > > > > users
> > > > > > > > > > using the DataStream API to run batch jobs. Interactive
> > > > > > programming is
> > > > > > > > > > an important use case of Flink batch processing. And the
> > > ability
> > > > > to
> > > > > > > > > > cache intermediate results of a DataStream is crucial to
> > the
> > > > > > > > > > interactive programming experience.
> > > > > > > > > >
> > > > > > > > > > Therefore, we propose to support caching a DataStream in
> > > Batch
> > > > > > > > > > execution. We believe that users can benefit a lot from
> the
> > > > > change
> > > > > > and
> > > > > > > > > > encourage them to use DataStream API for their
> interactive
> > > batch
> > > > > > > > > > processing work.
> > > > > > > > > >
> > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply
> to
> > > this
> > > > > > email
> > > > > > > > > > thread. Looking forward to your feedback!
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > best,
> > > > > Zhipeng
> > > > >
> > >
> >
>
>

Reply via email to