Thanks for addressing our comments, Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <ches...@apache.org> wrote:
> Thank you for your comments; I've aggregated them a bit and added
> comments to each of them.
>
> 1) Concept name (proposal: persistent)
>
> I agree that "global" is rather undescriptive, particularly so since we
> never had a notion of "local" partitions.
> I'm not a fan of "persistent"; to me this always implies reliable
> long-term storage, which, as I understand it, we aren't shooting for here.
>
> I was thinking of "cached" partitions.
>
> To Zhijiang's point, we should of course make the naming consistent
> everywhere.
>
> 2) Naming of the last parameter of TE#releasePartitions (proposal:
> partitionsToRetain / partitionsToPersistent)
>
> I can see where you're coming from ("promote" is somewhat abstract), but
> I think both suggestions have downsides.
>
> "partitionsToPersistent" to me implies an additional write operation to
> somewhere, but we aren't doing that.
> "partitionsToRetain" results in a redundancy with the other argument,
> since retaining is the opposite of releasing a partition; if I want to
> retain a partition, why am I not just excluding it from the set to
> release?
>
> I quite like "promote" personally; we fundamentally change how the
> lifecycle of these partitions works, and introducing new keywords isn't
> inherently a bad choice.
>
> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
> note: the addition of "OrPromote" depends on 2) )
>
> Good point.
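For reference, with the rename from 2) and 3) applied, the TE RPC would
look roughly like this (a sketch only; the Javadoc is just my reading of
the intended semantics, not final wording):

    import java.util.Collection;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    interface TaskExecutorGateway {
        /**
         * Releases the partitions in the first set and promotes the
         * partitions in the second set to global partitions, i.e. detaches
         * them from the lifecycle of the given job.
         */
        void releaseOrPromotePartitions(
                JobID jobId,
                Collection<ResultPartitionID> partitionsToRelease,
                Collection<ResultPartitionID> partitionsToPromote);
    }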
> 4) /Till: I'm not sure whether partitionsToRelease should contain a
> global/persistent result partition id. I always thought that the user
> will be responsible for managing the lifecycle of a global/persistent
> result partition./
>
> @Till Please elaborate; which method/argument are you referring to?

In the FLIP you wrote "The set of partitions to release may contain local
and/or global partitions; the promotion set must only refer to local
partitions." to describe `releasePartitions`. I think the JM should never
be in the situation to release a global partition. Moreover, I believe we
should have a separate RPC to release global result partitions, which
might come from the RM.

> 5) /Dedicated PartitionTable for global partitions/
>
> Since there is only one RM for each TE, a PartitionTable is unnecessary;
> a simple set will suffice.
> Alternatively, we could introduce such a dedicated set into the
> PartitionTable to keep these data structures close.
>
> 6) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
> retain global partitions for successful jobs"/
>
> Will fix it.
>
> 7) /Zhijiang: Considering the ShuffleMaster, it was built inside the JM
> and expected to interact with the JM before. Now the RM also needs to
> interact with the ShuffleMaster to release global partitions. Then it
> might be better to move the ShuffleMaster outside of the JM, and the
> lifecycle of the ShuffleMaster should be consistent with the RM./
>
> Yes, I alluded to this in the FLIP but should've been more explicit; the
> shuffle master must outlive the JM. This is somewhat tricky when
> considering the future a bit; if we assume that different jobs or even a
> single one can use different shuffle services, then we need a way to
> associate the partitions with the corresponding shuffle master. This
> will likely require the introduction of a ShuffleMasterID that is
> included in the ShuffleDescriptor.
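To make sure I understand the ShuffleMasterID idea, something like the
following? (All names beyond ShuffleDescriptor are made up; the getter is
the hypothetical addition.)

    import java.io.Serializable;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    import org.apache.flink.util.AbstractID;

    /** Identifies one shuffle master instance running in the cluster. */
    class ShuffleMasterID extends AbstractID {}

    interface ShuffleDescriptor extends Serializable {
        ResultPartitionID getResultPartitionID();

        /**
         * The shuffle master instance managing this partition, so that
         * release calls can be routed to the correct instance.
         */
        ShuffleMasterID getShuffleMasterID();
    }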
> 8) Handover
>
> /Till: The handover logic between the JM and the RM for the
> global/persistent result partitions seems a bit brittle to me. What will
> happen if the JM cannot reach the RM? I think it would be better if the
> TM announces the global/persistent result partitions to the RM via its
> heartbeats. That way we don't rely on an established connection between
> the JM and RM, and we keep the TM as the source of truth. Moreover, the
> RM should simply forward the release calls to the TM without much
> internal logic./
>
> As for your question: if the JM cannot reach the RM, the handover will
> fail, the JM will likely shut down without promoting any partition, and
> the TE will release all partitions.
> What is the defined behavior for the JM in case of an RM disconnect
> after a job has finished? Does it always/sometimes/never shut down
> with/without communicating the result to the client / updating HA data?
> Simply put, does the JM behave to the user as if nothing has happened in
> all cases?

Once the JM has obtained the required slots to run a job, it no longer
needs to communicate with the RM. Hence, a lost RM connection won't
interfere with the job. I would like to keep it like this by letting the
TE announce global result partitions to the RM, and not introduce another
communication roundtrip.

> A heartbeat-based approach is useful and can alleviate some failure
> cases (see below), but we need to make sure we don't exceed the akka
> framesize or otherwise interfere with the heartbeat mechanism (like we
> did with metrics in the past). Ideally we would only submit updates to
> the partition set (added/removed partitions), but I'm not sure if the
> heartbeats are reliable enough for this to work.

How big do you expect the payload to become?

> 9. Failure cases:
>
> /Becket:
> a) The TEs may remove the result partition while the RM does not know.
> In this case, the client will receive a runtime error and submit the
> full DAG to recompute the missing result partition. In this case, the RM
> should release the incomplete global partition. How would the RM be
> notified to do that?
> b) Is it possible that the RM loses global partition metadata while the
> TE still hosts the data? For example, the RM deletes the global
> partition entry while the release partition call to the TE failed.
> c) What would happen if the JM fails before the global partitions are
> registered with the RM? Are users exposed to a resource leak if the JM
> does not have HA?
> d) What would happen if the RM fails? Will the TEs release the
> partitions by themselves?/
>
> a) This is a good question that I haven't considered. This will likely
> require a heartbeat-like report of available partitions.

The heartbeat-based synchronization approach seems to crystallize as the
way to go forward with this FLIP.

> b) The RM should only delete entries if it received an ack from the TE;
> otherwise we could easily end up leaking partitions. I believe I forgot
> to write this down.
> c) As described in the FLIP, the handoff to the RM must occur before
> partitions are promoted.
> If the JM fails during the handoff, the TE will clean up all partitions,
> since it lost the connection to the JM and the partitions weren't
> promoted yet.
> If the JM fails after the handoff but before the promotion: same as
> above. The RM would contain invalid entries in this case; see a).
> If the JM fails after the handoff and the promotion, we don't leak
> anything, since the RM is now fully responsible.
> d) Yes; if the connection to the RM is disrupted, the TE will clean up
> all global partitions, similar to how it cleans up all partitions
> associated with a given job if the connection to the corresponding JM is
> disrupted.
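To make the heartbeat idea concrete: a full report is essentially just a
set of IDs (a ResultPartitionID is roughly a pair of 16-byte IDs), so even
thousands of global partitions per TE should stay far below the default
10 MB akka framesize. A sketch of such a payload (name and shape are mine,
not from the FLIP):

    import java.io.Serializable;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    /**
     * Piggybacked onto the TE -> RM heartbeat, listing all global
     * partitions the TE currently hosts. The RM can diff this against its
     * own view: entries it knows that are missing from the report have
     * been lost and must be released; unknown entries were newly promoted
     * and should be registered.
     */
    final class GlobalPartitionReport implements Serializable {
        private final Set<ResultPartitionID> hostedPartitions;

        GlobalPartitionReport(Set<ResultPartitionID> hostedPartitions) {
            this.hostedPartitions = new HashSet<>(hostedPartitions);
        }

        Set<ResultPartitionID> getHostedPartitions() {
            return Collections.unmodifiableSet(hostedPartitions);
        }
    }

Reporting the full set on every heartbeat would also sidestep the
reliability concern with incremental updates: a lost heartbeat is simply
corrected by the next one.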
> 10. /Becket: It looks like the TE should be the source of truth of
> result partition existence. Does it have to distinguish between global
> and local result partitions? If the TE does not need to distinguish
> them, it seems the releasePartitions() method in the TE could just
> provide the list of partitions to release, without the partitions to
> promote./
>
> The promotion is a hard requirement, as this is the signal to the TE
> that the partition is no longer bound to the lifecycle of a job.
> Without the promotion, the TE would delete the partition once the JM has
> shut down; this is a safety net to ensure the cleanup of partitions in
> case of a disconnect.
>
> 11. /Becket: In the current design, the RM should be able to release
> result partitions using the ShuffleService. Will the RM do this by
> sending RPCs to the TEs? Or will the RM do it by itself?/
>
> The RM will send a release call to each TE and issue a release call to
> the ShuffleMaster, just like the JobMaster handles partition releases.
>
> 12. /Becket: How do we plan to handle the case when there are different
> shuffle services in the same Flink cluster? For example, a shared
> standalone cluster./
>
> This case is not considered; there are so many changes necessary in
> other parts of the runtime that we would jump the gun in addressing it
> here.
> Ultimately though, I would think that the addition of a shuffle master
> instance ID and a shuffle service identifier should suffice.
>
> The identifier is used in subsequent jobs to load the appropriate
> shuffle service for a given partition (think of it like a class name),
> while the shuffle master instance ID is used to differentiate between
> the different shuffle master instances running in the cluster (which the
> partitions have to be associated with, so that we can issue the correct
> release calls).
>
> 13. /Becket: Minor: usually a REST API uses `?` to pass parameters. Is
> there a reason we use `:` instead?/
>
> That's netty syntax for declaring path parameters in a route (e.g. a
> pattern like /partitions/:partitionid), as opposed to `?`, which
> introduces query parameters.
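And to double-check my understanding of 11., the RM-side release would
then look roughly like this (a sketch; taskExecutorsHosting(),
releaseGlobalPartitions() and descriptorFor() are invented names):

    /** Sketch: how the RM could release a single global partition. */
    void releaseGlobalPartition(ResultPartitionID partitionId) {
        // Ask every TE hosting the partition to delete its local data.
        for (TaskExecutorGateway taskExecutor : taskExecutorsHosting(partitionId)) {
            taskExecutor.releaseGlobalPartitions(Collections.singleton(partitionId));
        }
        // Release external resources, mirroring how the JobMaster calls
        // ShuffleMaster#releasePartitionExternally for job-scoped partitions.
        shuffleMaster.releasePartitionExternally(descriptorFor(partitionId));
    }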
> On 30/09/2019 08:34, Becket Qin wrote:
> > Forgot to say that I agree with Till that it seems a good idea to let
> > TEs register the global partitions to the RM instead of letting the JM
> > do it. This simplifies quite a few things.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <becket....@gmail.com>
> > wrote:
> >
> >> Hi Chesnay,
> >>
> >> Thanks for the proposal. My understanding of the entire workflow, step
> >> by step, is the following:
> >>
> >> - The JM maintains the local and global partition metadata as the
> >> tasks run and create result partitions. The tasks themselves do not
> >> distinguish between local / global partitions; only the JM knows that.
> >> - The JM releases the local partitions as the job executes. When a job
> >> finishes successfully, the JM registers the global partitions with the
> >> RM. The global partition IDs are set on the client instead of being
> >> randomly generated, so the client can release global partitions using
> >> them. (It would be good to have some human-readable string associated
> >> with the global result partitions.)
> >> - The client issues REST calls to list / release global partitions.
> >>
> >> A few thoughts / questions below:
> >>
> >> 1. Failure cases:
> >> * The TEs may remove the result partition while the RM does not know.
> >> In this case, the client will receive a runtime error and submit the
> >> full DAG to recompute the missing result partition. In this case, the
> >> RM should release the incomplete global partition. How would the RM be
> >> notified to do that?
> >> * Is it possible that the RM loses global partition metadata while the
> >> TE still hosts the data? For example, the RM deletes the global
> >> partition entry while the release partition call to the TE failed.
> >> * What would happen if the JM fails before the global partitions are
> >> registered with the RM? Are users exposed to a resource leak if the JM
> >> does not have HA?
> >> * What would happen if the RM fails? Will the TEs release the
> >> partitions by themselves?
> >>
> >> 2. It looks like the TE should be the source of truth of result
> >> partition existence. Does it have to distinguish between global and
> >> local result partitions? If the TE does not need to distinguish them,
> >> it seems the releasePartitions() method in the TE could just provide
> >> the list of partitions to release, without the partitions to promote.
> >>
> >> 3. In the current design, the RM should be able to release result
> >> partitions using the ShuffleService. Will the RM do this by sending
> >> RPCs to the TEs? Or will the RM do it by itself?
> >>
> >> 4. How do we plan to handle the case when there are different shuffle
> >> services in the same Flink cluster? For example, a shared standalone
> >> cluster.
> >>
> >> 5. Minor: usually a REST API uses `?` to pass parameters. Is there a
> >> reason we use `:` instead?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Tue, Sep 17, 2019 at 3:22 AM zhijiang
> >> <wangzhijiang...@aliyun.com.invalid> wrote:
> >>
> >>> Thanks Chesnay for this FLIP, and sorry for getting to it a bit late
> >>> on my side.
> >>>
> >>> I also have some similar concerns which Till already proposed before.
> >>>
> >>> 1. Consistent terminology across components. On the JM side,
> >>> PartitionTracker#getPersistedBlockingPartitions is defined for
> >>> getting global partitions, while on the RM side we define the method
> >>> #registerGlobalPartitions for the handover of partitions from the JM.
> >>> I think it is better to unify the term across components for a better
> >>> understanding of the semantics. Concerning whether to use "global" or
> >>> "persistent", I personally prefer "global", because it clearly
> >>> describes the scope of the partition, whereas "persistent" reads more
> >>> like the way the partition is stored, i.e. an implementation detail.
> >>> In other words, a global partition might also be cached in the TE's
> >>> memory; the semantics do not require persisting it to files. Memory
> >>> or persistent files are just implementation choices.
> >>>
> >>> 2. On the TE side, we might rename the method #releasePartitions to
> >>> #releaseOrPromotePartitions, which describes the function precisely
> >>> and keeps it consistent with
> >>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
> >>>
> >>> 3. I very much agree with Till's suggestions of a global
> >>> PartitionTable on the TE side and of sticking to the TE's heartbeat
> >>> report to the RM for global partitions.
> >>>
> >>> 4. Considering the ShuffleMaster, it was built inside the JM and
> >>> expected to interact with the JM before. Now the RM also needs to
> >>> interact with the ShuffleMaster to release global partitions. Then it
> >>> might be better to move the ShuffleMaster outside of the JM, and the
> >>> lifecycle of the ShuffleMaster should be consistent with the RM.
> >>>
> >>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
> >>> partitions for successful jobs"
> >>>
> >>> Best,
> >>> Zhijiang
> >>>
> >>> ------------------------------------------------------------------
> >>> From: Till Rohrmann <trohrm...@apache.org>
> >>> Send Time: 2019年9月10日(星期二) 10:10
> >>> To: dev <dev@flink.apache.org>
> >>> Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >>>
> >>> Thanks Chesnay for drafting the FLIP and starting this discussion.
> >>>
> >>> I have a couple of comments:
> >>>
> >>> * I know that I've also coined the terms global/local result
> >>> partition, but maybe it is not the perfect name. Maybe we could
> >>> rethink the terminology and call them persistent result partitions?
> >>> * Nit: I would call the last parameter of void releasePartitions(JobID
> >>> jobId, Collection<ResultPartitionID> partitionsToRelease,
> >>> Collection<ResultPartitionID> partitionsToPromote) either
> >>> partitionsToRetain or partitionsToPersistent.
> >>> * I'm not sure whether partitionsToRelease should contain a
> >>> global/persistent result partition id. I always thought that the user
> >>> will be responsible for managing the lifecycle of a global/persistent
> >>> result partition.
> >>> * Instead of extending the PartitionTable to be able to store
> >>> global/persistent and local/transient result partitions, I would
> >>> rather introduce a global PartitionTable to store the
> >>> global/persistent result partitions explicitly. I think there is a
> >>> benefit in making things as explicit as possible.
> >>> * The handover logic between the JM and the RM for the
> >>> global/persistent result partitions seems a bit brittle to me. What
> >>> will happen if the JM cannot reach the RM? I think it would be better
> >>> if the TM announces the global/persistent result partitions to the RM
> >>> via its heartbeats. That way we don't rely on an established
> >>> connection between the JM and RM, and we keep the TM as the source of
> >>> truth. Moreover, the RM should simply forward the release calls to
> >>> the TM without much internal logic.
> >>>
> >>> Cheers,
> >>> Till
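To illustrate the PartitionTable point from my earlier mail (sketch only;
all names are illustrative, and since a TE only ever talks to one RM, a
plain set may indeed be all we need):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

    // TE-side bookkeeping: job-scoped partitions stay keyed by JobID (the
    // existing PartitionTable), while promoted/global partitions move to a
    // flat set that is no longer tied to any job.
    class PartitionBookkeeping {
        private final Map<JobID, Set<ResultPartitionID>> partitionsPerJob =
                new HashMap<>();
        private final Set<ResultPartitionID> globalPartitions = new HashSet<>();

        void promote(JobID jobId, ResultPartitionID partitionId) {
            Set<ResultPartitionID> jobPartitions = partitionsPerJob.get(jobId);
            if (jobPartitions != null && jobPartitions.remove(partitionId)) {
                globalPartitions.add(partitionId);
            }
        }
    }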
> >>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org>
> >>> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> FLIP-36 (interactive programming)
> >>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
> >>>> proposes a new programming paradigm where jobs are built
> >>>> incrementally by the user.
> >>>>
> >>>> To support this in an efficient manner I propose to extend the
> >>>> partition life-cycle to support the notion of /global partitions/,
> >>>> which are partitions that can exist beyond the life-time of a job.
> >>>>
> >>>> These partitions could then be re-used by subsequent jobs in a fairly
> >>>> efficient manner, as they don't have to be persisted to external
> >>>> storage first, and consuming tasks could be scheduled to exploit
> >>>> data-locality.
> >>>>
> >>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> >>>> and ResourceManager to support this from a life-cycle perspective.
> >>>>
> >>>> This FLIP does /not/ concern itself with the /usage/ of global
> >>>> partitions, including client-side APIs, job submission, scheduling
> >>>> and reading said partitions; these are all follow-ups that will
> >>>> either be part of FLIP-36 or spliced out into separate FLIPs.