Hi Yun,

Thanks for your questions.

1. I think the execute_and_collect is an API on the DataStream, which
adds a collect sink to the DataStream and invokes
StreamExecutionEnvironment#execute. It is a convenient method to
execute a job and get an iterator of the result.

2. As our offline discussion, there are two ways to re-compute the
missing cache intermediate result.

In the current design, the re-submission of the job happens on the
client-side. We can throw a non-recoverable exception, annotated by
`ThrowableType#NonRecoverableError`, to bypass the failover strategy
when we found that the cache is missing. When the client catches the
error, it can submit the original job to re-compute the intermediate
result.

The re-submission process of the job can happen at the scheduler. This
way, the cache-consuming job has to contains the vertex that creates
the cache. If the scheduler finds that the cache intermediate result
exists, it skips the cache creating vertices. If the cache consuming
vertex finds out the cache intermediate result is missing, the
scheduler restarts the cache creating vertices.

Handling the missing cache at the scheduler requires a lot more work
on the scheduler, compared to re-submit the job at the client side.
Thus, for this FLIP, we will choose the first method. When the
scheduler is ready, we can make it work with the scheduler. And the
process should be transparent to the user.

Best,
Xuannan


On Mon, Jan 17, 2022 at 2:07 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
> Hi Xuannan,
>
> Very thanks for the detailed explanation and sorry for the very
> late response.
>
> For cached result partition v.s. managed table, I also agree with
> the current conclusion that they could be differentiated at the moment:
> cached result partition could be viewed as an internal, lightweight data
> cache whose lifecycle is bound to the current application, and managed
> table could be viewed as an external service whose lifecycle could be
> across multiple applications.
>
> For the other issues, after more thoughts I currently have two remaining
> issues:
>
> 1. Regarding the api, I think it should work if we could execute multiple
> caches as a whole, but from the FLIP, currently in the example it seems
> we are calling execute_and_collect() on top of a single CachedDataStream?
> Also in the give API CachedDataStream does not seem to have a method
> execute_and_collect() ?
> 2. For re-submitting the job when the cached result partition is missing, 
> would
> this happen in the client side or in the scheduler? If this happens in the 
> client
> side, we need to bypass the give failover strategy (like attempting for N 
> times)
> when we found the cache result partition is missed?
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Xuannan Su <suxuanna...@gmail.com>
> Send Time:2022 Jan. 17 (Mon.) 13:00
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch 
> Processing
>
> Hi David,
>
> Thanks for pointing out the FLIP-187. After reading the FLIP, I think it
> can solve the problem of choosing the proper parallelism, and thus it
> should be fine to not provide the method to set the parallelism of the
> cache.
>
> And you understand of the outcome of this FLIP is correct.
>
> If there are no more feedback and objections, I would like to start a vote
> thread tomorrow.
>
> Best,
> Xuannan
>
> On Fri, Jan 14, 2022 at 5:34 PM David Morávek <d...@apache.org> wrote:
>
> > Hi Xuannan,
> >
> > I think this already looks really good. The whole discussions is pretty
> > long, so I'll just to summarize my current understanding of the outcome:
> >
> > - This only aims on the DataStream API for now, but can be used as a
> > building block for the higher level abstractions (Table API).
> > - We're pushing caching down to the shuffle service (works with all
> > implementations), storing the intermediate results. This should also
> > naturally work with current fail-over mechanisms for batch (backtrack +
> > recompute missing intermediate results [1]).
> >
> >
> > > For setting the parallelism of the CacheTransformation. With the
> > > current design, the parallelism of the cache intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical as the
> > > cache-producing job suffers from the same overhead anyway. For
> > > CacheTransformation with large parallelism but the result dataset is
> > > relatively small, I think we should reduce the parallelism of the
> > > transformation to cache.
> >
> >
> > Is the whole "choosing the right parallelism for caching" problem solved by
> > the Adaptive Batch Job Scheduler [2]?
> >
> > [1]
> >
> > https://flink.apache.org/news/2021/01/11/batch-fine-grained-fault-tolerance.html
> > [2]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> >
> > Best,
> > D.
> >
> > On Tue, Jan 11, 2022 at 4:09 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> > > Hi Gen,
> > >
> > > Thanks for your feedback.
> > >
> > > I think you are talking about how we are going to store the caching
> > > data. The first option is to write the data with a sink to an external
> > > file system, much like the file store of the Dynamic Table. If I
> > > understand correctly, it requires a distributed file system, e.g HDSF,
> > > s3, etc. In my opinion, it is too heavyweight to use a distributed
> > > file system for caching.
> > >
> > > As you said, using the shuffle service for caching is quite natural as
> > > we need to produce the intermediate result anyway. For Table/SQL API,
> > > the table operations are translated to transformations, where we can
> > > reuse the CacheTransformation. It should not be unfriendly for
> > > Table/SQL API.
> > >
> > > For setting the parallelism of the CacheTransformation. With the
> > > current design, the parallelism of the cache intermediate result is
> > > determined by the parallelism of the transformation that produces the
> > > intermediate result to cache. Thus, the parallelism of the caching
> > > transformation is set by the parallelism of the transformation to be
> > > cached. I think the overhead should not be critical as the
> > > cache-producing job suffers from the same overhead anyway. For
> > > CacheTransformation with large parallelism but the result dataset is
> > > relatively small, I think we should reduce the parallelism of the
> > > transformation to cache.
> > >
> > > Best,
> > > Xuannan
> > >
> > >
> > >
> > > On Thu, Jan 6, 2022 at 4:21 PM Gen Luo <luogen...@gmail.com> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > I do agree that dynamic tables and cached partitions are similar
> > features
> > > > aiming different cases.  In my opinion, the main difference of the
> > > > implementations is to cache only the data or the whole result
> > partition.
> > > >
> > > > To cache only the data, we can translate the CacheTransformation to a
> > > Sink
> > > > node for writing, and Source node for consuming. Most of the things are
> > > > just the same as this FLIP, except for the storage, which is an
> > external
> > > > one (or a built-in one if we can use the dynamic table storage),
> > instead
> > > of
> > > > the BLOCKING_PERSISTED type ResultPartition in the shuffle service.
> > This
> > > > can make caching independent from a specific shuffle service, and make
> > it
> > > > possible to share data between different jobs / Per-Job mode jobs.
> > > >
> > > > Caching the whole partition is natural in DataStream API, since the
> > > > partition is a low-level concept, and data storage is already provided
> > by
> > > > the default shuffle service. So if we want to choose a solution only to
> > > > support cache in DataStream API, caching the whole partition can be a
> > > good
> > > > choice. But this may be not as friendly to Table/SQL API as to
> > > > DataStream, since users are announcing to cache a logical Table (view),
> > > > rather than a physical partition. If we want a unified solution for
> > both
> > > > APIs, this may need to be considered.
> > > >
> > > >
> > > > And here's another suggestion to this FLIP. Maybe we should support
> > > > "setParallelism" in CacheTransformation, for both caching and
> > consuming.
> > > >
> > > > In some cases, the input parallelism of the CacheTransformation is
> > large
> > > > but the result dataset is relatively small. We may need too many
> > > resources
> > > > to consume the result partition if the source parallelism has to be the
> > > > same with the producer.
> > > >
> > > > On the other hand, serving a large number of partitions may also have
> > > more
> > > > overhead. Though maybe it's not a big burban, we can try to reduce the
> > > > actual cached partition count if necessary, for example by adding a
> > > > pass-through vertex with the specific parallelism between the producer
> > > and
> > > > the cache vertices.
> > > >
> > > > On Wed, Jan 5, 2022 at 11:54 PM Zhipeng Zhang <zhangzhipe...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Xuannnan,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > Regarding whether and how to support cache sideoutput, I agree that
> > the
> > > > > second option might be better if there do exist a use case that users
> > > need
> > > > > to cache only some certain side outputs.
> > > > >
> > > > >
> > > > > Xuannan Su <suxuanna...@gmail.com> 于2022年1月4日周二 15:50写道:
> > > > >
> > > > > > Hi Zhipeng and Gen,
> > > > > >
> > > > > > Thanks for joining the discussion.
> > > > > >
> > > > > > For Zhipeng:
> > > > > >
> > > > > > - Can we support side output
> > > > > > Caching the side output is indeed a valid use case. However, with
> > the
> > > > > > current API, it is not straightforward to cache the side output.
> > You
> > > > > > can apply an identity map function to the DataStream returned by
> > the
> > > > > > getSideOutput method and then cache the result of the map
> > > > > > transformation. In my opinion, it is not user-friendly. Therefore,
> > we
> > > > > > should think of a way to better support the use case.
> > > > > > As you say, we can introduce a new class
> > > > > > `CachedSingleOutputStreamOperator`, and overwrite the
> > `getSideOutput`
> > > > > > method to return a `CachedDatastream`. With this approach, the
> > cache
> > > > > > method implies that both output and the side output of the
> > > > > > `SingleOutputStreamOperatior` are cached. The problem with this
> > > > > > approach is that the user has no control over which side output
> > > should
> > > > > > be cached.
> > > > > > Another option would be to let the `getSideOuput` method return the
> > > > > > `SingleOutputStreamOperator`. This way, users can decide which side
> > > > > > output to cache. As the `getSideOutput` method returns a
> > > > > > `SingleOutputStreamOperator`. Users can set properties of the
> > > > > > transformation that produce the side output, e.g. parallelism,
> > buffer
> > > > > > timeout, etc. If users try to set different values of the same
> > > > > > property of a transformation, an exception will be thrown. What do
> > > you
> > > > > > think?
> > > > > >
> > > > > > - Can we support Stream Mode
> > > > > > Running a job in stream mode doesn't guarantee the job will finish,
> > > > > > while in batch mode, it does.  This is the main reason that
> > prevents
> > > > > > us from supporting cache in stream mode. The cache cannot be used
> > > > > > unless the job can finish.
> > > > > > If I understand correctly, by "run batch jobs in Stream Mode", you
> > > > > > mean that you have a job with all bounded sources, but you want the
> > > > > > intermediate data to shuffle in pipelined mode instead of blocking
> > > > > > mode. If that is the case, the job can run in batch mode with
> > > > > > "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED"
> > [1].
> > > > > > And we can support caching in this case.
> > > > > >
> > > > > > - Change parallelism of CachedDataStream
> > > > > > CachedDataStream extends from DataStream, which doesn't have the
> > > > > > `setParallelism` method like the `SingleOutputStreamOperator`.
> > Thus,
> > > > > > it should not be a problem with CachedDataStream.
> > > > > >
> > > > > > For Gen:
> > > > > >
> > > > > > - Relation between FLIP-205 and FLIP-188
> > > > > > Although it feels like dynamic table and caching are similar in the
> > > > > > sense that they let user reuse come intermediate result, they
> > target
> > > > > > different use cases. The dynamic table is targeting the use case
> > > where
> > > > > > users want to share a dynamic updating intermediate result across
> > > > > > multiple applications. It is some meaningful data that can be
> > > consumed
> > > > > > by different Flink applications and Flink jobs. While caching is
> > > > > > targeting the use case where users know that all the sources are
> > > > > > bounded and static, and caching is only used to avoid re-computing
> > > the
> > > > > > intermediate result. And the cached intermediate result is only
> > > > > > meaningful crossing jobs in the same application.
> > > > > >
> > > > > > Dynamic table and caching can be used together. For example, in a
> > > > > > machine learning scenario, we can have a Stream job that is
> > > generating
> > > > > > some training samples. And we can create a dynamic table for the
> > > > > > training sample. And we run a Flink application every hour to do
> > some
> > > > > > data analysis on the training sample generated in the last hour.
> > The
> > > > > > Flink application consists of multiple batch jobs and the batch
> > jobs
> > > > > > share some intermediate results, so users can use cache to avoid
> > > > > > re-computation. The intermediate result is not meaningful outside
> > of
> > > > > > the application. And the cache will be discarded after the
> > > application
> > > > > > is finished.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 30, 2021 at 7:00 PM Gen Luo <luogen...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > Hi Xuannan,
> > > > > > >
> > > > > > > I found FLIP-188[1] that is aiming to introduce a built-in
> > dynamic
> > > > > table
> > > > > > > storage, which provides a unified changelog & table
> > representation.
> > > > > > Tables
> > > > > > > stored there can be used in further ad-hoc queries. To my
> > > > > understanding,
> > > > > > > it's quite like an implementation of caching in Table API, and
> > the
> > > > > ad-hoc
> > > > > > > queries are somehow like further steps in an interactive program.
> > > > > > >
> > > > > > > As you replied, caching at Table/SQL API is the next step, as a
> > > part of
> > > > > > > interactive programming in Table API, which we all agree is the
> > > major
> > > > > > > scenario. What do you think about the relation between it and
> > > FLIP-188?
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <
> > suxuanna...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi David,
> > > > > > > >
> > > > > > > > Thanks for sharing your thoughts.
> > > > > > > >
> > > > > > > > You are right that most people tend to use high-level API for
> > > > > > > > interactive data exploration. Actually, there is
> > > > > > > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far
> > > as I
> > > > > > > > know, it has been accepted but hasn’t been implemented. At the
> > > time
> > > > > > > > when it is drafted, DataStream did not support Batch mode but
> > > Table
> > > > > > > > API does.
> > > > > > > >
> > > > > > > > Now that the DataStream API does support batch processing, I
> > > think we
> > > > > > > > can focus on supporting cache at DataStream first. It is still
> > > > > > > > valuable for DataStream users and most of the work we do in
> > this
> > > FLIP
> > > > > > > > can be reused. So I want to limit the scope of this FLIP.
> > > > > > > >
> > > > > > > > After caching is supported at DataStream, we can continue from
> > > where
> > > > > > > > FLIP-36 left off to support caching at Table/SQL API. We might
> > > have
> > > > > to
> > > > > > > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Xuannan,
> > > > > > > > >
> > > > > > > > > thanks for drafting this FLIP.
> > > > > > > > >
> > > > > > > > > One immediate thought, from what I've seen for interactive
> > data
> > > > > > > > exploration
> > > > > > > > > with Spark, most people tend to use the higher level APIs,
> > that
> > > > > > allow for
> > > > > > > > > faster prototyping (Table API in Flink's case). Should the
> > > Table
> > > > > API
> > > > > > also
> > > > > > > > > be covered by this FLIP?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > D.
> > > > > > > > >
> > > > > > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <
> > > suxuanna...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi devs,
> > > > > > > > > >
> > > > > > > > > > I’d like to start a discussion about adding support to
> > cache
> > > the
> > > > > > > > > > intermediate result at DataStream API for batch processing.
> > > > > > > > > >
> > > > > > > > > > As the DataStream API now supports batch execution mode, we
> > > see
> > > > > > users
> > > > > > > > > > using the DataStream API to run batch jobs. Interactive
> > > > > > programming is
> > > > > > > > > > an important use case of Flink batch processing. And the
> > > ability
> > > > > to
> > > > > > > > > > cache intermediate results of a DataStream is crucial to
> > the
> > > > > > > > > > interactive programming experience.
> > > > > > > > > >
> > > > > > > > > > Therefore, we propose to support caching a DataStream in
> > > Batch
> > > > > > > > > > execution. We believe that users can benefit a lot from the
> > > > > change
> > > > > > and
> > > > > > > > > > encourage them to use DataStream API for their interactive
> > > batch
> > > > > > > > > > processing work.
> > > > > > > > > >
> > > > > > > > > > Please check out the FLIP-205 [1] and feel free to reply to
> > > this
> > > > > > email
> > > > > > > > > > thread. Looking forward to your feedback!
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > best,
> > > > > Zhipeng
> > > > >
> > >
> >
>

Reply via email to