Hi Chesnay,

Thanks for the proposal. My understanding of the entire workflow, step by step, is as follows:
- The JM maintains the local and global partition metadata as tasks run and create result partitions. The tasks themselves do not distinguish between local and global partitions; only the JM knows that.
- The JM releases local partitions as the job executes. When a job finishes successfully, the JM registers the global partitions with the RM. The global partition IDs are set by the client instead of being randomly generated, so the client can use them to release global partitions. (It would be good to have a human-readable string associated with each global result partition.)
- The client issues REST calls to list / release global partitions.

A few thoughts / questions below:

1. Failure cases:
   * The TEs may remove a result partition without the RM knowing. In this case, the client will receive a runtime error and submit the full DAG to recompute the missing result partition, and the RM should release the now-incomplete global partition. How would the RM be notified to do that?
   * Is it possible for the RM to lose global partition metadata while the TE still hosts the data? For example, the RM deletes the global partition entry while the release-partition call to the TE fails.
   * What would happen if the JM fails before the global partitions are registered with the RM? Are users exposed to a resource leak if the JM does not have HA?
   * What would happen if the RM fails? Will the TEs release the partitions by themselves?
2. It looks like the TE should be the source of truth for result partition existence. Does it have to distinguish between global and local result partitions? If the TE does not need to distinguish them, it seems the releasePartitions() method on the TE could just take the list of partitions to release, without the partitions to promote.
3. In the current design, the RM should be able to release result partitions using the ShuffleService. Will the RM do this by sending RPCs to the TEs, or will it do it by itself?
4. How do we plan to handle the case where there are different shuffle services in the same Flink cluster, for example a shared standalone cluster?
5. Minor: REST APIs usually use `?` to pass parameters. Is there a reason we use `:` instead?

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 17, 2019 at 3:22 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:

> Thanks Chesnay for this FLIP, and sorry for getting to it a bit late on my side.
>
> I also have some concerns similar to those Till already raised.
>
> 1. Consistent terminology across components. On the JM side, PartitionTracker#getPersistedBlockingPartitions is defined for getting global partitions, while on the RM side we define #registerGlobalPartitions correspondingly for handing partitions over from the JM. I think it would be better to unify the term across components for a better understanding of the semantics. Concerning whether to use "global" or "persistent", I personally prefer "global", because it clearly describes the scope of the partition, while "persistent" reads more like the storage mechanism or an implementation detail. In other words, a global partition might also be cached in TE memory rather than persisted to files; memory vs. persistent files is just an implementation choice.
>
> 2. On the TE side, we might rename the method #releasePartitions to #releaseOrPromotePartitions, which describes the function precisely and keeps it consistent with PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
>
> 3. I very much agree with Till's suggestions of a global PartitionTable on the TE side and of sticking to the TE's heartbeat report to the RM for global partitions.
>
> 4. Considering the ShuffleMaster: it was built inside the JM and previously only expected to interact with the JM. Now the RM also needs to interact with the ShuffleMaster to release global partitions.
> Then it might be better to move the ShuffleMaster outside of the JM, and make the lifecycle of the ShuffleMaster consistent with that of the RM.
>
> 5. Nit: TM -> TE in the Proposed Changes section: "TMs retain global partitions for successful jobs"
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: Till Rohrmann <trohrm...@apache.org>
> Send Time: 2019年9月10日(星期二) 10:10
> To: dev <dev@flink.apache.org>
> Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>
> Thanks Chesnay for drafting the FLIP and starting this discussion.
>
> I have a couple of comments:
>
> * I know that I've also coined the terms global/local result partition, but maybe it is not the perfect name. Maybe we could rethink the terminology and call them persistent result partitions?
> * Nit: I would call the last parameter of void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionsToRelease, Collection<ResultPartitionID> partitionsToPromote) either partitionsToRetain or partitionsToPersist.
> * I'm not sure whether partitionsToRelease should contain a global/persistent result partition id. I always thought that the user would be responsible for managing the lifecycle of a global/persistent result partition.
> * Instead of extending the PartitionTable to be able to store both global/persistent and local/transient result partitions, I would rather introduce a global PartitionTable to store the global/persistent result partitions explicitly. I think there is a benefit in making things as explicit as possible.
> * The handover logic between the JM and the RM for the global/persistent result partitions seems a bit brittle to me. What will happen if the JM cannot reach the RM? I think it would be better if the TM announced the global/persistent result partitions to the RM via its heartbeats. That way we don't rely on an established connection between the JM and the RM, and we keep the TM as the source of truth.
> Moreover, the RM should simply forward the release calls to the TM without much internal logic.
>
> Cheers,
> Till
>
> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org> wrote:
>
> > Hello,
> >
> > FLIP-36 (interactive programming)
> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
> > proposes a new programming paradigm where jobs are built incrementally by the user.
> >
> > To support this in an efficient manner, I propose to extend the partition life-cycle to support the notion of /global partitions/, which are partitions that can exist beyond the life-time of a job.
> >
> > These partitions could then be re-used by subsequent jobs in a fairly efficient manner, as they don't have to be persisted to external storage first, and consuming tasks could be scheduled to exploit data locality.
> >
> > The FLIP outlines the required changes on the JobMaster, TaskExecutor, and ResourceManager to support this from a life-cycle perspective.
> >
> > This FLIP does /not/ concern itself with the /usage/ of global partitions, including client-side APIs, job submission, scheduling, and reading said partitions; these are all follow-ups that will either be part of FLIP-36 or spliced out into separate FLIPs.
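[Editor's note] To make the API shape debated in this thread concrete, below is a minimal, illustrative Java sketch of the TE-side bookkeeping along the lines Till and zhijiang suggest: a separate table for global (promoted) partitions, plus a combined release-or-promote call. All class and method names here are simplified assumptions for illustration (plain Strings stand in for Flink's JobID / ResultPartitionID); the actual FLIP-67 interfaces and Flink's real PartitionTable differ.

```java
import java.util.*;

// Simplified stand-in for a per-job partition table; not Flink's real class.
class PartitionTable {
    private final Map<String, Set<String>> partitionsPerJob = new HashMap<>();

    void startTracking(String jobId, String partitionId) {
        partitionsPerJob.computeIfAbsent(jobId, k -> new HashSet<>()).add(partitionId);
    }

    // Removes the given partitions and returns the subset that was actually tracked.
    Set<String> stopTracking(String jobId, Collection<String> partitionIds) {
        Set<String> tracked = partitionsPerJob.getOrDefault(jobId, new HashSet<>());
        Set<String> removed = new HashSet<>();
        for (String id : partitionIds) {
            if (tracked.remove(id)) {
                removed.add(id);
            }
        }
        return removed;
    }

    boolean contains(String jobId, String partitionId) {
        return partitionsPerJob.getOrDefault(jobId, Collections.emptySet()).contains(partitionId);
    }
}

public class TaskExecutorSketch {
    // Local (job-scoped) partitions and global (promoted) partitions are kept
    // in two explicit tables, as suggested in the thread.
    private final PartitionTable localTable = new PartitionTable();
    private final PartitionTable globalTable = new PartitionTable();

    void registerLocalPartition(String jobId, String partitionId) {
        localTable.startTracking(jobId, partitionId);
    }

    // Combined call per zhijiang's naming suggestion: released partitions are
    // dropped entirely, promoted ones move from the local to the global table
    // so they survive the job's lifetime.
    void releaseOrPromotePartitions(String jobId, Collection<String> toRelease, Collection<String> toPromote) {
        localTable.stopTracking(jobId, toRelease);
        for (String id : localTable.stopTracking(jobId, toPromote)) {
            globalTable.startTracking(jobId, id);
        }
    }

    boolean isGlobal(String jobId, String partitionId) {
        return globalTable.contains(jobId, partitionId);
    }

    public static void main(String[] args) {
        TaskExecutorSketch te = new TaskExecutorSketch();
        te.registerLocalPartition("job-1", "p1");
        te.registerLocalPartition("job-1", "p2");
        // Job finished: release p1, promote p2 to a global partition.
        te.releaseOrPromotePartitions("job-1", List.of("p1"), List.of("p2"));
        System.out.println("p1 global: " + te.isGlobal("job-1", "p1")); // p1 was released
        System.out.println("p2 global: " + te.isGlobal("job-1", "p2")); // p2 was promoted
    }
}
```

Note that this sketch only shows the promote/release split; the heartbeat-based announcement of the global table's contents to the RM, which Till proposes as the handover mechanism, would be layered on top of `globalTable`.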