Hi Xuannan,

Many thanks for drafting the FLIP and initiating the discussion!
I have several questions; sorry if I have any misunderstandings:

1. With the cached stream, when would compilation and job submission
happen? Does it happen on calling execute_and_cache()? If so, could we
support a job with multiple concurrent cached streams, like:

    DataStream a = ...
    DataStream b = ...
    a.cache();
    b.cache();
    // could we run a and b in the same job?

If not, might we end up with parts of the graph that are never executed?

2. If the part of the graph that uses the cached partition is executed
as a second job, would that job be executed only after its preceding
jobs have finished? Would the StreamExecutionEnvironment do this
tracking?

3. Would the execution have the same compile result when running in
per-job mode vs. application/session mode? In per-job mode, when
executing the part of the graph that uses the cached result, we might
need to run the whole graph from the sources; but in application/session
mode, it would be compiled into a separate job reading from the cached
result partitions. If the compile results differ, perhaps we currently
cannot determine the execution mode at compile time?

4. For the part of the graph that uses the cached result, do we support
the streaming case? For example, a part of the graph with two sources,
where one source is a cached result partition and the other is
unbounded.

Regarding the remote shuffle service, it seems to me that we do not yet
have a complete process for supporting cached ResultPartitions there,
since JobMasterPartitionTrackerImpl does not yet support promoting a
result partition via the pluggable ShuffleMaster. But we should be able
to complete this part later.

Best,
Yun

------------------------------------------------------------------
From: Xuannan Su <suxuanna...@gmail.com>
Send Time: 2022 Jan. 5 (Wed.) 14:04
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Hi David,

We have a description in the FLIP about the case of a TM failure without
the remote shuffle service. Basically, since the partitions are stored
at the TM, a TM failure requires recomputing the intermediate result.

If a Flink job uses the remote shuffle service, the partitions are
stored at the remote shuffle service. In this case, a TM failure will
not cause any partition loss, so recomputing the intermediate result is
not required. If partitions are lost at the remote shuffle service, even
without a TM failure, the cached intermediate result is no longer valid,
so the intermediate result has to be recomputed.

To summarize, no matter where the partitions are stored, locally at the
TM or remotely at the remote shuffle service, recomputing is only
required if the consuming job finds some partitions lost.

I updated the FLIP to include the description of failover when using the
remote shuffle service.

Best,
Xuannan

On Mon, Jan 3, 2022 at 4:17 PM David Morávek <d...@apache.org> wrote:
>
> One more question from my side: should we make sure this plays well
> with the remote shuffle service [1] in case of TM failure?
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>
> D.
>
> On Thu, Dec 30, 2021 at 11:59 AM Gen Luo <luogen...@gmail.com> wrote:
>
> > Hi Xuannan,
> >
> > I found FLIP-188 [1], which aims to introduce built-in dynamic table
> > storage providing a unified changelog & table representation. Tables
> > stored there can be used in further ad-hoc queries.
> > To my understanding, it's quite like an implementation of caching in
> > the Table API, and the ad-hoc queries are somewhat like further steps
> > of an interactive program.
> >
> > As you replied, caching at the Table/SQL API is the next step, as a
> > part of interactive programming in the Table API, which we all agree
> > is the major scenario. What do you think about the relation between
> > it and FLIP-188?
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> >
> > > Hi David,
> > >
> > > Thanks for sharing your thoughts.
> > >
> > > You are right that most people tend to use the high-level APIs for
> > > interactive data exploration. Actually, there is FLIP-36 [1],
> > > which covers the cache API at the Table/SQL API. As far as I know,
> > > it has been accepted but hasn't been implemented. At the time it
> > > was drafted, DataStream did not support batch mode, but the Table
> > > API did.
> > >
> > > Now that the DataStream API does support batch processing, I think
> > > we can focus on supporting cache at DataStream first. It is still
> > > valuable for DataStream users, and most of the work we do in this
> > > FLIP can be reused. So I want to limit the scope of this FLIP.
> > >
> > > After caching is supported at DataStream, we can continue from
> > > where FLIP-36 left off to support caching at the Table/SQL API. We
> > > might have to re-vote FLIP-36 or draft a new FLIP. What do you
> > > think?
> > >
> > > Best,
> > > Xuannan
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > >
> > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> wrote:
> > > >
> > > > Hi Xuannan,
> > > >
> > > > thanks for drafting this FLIP.
> > > >
> > > > One immediate thought: from what I've seen of interactive data
> > > > exploration with Spark, most people tend to use the higher-level
> > > > APIs that allow for faster prototyping (the Table API in Flink's
> > > > case). Should the Table API also be covered by this FLIP?
> > > >
> > > > Best,
> > > > D.
> > > >
> > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I'd like to start a discussion about adding support for caching
> > > > > intermediate results at the DataStream API for batch
> > > > > processing.
> > > > >
> > > > > As the DataStream API now supports batch execution mode, we see
> > > > > users using the DataStream API to run batch jobs. Interactive
> > > > > programming is an important use case of Flink batch processing,
> > > > > and the ability to cache intermediate results of a DataStream
> > > > > is crucial to the interactive programming experience.
> > > > >
> > > > > Therefore, we propose to support caching a DataStream in batch
> > > > > execution. We believe that users can benefit a lot from the
> > > > > change, and we encourage them to use the DataStream API for
> > > > > their interactive batch processing work.
> > > > >
> > > > > Please check out FLIP-205 [1] and feel free to reply to this
> > > > > email thread. Looking forward to your feedback!
> > > > >
> > > > > [1]
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > >
> > > > > Best,
> > > > > Xuannan
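
For concreteness, below is a minimal sketch of how the cache API
discussed in this thread might be used, based on the a.cache() /
b.cache() snippet in Yun's first question. The cache() method, its
placement on DataStream, and the two-job produce/consume flow are
assumptions drawn from the FLIP discussion, not a released Flink API:

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CacheSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Caching is proposed for batch execution mode only.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            DataStream<Long> source = env.fromSequence(0, 1_000_000);

            // Hypothetical cache() calls, as in Yun's first question. Whether
            // both cached results can be produced by a single job is exactly
            // what question 1 above asks.
            DataStream<Long> a = source.filter(x -> x > 100).cache();    // assumed API
            DataStream<Long> b = source.filter(x -> x % 2 == 0).cache(); // assumed API

            // The first execution computes and materializes the cached results.
            a.print();
            env.execute("produce-cache");

            // Per the FLIP discussion, a later job in the same session would
            // be compiled to read from the cached result partitions instead
            // of re-running the upstream graph; per-job mode is the open
            // point raised in Yun's question 3.
            b.keyBy(x -> x % 10).reduce(Long::sum).print();
            env.execute("consume-cache");
        }
    }

Whether the second env.execute() reads the cache or recomputes from the
sources in per-job mode, and how lost partitions trigger recomputation,
are exactly the compilation and failover questions discussed above.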