Hi, 

Thanks Xuannan for the clarification. I also have no other issues~

Best,
Yun



------------------ Original Mail ------------------
Sender: Xuannan Su <suxuanna...@gmail.com>
Send Date: Wed Jan 19 11:35:13 2022
Recipients: Flink Dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing
Hi devs,



Thank you all for the discussion.

If there are no objections or feedback, I would like to start the vote

thread tomorrow.



Best,

Xuannan



On Tue, Jan 18, 2022 at 8:12 PM Xuannan Su  wrote:

>

> Hi Yun,

>

> Thanks for your questions.

>

> 1. I think the execute_and_collect is an API on the DataStream, which

> adds a collect sink to the DataStream and invokes

> StreamExecutionEnvironment#execute. It is a convenient method to

> execute a job and get an iterator of the result.
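
(For illustration only: a rough sketch in Java of how this could look, assuming the cache() method and CachedDataStream class proposed in the FLIP draft; executeAndCollect() is the existing DataStream method described above, and the class/package names for the proposed API are assumptions.)

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.CachedDataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    public class CacheSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            SingleOutputStreamOperator<Integer> doubled =
                    env.fromElements(1, 2, 3, 4, 5).map(i -> i * 2).returns(Types.INT);

            // cache() / CachedDataStream as proposed in FLIP-205: the first job that
            // uses "doubled" also produces and caches its intermediate result.
            CachedDataStream<Integer> cached = doubled.cache();

            // executeAndCollect() adds a collect sink and invokes
            // StreamExecutionEnvironment#execute, returning an iterator of the result.
            // Later jobs in the same application read the cache instead of re-computing.
            try (CloseableIterator<Integer> it =
                    cached.filter(i -> i > 4).executeAndCollect()) {
                it.forEachRemaining(System.out::println);
            }
        }
    }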

>

> 2. As discussed offline, there are two ways to re-compute the

> missing cached intermediate result.

>

> In the current design, the re-submission of the job happens on the

> client side. We can throw a non-recoverable exception, annotated by

> `ThrowableType#NonRecoverableError`, to bypass the failover strategy

> when we find that the cache is missing. When the client catches the

> error, it can submit the original job to re-compute the intermediate

> result.

>

> The re-submission process of the job can happen at the scheduler. This

> way, the cache-consuming job has to contain the vertex that creates

> the cache. If the scheduler finds that the cached intermediate result

> exists, it skips the cache-creating vertices. If the cache-consuming

> vertex finds that the cached intermediate result is missing, the

> scheduler restarts the cache-creating vertices.

>

> Handling the missing cache in the scheduler requires a lot more work

> on the scheduler, compared to re-submitting the job on the client side.

> Thus, for this FLIP, we will go with the first approach. Once the

> scheduler supports it, we can move the re-submission there, and the

> process should be transparent to the user.
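
(Illustrative pseudocode only, assuming a StreamExecutionEnvironment env and that the two jobs are submitted from the same client program. The exception type and job names are hypothetical placeholders, not API from the FLIP; ExceptionUtils#findThrowable is the existing Flink utility for inspecting the cause chain.)

    // Hypothetical client-side fallback matching the description above.
    try {
        // Job that consumes the cached intermediate result.
        env.execute("consume-cached-result");
    } catch (Exception e) {
        // A missing cache surfaces as a non-recoverable error
        // (ThrowableType#NonRecoverableError), bypassing the failover strategy,
        // so it reaches the client directly. "MissingCacheException" is a
        // placeholder name, not an existing class.
        if (ExceptionUtils.findThrowable(e, MissingCacheException.class).isPresent()) {
            env.execute("recreate-cache");          // original job re-computes the result
            env.execute("consume-cached-result");   // retry the cache-consuming job
        } else {
            throw e;
        }
    }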

>

> Best,

> Xuannan

>

>

> On Mon, Jan 17, 2022 at 2:07 PM Yun Gao  wrote:

> >

> > Hi Xuannan,

> >

> > Many thanks for the detailed explanation, and sorry for the very

> > late response.

> >

> > For cached result partition vs. managed table, I also agree with

> > the current conclusion that they could be differentiated at the moment:

> > cached result partition could be viewed as an internal, lightweight data

> > cache whose lifecycle is bound to the current application, and managed

> > table could be viewed as an external service whose lifecycle could be

> > across multiple applications.

> >

> > For the rest, after more thought I currently have two remaining

> > issues:

> >

> > 1. Regarding the API, I think it should work if we could execute multiple

> > caches as a whole, but from the FLIP, currently in the example it seems

> > we are calling execute_and_collect() on top of a single CachedDataStream?

> > Also, in the given API, CachedDataStream does not seem to have an

> > execute_and_collect() method?

> > 2. For re-submitting the job when the cached result partition is missing,

> > would this happen on the client side or in the scheduler? If it happens on

> > the client side, do we need to bypass the given failover strategy (like

> > retrying N times) when we find that the cached result partition is missing?

> >

> > Best,

> > Yun

> >

> >

> >

> > ------------------------------------------------------------------

> > From:Xuannan Su 

> > Send Time:2022 Jan. 17 (Mon.) 13:00

> > To:dev 

> > Subject: Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

> >

> > Hi David,

> >

> > Thanks for pointing out FLIP-187. After reading the FLIP, I think it

> > can solve the problem of choosing the proper parallelism, and thus it

> > should be fine to not provide the method to set the parallelism of the

> > cache.

> >

> > And your understanding of the outcome of this FLIP is correct.

> >

> > If there is no further feedback or objection, I would like to start a vote

> > thread tomorrow.

> >

> > Best,

> > Xuannan

> >

> > On Fri, Jan 14, 2022 at 5:34 PM David Morávek  wrote:

> >

> > > Hi Xuannan,

> > >

> > > I think this already looks really good. The whole discussion is pretty

> > > long, so I'll just summarize my current understanding of the outcome:

> > >

> > > - This only aims at the DataStream API for now, but can be used as a

> > > building block for the higher level abstractions (Table API).

> > > - We're pushing caching down to the shuffle service (works with all

> > > implementations), storing the intermediate results. This should also

> > > naturally work with current fail-over mechanisms for batch (backtrack +

> > > recompute missing intermediate results [1]).

> > >

> > >

> > > > For setting the parallelism of the CacheTransformation. With the

> > > > current design, the parallelism of the cache intermediate result is

> > > > determined by the parallelism of the transformation that produces the

> > > > intermediate result to cache. Thus, the parallelism of the caching

> > > > transformation is set by the parallelism of the transformation to be

> > > > cached. I think the overhead should not be critical as the

> > > > cache-producing job suffers from the same overhead anyway. For

> > > > CacheTransformation with large parallelism but the result dataset is

> > > > relatively small, I think we should reduce the parallelism of the

> > > > transformation to cache.

> > >

> > >

> > > Is the whole "choosing the right parallelism for caching" problem solved

> > > by the Adaptive Batch Job Scheduler [2]?

> > >

> > > [1] https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html

> > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler

> > >

> > > Best,

> > > D.

> > >

> > > On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su  wrote:

> > >

> > > > Hi Gen,

> > > >

> > > > Thanks for your feedback.

> > > >

> > > > I think you are talking about how we are going to store the caching

> > > > data. The first option is to write the data with a sink to an external

> > > > file system, much like the file store of the Dynamic Table. If I

> > > > understand correctly, it requires a distributed file system, e.g., HDFS,

> > > > S3, etc. In my opinion, it is too heavyweight to use a distributed

> > > > file system for caching.

> > > >

> > > > As you said, using the shuffle service for caching is quite natural as

> > > > we need to produce the intermediate result anyway. For Table/SQL API,

> > > > the table operations are translated to transformations, where we can

> > > > reuse the CacheTransformation. It should not be unfriendly for

> > > > Table/SQL API.

> > > >

> > > > For setting the parallelism of the CacheTransformation. With the

> > > > current design, the parallelism of the cache intermediate result is

> > > > determined by the parallelism of the transformation that produces the

> > > > intermediate result to cache. Thus, the parallelism of the caching

> > > > transformation is set by the parallelism of the transformation to be

> > > > cached. I think the overhead should not be critical as the

> > > > cache-producing job suffers from the same overhead anyway. For

> > > > CacheTransformation with large parallelism but the result dataset is

> > > > relatively small, I think we should reduce the parallelism of the

> > > > transformation to cache.
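
(For illustration: a hedged sketch of lowering the parallelism of the transformation to be cached when the cached dataset is small. It assumes an existing DataStream<Integer> named input and the cache() / CachedDataStream API proposed in this FLIP; everything else is existing DataStream API.)

    // The expensive stage keeps its high parallelism; a cheap pass-through stage
    // with a lower parallelism is cached, so fewer cached partitions are produced.
    SingleOutputStreamOperator<Integer> toCache =
            input.map(i -> i * i).returns(Types.INT).setParallelism(512)  // heavy stage
                 .map(i -> i).returns(Types.INT).setParallelism(8);       // small stage to cache
    CachedDataStream<Integer> cached = toCache.cache();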

> > > >

> > > > Best,

> > > > Xuannan

> > > >

> > > >

> > > >

> > > > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo  wrote:

> > > > >

> > > > > Hi Xuannan,

> > > > >

> > > > > Thanks for the reply.

> > > > >

> > > > > I do agree that dynamic tables and cached partitions are similar features

> > > > > aiming at different cases. In my opinion, the main difference between the

> > > > > implementations is whether to cache only the data or the whole result

> > > > > partition.

> > > > >

> > > > > To cache only the data, we can translate the CacheTransformation to a Sink

> > > > > node for writing, and a Source node for consuming. Most of the things are

> > > > > just the same as this FLIP, except for the storage, which is an external

> > > > > one (or a built-in one if we can use the dynamic table storage), instead of

> > > > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service. This

> > > > > can make caching independent from a specific shuffle service, and make it

> > > > > possible to share data between different jobs / Per-Job mode jobs.

> > > > >

> > > > > Caching the whole partition is natural in the DataStream API, since the

> > > > > partition is a low-level concept, and data storage is already provided by

> > > > > the default shuffle service. So if we want to choose a solution only to

> > > > > support caching in the DataStream API, caching the whole partition can be a

> > > > > good choice. But this may not be as friendly to the Table/SQL API as to

> > > > > DataStream, since users are asking to cache a logical Table (view)

> > > > > rather than a physical partition. If we want a unified solution for both

> > > > > APIs, this may need to be considered.

> > > > >

> > > > >

> > > > > And here's another suggestion for this FLIP. Maybe we should support

> > > > > "setParallelism" in CacheTransformation, for both caching and consuming.

> > > > >

> > > > > In some cases, the input parallelism of the CacheTransformation is large

> > > > > but the result dataset is relatively small. We may need too many resources

> > > > > to consume the result partition if the source parallelism has to be the

> > > > > same as the producer's.

> > > > >

> > > > > On the other hand, serving a large number of partitions may also have more

> > > > > overhead. Though maybe it's not a big burden, we can try to reduce the

> > > > > actual cached partition count if necessary, for example by adding a

> > > > > pass-through vertex with a specific parallelism between the producer and

> > > > > the cache vertices.

> > > > >

> > > > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang wrote:

> > > > >

> > > > > > Hi Xuannan,

> > > > > >

> > > > > > Thanks for the reply.

> > > > > >

> > > > > > Regarding whether and how to support caching side outputs, I agree that

> > > > > > the second option might be better if there does exist a use case where

> > > > > > users need to cache only certain side outputs.

> > > > > >

> > > > > >

> > > > > > On Tue, Jan 4, 2022 at 15:50, Xuannan Su wrote:

> > > > > >

> > > > > > > Hi Zhipeng and Gen,

> > > > > > >

> > > > > > > Thanks for joining the discussion.

> > > > > > >

> > > > > > > For Zhipeng:

> > > > > > >

> > > > > > > - Can we support side output

> > > > > > > Caching the side output is indeed a valid use case. However, with the

> > > > > > > current API, it is not straightforward to cache the side output. You

> > > > > > > can apply an identity map function to the DataStream returned by the

> > > > > > > getSideOutput method and then cache the result of the map

> > > > > > > transformation. In my opinion, it is not user-friendly. Therefore, we

> > > > > > > should think of a way to better support the use case.
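
(Sketched for illustration: the identity-map workaround described above, assuming the cache() / CachedDataStream API proposed in this FLIP; MySplittingProcessFunction is a placeholder for whatever function emits the side output, and input is an existing DataStream.)

    final OutputTag<String> sideTag = new OutputTag<String>("side") {};

    SingleOutputStreamOperator<Long> main =
            input.process(new MySplittingProcessFunction(sideTag));  // placeholder function

    // getSideOutput returns a plain DataStream, so an identity map is applied
    // first to obtain a SingleOutputStreamOperator the proposed cache() can be
    // called on.
    CachedDataStream<String> cachedSide =
            main.getSideOutput(sideTag)
                .map(s -> s).returns(Types.STRING)
                .cache();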

> > > > > > > As you say, we can introduce a new class

> > > > > > > `CachedSingleOutputStreamOperator`, and override the `getSideOutput`

> > > > > > > method to return a `CachedDataStream`. With this approach, the cache

> > > > > > > method implies that both the output and the side output of the

> > > > > > > `SingleOutputStreamOperator` are cached. The problem with this

> > > > > > > approach is that the user has no control over which side output should

> > > > > > > be cached.

> > > > > > > Another option would be to let the `getSideOutput` method return a

> > > > > > > `SingleOutputStreamOperator`. This way, users can decide which side

> > > > > > > output to cache. As the `getSideOutput` method returns a

> > > > > > > `SingleOutputStreamOperator`, users can set properties of the

> > > > > > > transformation that produces the side output, e.g. parallelism, buffer

> > > > > > > timeout, etc. If users try to set different values of the same

> > > > > > > property of a transformation, an exception will be thrown. What do you

> > > > > > > think?

> > > > > > >

> > > > > > > - Can we support Stream Mode

> > > > > > > Running a job in stream mode doesn't guarantee the job will finish,

> > > > > > > while in batch mode, it does. This is the main reason that prevents

> > > > > > > us from supporting cache in stream mode. The cache cannot be used

> > > > > > > unless the job can finish.

> > > > > > > If I understand correctly, by "run batch jobs in Stream Mode", you

> > > > > > > mean that you have a job with all bounded sources, but you want the

> > > > > > > intermediate data to shuffle in pipelined mode instead of blocking

> > > > > > > mode. If that is the case, the job can run in batch mode with

> > > > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1].

> > > > > > > And we can support caching in this case.
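
(A small sketch of the configuration mentioned above; the key and value are from the linked docs, the surrounding setup is standard DataStream batch-mode boilerplate.)

    Configuration conf = new Configuration();
    // Bounded sources, batch mode, but pipelined shuffles between stages.
    conf.setString("execution.batch-shuffle-mode", "ALL_EXCHANGES_PIPELINED");

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);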

> > > > > > >

> > > > > > > - Change parallelism of CachedDataStream

> > > > > > > CachedDataStream extends DataStream, which doesn't have the

> > > > > > > `setParallelism` method that `SingleOutputStreamOperator` has. Thus,

> > > > > > > this should not be a problem for CachedDataStream.

> > > > > > >

> > > > > > > For Gen:

> > > > > > >

> > > > > > > - Relation between FLIP-205 and FLIP-188

> > > > > > > Although it feels like the dynamic table and caching are similar in the

> > > > > > > sense that they let users reuse some intermediate result, they target

> > > > > > > different use cases. The dynamic table is targeting the use case where

> > > > > > > users want to share a dynamically updating intermediate result across

> > > > > > > multiple applications. It is some meaningful data that can be consumed

> > > > > > > by different Flink applications and Flink jobs. Caching, on the other

> > > > > > > hand, is targeting the use case where users know that all the sources

> > > > > > > are bounded and static, and caching is only used to avoid re-computing

> > > > > > > the intermediate result. And the cached intermediate result is only

> > > > > > > meaningful across jobs in the same application.

> > > > > > >

> > > > > > > The dynamic table and caching can be used together. For example, in a

> > > > > > > machine learning scenario, we can have a streaming job that is generating

> > > > > > > some training samples, and we can create a dynamic table for the

> > > > > > > training samples. Then we run a Flink application every hour to do some

> > > > > > > data analysis on the training samples generated in the last hour. The

> > > > > > > Flink application consists of multiple batch jobs, and the batch jobs

> > > > > > > share some intermediate results, so users can use the cache to avoid

> > > > > > > re-computation. The intermediate result is not meaningful outside of

> > > > > > > the application, and the cache will be discarded after the application

> > > > > > > is finished.

> > > > > > >

> > > > > > > [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode

> > > > > > >

> > > > > > >

> > > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo wrote:

> > > > > > > >

> > > > > > > > Hi Xuannan,

> > > > > > > >

> > > > > > > > I found FLIP-188 [1], which aims to introduce a built-in dynamic table

> > > > > > > > storage that provides a unified changelog & table representation.

> > > > > > > > Tables stored there can be used in further ad-hoc queries. To my

> > > > > > > > understanding, it's quite like an implementation of caching in the

> > > > > > > > Table API, and the ad-hoc queries are somehow like further steps in an

> > > > > > > > interactive program.

> > > > > > > >

> > > > > > > > As you replied, caching at Table/SQL API is the next step, as a part of

> > > > > > > > interactive programming in Table API, which we all agree is the major

> > > > > > > > scenario. What do you think about the relation between it and FLIP-188?

> > > > > > > >

> > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage

> > > > > > > >

> > > > > > > >

> > > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:

> > > > > > > >

> > > > > > > > > Hi David,

> > > > > > > > >

> > > > > > > > > Thanks for sharing your thoughts.

> > > > > > > > >

> > > > > > > > > You are right that most people tend to use high-level APIs for

> > > > > > > > > interactive data exploration. Actually, there is FLIP-36 [1],

> > > > > > > > > covering the cache API at the Table/SQL API. As far as I know, it has

> > > > > > > > > been accepted but hasn't been implemented. At the time it was drafted,

> > > > > > > > > DataStream did not support batch mode but the Table API did.

> > > > > > > > >

> > > > > > > > > Now that the DataStream API does support batch processing, I think we

> > > > > > > > > can focus on supporting cache at DataStream first. It is still

> > > > > > > > > valuable for DataStream users, and most of the work we do in this FLIP

> > > > > > > > > can be reused. So I want to limit the scope of this FLIP.

> > > > > > > > >

> > > > > > > > > After caching is supported at DataStream, we can continue from where

> > > > > > > > > FLIP-36 left off to support caching at the Table/SQL API. We might

> > > > > > > > > have to re-vote FLIP-36 or draft a new FLIP. What do you think?

> > > > > > > > >

> > > > > > > > > Best,

> > > > > > > > > Xuannan

> > > > > > > > >

> > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

> > > > > > > > >

> > > > > > > > >

> > > > > > > > >

> > > > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek wrote:

> > > > > > > > > >

> > > > > > > > > > Hi Xuannan,

> > > > > > > > > >

> > > > > > > > > > thanks for drafting this FLIP.

> > > > > > > > > >

> > > > > > > > > > One immediate thought: from what I've seen for interactive data

> > > > > > > > > > exploration with Spark, most people tend to use the higher-level

> > > > > > > > > > APIs that allow for faster prototyping (Table API in Flink's case).

> > > > > > > > > > Should the Table API also be covered by this FLIP?

> > > > > > > > > >

> > > > > > > > > > Best,

> > > > > > > > > > D.

> > > > > > > > > >

> > > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com> wrote:

> > > > > > > > > >

> > > > > > > > > > > Hi devs,

> > > > > > > > > > >

> > > > > > > > > > > I’d like to start a discussion about adding support for caching

> > > > > > > > > > > intermediate results at the DataStream API for batch processing.

> > > > > > > > > > >

> > > > > > > > > > > As the DataStream API now supports batch execution mode, we see

> > > > > > > > > > > users using the DataStream API to run batch jobs. Interactive

> > > > > > > > > > > programming is an important use case of Flink batch processing.

> > > > > > > > > > > And the ability to cache intermediate results of a DataStream is

> > > > > > > > > > > crucial to the interactive programming experience.

> > > > > > > > > > >

> > > > > > > > > > > Therefore, we propose to support caching a DataStream in batch

> > > > > > > > > > > execution. We believe that users can benefit a lot from the change,

> > > > > > > > > > > and it will encourage them to use the DataStream API for their

> > > > > > > > > > > interactive batch processing work.

> > > > > > > > > > >

> > > > > > > > > > > Please check out FLIP-205 [1] and feel free to reply to this email

> > > > > > > > > > > thread. Looking forward to your feedback!

> > > > > > > > > > >

> > > > > > > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing

> > > > > > > > > > >

> > > > > > > > > > > Best,

> > > > > > > > > > > Xuannan

> > > > > > > > > > >

> > > > > > > > >

> > > > > > >

> > > > > >

> > > > > >

> > > > > > --

> > > > > > best,

> > > > > > Zhipeng

> > > > > >

> > > >

> > >

> >
