Hi Yun,

Thanks for your feedback!
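
To make the answers below concrete, here is a minimal sketch of the
intended usage. Note that cache() and the code around it follow the
current FLIP draft, so the exact method names and return types are
still open for discussion:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// cache() only marks the intermediate results for caching; nothing is
// submitted until execute() is called.
DataStream<Integer> a = env.fromElements(1, 2, 3).map(i -> i * 2).cache();
DataStream<Long> b = env.fromSequence(0, 100).filter(i -> i % 2 == 0).cache();

// One regular job submission creates both caches.
env.execute("create caches");

// Later jobs submitted from the same environment consume the cached
// intermediate results instead of recomputing them.
a.print();
b.print();
env.execute("consume caches");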

1. With the cached stream, the compile and job submission happen as a
regular job submission, and a job with multiple concurrent cached
DataStreams is supported. In your example, a and b run in the same
job, so all the cached DataStreams are created when the job completes.

2. If I understand your question correctly, this would not be a
problem, since we support multiple concurrent cached DataStreams in
one job.

3. Yes, the execution would have the same compile result regardless of
the deployment mode. If the user tries to run multiple batch jobs that
use the cache in one StreamExecutionEnvironment with the Per-Job
deployment mode, the cache-consuming job will fail, and we go through
the failover procedure to re-submit the job with the original graph,
as if the cache had never been created. We could do better if we knew
the deployment mode upfront and disabled caching for Per-Job mode.
Maybe we can check the `execution.target` option to see if it is
Per-Job mode; I appended a rough sketch of such a check at the very
bottom of this mail, below the quoted thread. What do you think?

4. This is a good question. I can imagine a use case where users want
to process some bounded sources, cache the intermediate result, verify
the result, and then use it later in a streaming job. Batch mode is
required when creating the cache, so that we know the job will finish
and the cache can be reused. When consuming the cache, the job can be
in either Batch or Stream mode. Stream mode behaves differently when
the cached DataStream has not been created or is invalid: it computes
the intermediate result from scratch, but it does not cache the
intermediate result.

For the remote shuffle service, I think it is fine, as the current
design is aligned with it. For any work that is required for the
remote shuffle service to work with caching, I am more than happy to
help.

Best,
Xuannan

On Wed, Jan 5, 2022 at 4:49 PM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
> Hi Xuannan,
>
> Many thanks for drafting the FLIP and initiating the discussion!
>
> I have several issues; sorry if I have any misunderstandings:
>
> 1. With the cached stream, when would the compile and job submission
> happen? Does it happen on calling execute_and_cache()? If so, could
> we support a job with multiple concurrent cached streams, like
>
> DataStream a = ...
> DataStream b = ...
> a.cache()
> b.cache()
> // could we run a and b in the same job?
>
> If not, perhaps we might have parts of the graph that are never
> executed?
>
> 2. If the part of the graph using the cached partition is executed as
> a second job, would that job be executed after its preceding jobs
> have finished? Would the StreamExecutionEnvironment do this tracking?
>
> 3. Would the execution have the same compile result when running in
> per-job vs. application/session mode? For per-job mode, when
> executing the part of the graph that uses the cached result, we might
> need to run the whole graph from the sources; but for
> application/session mode, it would be compiled to a separate job
> reading from the cached result partitions. If the compile results
> differ, perhaps we currently cannot get the execution mode when
> compiling?
>
> 4. For the part of the graph using the cached result, do we support
> the streaming case? For example, we could have a part of the graph
> with two sources, where one source is a cached result partition and
> the other is an unbounded one.
>
> For the remote shuffle service, it seems to me that we currently do
> not have a complete process for supporting cached ResultPartitions,
> since JobMasterPartitionTrackerImpl does not yet support promoting a
> result partition via the pluggable ShuffleMaster. But we should be
> able to further complete this part.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Xuannan Su <suxuanna...@gmail.com>
> Send Time:2022 Jan. 5 (Wed.) 14:04
> To:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch
> Processing
>
> Hi David,
>
> We have a description in the FLIP about the case of a TM failure
> without the remote shuffle service. Basically, since the partitions
> are stored on the TM, a TM failure requires recomputing the
> intermediate result.
>
> If a Flink job uses the remote shuffle service, the partitions are
> stored at the remote shuffle service. In this case, a TM failure does
> not cause any partition loss, so recomputing the intermediate result
> is not required. If partitions are lost at the remote shuffle
> service, even without a TM failure, the cached intermediate result is
> no longer valid, so the intermediate result has to be recomputed.
>
> To summarize, no matter where the partitions are stored, locally on
> the TM or remotely at the remote shuffle service, recomputing is only
> required if the consuming job finds some partitions lost.
>
> I have updated the FLIP to include a description of failover when
> using the remote shuffle service.
>
> Best,
> Xuannan
>
>
> On Mon, Jan 3, 2022 at 4:17 PM David Morávek <d...@apache.org> wrote:
> >
> > One more question from my side, should we make sure this plays well
> > with the remote shuffle service [1] in case of a TM failure?
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
> > D.
> >
> > On Thu, Dec 30, 2021 at 11:59 AM Gen Luo <luogen...@gmail.com> wrote:
> >
> > > Hi Xuannan,
> > >
> > > I found FLIP-188 [1], which aims to introduce built-in dynamic
> > > table storage that provides a unified changelog & table
> > > representation. Tables stored there can be used in further ad-hoc
> > > queries. To my understanding, it is quite like an implementation
> > > of caching in the Table API, and the ad-hoc queries are somewhat
> > > like further steps in an interactive program.
> > >
> > > As you replied, caching at the Table/SQL API is the next step, as
> > > a part of interactive programming in the Table API, which we all
> > > agree is the major scenario. What do you think about the relation
> > > between it and FLIP-188?
> > >
> > > [1]
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > >
> > >
> > > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su <suxuanna...@gmail.com> wrote:
> > >
> > > > Hi David,
> > > >
> > > > Thanks for sharing your thoughts.
> > > >
> > > > You are right that most people tend to use the high-level APIs
> > > > for interactive data exploration. Actually, there is FLIP-36
> > > > [1], which covers the cache API at the Table/SQL level. As far
> > > > as I know, it has been accepted but has not been implemented.
> > > > At the time it was drafted, DataStream did not support batch
> > > > mode, but the Table API did.
> > > >
> > > > Now that the DataStream API does support batch processing, I
> > > > think we can focus on supporting cache in DataStream first. It
> > > > is still valuable for DataStream users, and most of the work we
> > > > do in this FLIP can be reused.
> > > > So I want to limit the scope of this FLIP.
> > > >
> > > > After caching is supported in DataStream, we can continue from
> > > > where FLIP-36 left off to support caching at the Table/SQL API.
> > > > We might have to re-vote FLIP-36 or draft a new FLIP. What do
> > > > you think?
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > > [1]
> > > >
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > >
> > > >
> > > > On Wed, Dec 29, 2021 at 6:08 PM David Morávek <d...@apache.org> wrote:
> > > > >
> > > > > Hi Xuannan,
> > > > >
> > > > > thanks for drafting this FLIP.
> > > > >
> > > > > One immediate thought: from what I've seen of interactive
> > > > > data exploration with Spark, most people tend to use the
> > > > > higher-level APIs that allow for faster prototyping (the
> > > > > Table API in Flink's case). Should the Table API also be
> > > > > covered by this FLIP?
> > > > >
> > > > > Best,
> > > > > D.
> > > > >
> > > > > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su <suxuanna...@gmail.com> wrote:
> > > > >
> > > > > > Hi devs,
> > > > > >
> > > > > > I’d like to start a discussion about adding support for
> > > > > > caching the intermediate result at the DataStream API for
> > > > > > batch processing.
> > > > > >
> > > > > > As the DataStream API now supports batch execution mode, we
> > > > > > see users using the DataStream API to run batch jobs.
> > > > > > Interactive programming is an important use case of Flink
> > > > > > batch processing, and the ability to cache intermediate
> > > > > > results of a DataStream is crucial to the interactive
> > > > > > programming experience.
> > > > > >
> > > > > > Therefore, we propose to support caching a DataStream in
> > > > > > batch execution. We believe that users can benefit a lot
> > > > > > from the change, and it will encourage them to use the
> > > > > > DataStream API for their interactive batch processing work.
> > > > > >
> > > > > > Please check out FLIP-205 [1] and feel free to reply to
> > > > > > this email thread. Looking forward to your feedback!
> > > > > >
> > > > > > [1]
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > > > > >
> > > > > > Best,
> > > > > > Xuannan
> > > > > >
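
P.S. Below is the rough sketch mentioned in point 3 of my reply above.
It is only meant to illustrate the idea of disabling caching based on
the `execution.target` option; matching on the "per-job" suffix and
the place where such a check would live are assumptions, not a final
design:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class CacheModeCheck {

    // Returns true if caching should be disabled because the job is
    // deployed in Per-Job mode. Matching on the "per-job" suffix is
    // an assumption based on the current target names, e.g.
    // "yarn-per-job".
    static boolean shouldDisableCache(Configuration conf) {
        final String target = conf.get(DeploymentOptions.TARGET);
        return target != null && target.endsWith("per-job");
    }
}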