Forgot to say that I agree with Till that it seems a good idea to let the TEs register the global partitions with the RM instead of letting the JM do it. This simplifies quite a few things.
Thanks,

Jiangjie (Becket) Qin

On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Chesnay,
>
> Thanks for the proposal. My understanding of the entire workflow, step by
> step, is the following:
>
> - The JM maintains the local and global partition metadata when the tasks
> run and create result partitions. The tasks themselves do not distinguish
> between local / global partitions; only the JM knows that.
> - The JM releases the local partitions as the job executes. When a job
> finishes successfully, the JM registers the global partitions with the
> RM. The global partition IDs are set on the client instead of randomly
> generated, so the client can release global partitions using them. (It
> would be good to have some human-readable string associated with the
> global result partitions.)
> - The client issues REST calls to list / release global partitions.
>
> A few thoughts / questions below:
>
> 1. Failure cases:
>    * The TEs may remove a result partition while the RM does not know.
>      In this case, the client will receive a runtime error and submit the
>      full DAG to recompute the missing result partition, and the RM
>      should release the incomplete global partition. How would the RM be
>      notified to do that?
>    * Is it possible that the RM loses the global partition metadata while
>      the TE still hosts the data? For example, the RM deletes the global
>      partition entry while the release-partition call to the TE fails.
>    * What happens if the JM fails before the global partitions are
>      registered with the RM? Are users exposed to a resource leak if the
>      JM does not have HA?
>    * What happens if the RM fails? Will the TEs release the partitions by
>      themselves?
>
> 2. It looks like the TE should be the source of truth for result
> partition existence. Does it have to distinguish between global and local
> result partitions?
> If the TE does not need to distinguish them, it seems that the
> releasePartitions() method on the TE could just take the list of
> partitions to release, without the partitions to promote.
>
> 3. In the current design, the RM should be able to release result
> partitions using the ShuffleService. Will the RM do this by sending RPCs
> to the TEs, or will it do it by itself?
>
> 4. How do we plan to handle the case where there are different shuffle
> services in the same Flink cluster? For example, a shared standalone
> cluster.
>
> 5. Minor: REST APIs usually use `?` to pass parameters. Is there a reason
> we use `:` instead?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Sep 17, 2019 at 3:22 AM zhijiang
> <wangzhijiang...@aliyun.com.invalid> wrote:
>
>> Thanks Chesnay for this FLIP, and sorry for getting to it a bit late on
>> my side.
>>
>> I also have some concerns similar to those Till already raised.
>>
>> 1. Consistent terminology across the components. On the JM side,
>> PartitionTracker#getPersistedBlockingPartitions is defined for getting
>> the global partitions, while on the RM side we define the method
>> #registerGlobalPartitions for handing the partitions over from the JM.
>> I think it is better to unify the term across components for a better
>> understanding of the semantics. Concerning whether to use "global" or
>> "persistent", I personally prefer "global": it clearly describes the
>> scope of the partition, while "persistent" reads more like how the
>> partition is stored, i.e. an implementation detail. In other words, a
>> global partition might also be cached in the memory of the TE rather
>> than persisted to files; memory vs. persistent files is just an
>> implementation choice.
>>
>> 2. On the TE side, we might rename the method #releasePartitions to
>> #releaseOrPromotePartitions, which describes the function precisely and
>> stays consistent with
>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
>> 3. I very much agree with Till's suggestions of a global PartitionTable
>> on the TE side and of sticking to the TE's heartbeat report to the RM
>> for global partitions.
>>
>> 4. Considering the ShuffleMaster: it was built inside the JM and
>> previously expected to interact only with the JM. Now the RM also needs
>> to interact with the ShuffleMaster to release global partitions, so it
>> might be better to move the ShuffleMaster out of the JM and make its
>> lifecycle consistent with the RM's.
>>
>> 5. Nit: TM -> TE in the "Proposed Changes" section: "TMs retain global
>> partitions for successful jobs".
>>
>> Best,
>> Zhijiang
>>
>> ------------------------------------------------------------------
>> From: Till Rohrmann <trohrm...@apache.org>
>> Send Time: Tuesday, Sep 10, 2019, 10:10
>> To: dev <dev@flink.apache.org>
>> Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>>
>> Thanks Chesnay for drafting the FLIP and starting this discussion.
>>
>> I have a couple of comments:
>>
>> * I know that I've also coined the terms global/local result partition,
>> but maybe it is not the perfect name. Maybe we could rethink the
>> terminology and call them persistent result partitions?
>> * Nit: I would call the last parameter of void releasePartitions(JobID
>> jobId, Collection<ResultPartitionID> partitionsToRelease,
>> Collection<ResultPartitionID> partitionsToPromote) either
>> partitionsToRetain or partitionsToPersist.
>> * I'm not sure whether partitionsToRelease should contain a
>> global/persistent result partition id. I always thought that the user
>> would be responsible for managing the lifecycle of a global/persistent
>> result partition.
>> * Instead of extending the PartitionTable to be able to store both
>> global/persistent and local/transient result partitions, I would rather
>> introduce a separate global PartitionTable that stores the
>> global/persistent result partitions explicitly. I think there is a
>> benefit in making things as explicit as possible.
>> * The handover logic between the JM and the RM for the global/persistent
>> result partitions seems a bit brittle to me. What happens if the JM
>> cannot reach the RM? I think it would be better if the TM announced the
>> global/persistent result partitions to the RM via its heartbeats. That
>> way we don't rely on an established connection between the JM and the
>> RM, and we keep the TM as the source of truth. Moreover, the RM should
>> simply forward the release calls to the TM without much internal logic.
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>> > Hello,
>> >
>> > FLIP-36 (interactive programming)
>> > <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>> > >
>> > proposes a new programming paradigm where jobs are built incrementally
>> > by the user.
>> >
>> > To support this in an efficient manner, I propose to extend the
>> > partition life-cycle to support the notion of /global partitions/:
>> > partitions that can exist beyond the lifetime of a job.
>> >
>> > These partitions could then be re-used by subsequent jobs in a fairly
>> > efficient manner, as they don't have to be persisted to external
>> > storage first, and consuming tasks could be scheduled to exploit
>> > data-locality.
>> >
>> > The FLIP outlines the required changes on the JobMaster, TaskExecutor
>> > and ResourceManager to support this from a life-cycle perspective.
>> >
>> > This FLIP does /not/ concern itself with the /usage/ of global
>> > partitions, including client-side APIs, job submission, scheduling and
>> > reading said partitions; these are all follow-ups that will either be
>> > part of FLIP-36 or split out into separate FLIPs.
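Becket's workflow summary above notes that global partition IDs are set by the client rather than randomly generated, ideally with a human-readable string attached. A minimal sketch of what such client-side IDs could look like; the class and method names here are invented for illustration and are not part of Flink's API:

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical sketch: derive a stable partition ID from a client-chosen,
// human-readable name, so the client can later list and release the
// partition without the JM having to echo back a random ID.
class GlobalPartitionId {
    private final UUID id;
    private final String humanReadableName;

    private GlobalPartitionId(UUID id, String name) {
        this.id = id;
        this.humanReadableName = name;
    }

    // Deterministic (name-based UUID): the same name always yields the
    // same ID, which is what lets the client release a partition it
    // registered in an earlier job.
    static GlobalPartitionId fromName(String name) {
        UUID id = UUID.nameUUIDFromBytes(name.getBytes(StandardCharsets.UTF_8));
        return new GlobalPartitionId(id, name);
    }

    UUID getId() { return id; }
    String getHumanReadableName() { return humanReadableName; }
}
```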
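Becket's question 2 and its follow-up suggest that, if the TE does not distinguish local from global partitions, the TE-side release method could drop the promotion list. The two signatures under discussion could be sketched roughly as follows; the interface and placeholder types below are illustrative, not Flink's actual classes:

```java
import java.util.Collection;

// Variant discussed by Till: the TE is told which partitions to release
// and which to promote to "global" in a single call.
interface TaskExecutorGatewayWithPromotion {
    void releasePartitions(JobID jobId,
                           Collection<ResultPartitionID> partitionsToRelease,
                           Collection<ResultPartitionID> partitionsToPromote);
}

// Variant suggested by Becket: if the TE does not distinguish local from
// global partitions, release collapses to a single list.
interface TaskExecutorGatewayReleaseOnly {
    void releasePartitions(JobID jobId,
                           Collection<ResultPartitionID> partitionsToRelease);
}

// Placeholder stand-ins so the sketch is self-contained.
final class JobID {}
final class ResultPartitionID {}
```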
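Till's and Zhijiang's preference for an explicit, separate table of global partitions on the TE side could be sketched as the following simplified stand-in; Flink's actual PartitionTable differs, and partition IDs are reduced to plain strings here for brevity:

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for a TE-side partition table. The TE would keep two
// independent instances, one for local/transient and one for
// global/persistent partitions, making the distinction explicit.
class PartitionTable<K> {
    private final Map<K, Set<String>> trackedPartitions = new ConcurrentHashMap<>();

    void startTrackingPartitions(K key, Collection<String> partitionIds) {
        trackedPartitions
            .computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet())
            .addAll(partitionIds);
    }

    // Removes and returns all partitions tracked under the key.
    Set<String> stopTrackingPartitions(K key) {
        Set<String> removed = trackedPartitions.remove(key);
        return removed != null ? removed : Set.of();
    }

    boolean hasTrackedPartitions(K key) {
        return trackedPartitions.containsKey(key);
    }
}
```

"Promoting" a job's partitions on successful completion would then simply move entries from the local table into the global one, e.g. `global.startTrackingPartitions(jobId, local.stopTrackingPartitions(jobId))`.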
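Till's last point, letting the TE announce its global partitions to the RM via heartbeats so the TE stays the source of truth, could look roughly like this; all class and method names are invented for illustration and are not Flink's actual heartbeat classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative heartbeat payload: the TE reports the global partitions it
// currently hosts on every heartbeat, so the RM never depends on a
// one-shot handover from the JM.
class TaskExecutorHeartbeatPayload {
    private final List<String> hostedGlobalPartitions;

    TaskExecutorHeartbeatPayload(List<String> hostedGlobalPartitions) {
        this.hostedGlobalPartitions = List.copyOf(hostedGlobalPartitions);
    }

    List<String> getHostedGlobalPartitions() {
        return hostedGlobalPartitions;
    }
}

// RM side: each heartbeat overwrites the RM's view for that TE, so stale
// entries (e.g. partitions released locally, or lost with a failed TE)
// disappear on the next heartbeat instead of leaking.
class GlobalPartitionRegistry {
    private final Map<String, Set<String>> partitionsByTaskExecutor = new HashMap<>();

    void onHeartbeat(String taskExecutorId, TaskExecutorHeartbeatPayload payload) {
        partitionsByTaskExecutor.put(
            taskExecutorId, Set.copyOf(payload.getHostedGlobalPartitions()));
    }

    Set<String> partitionsOf(String taskExecutorId) {
        return partitionsByTaskExecutor.getOrDefault(taskExecutorId, Set.of());
    }
}
```

This also sidesteps several of Becket's failure cases above: an RM that restarts rebuilds its registry from the next round of heartbeats, and a JM failure before registration leaks nothing, because the TEs keep reporting what they actually host.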