On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <ches...@apache.org> wrote:
> *Till: In the FLIP you wrote "The set of partitions to release may contain
> local and/or global partitions; the promotion set must only refer to local
> partitions." to describe the `releasePartitions`. I think the JM should
> never be in the situation to release a global partition. Moreover, I
> believe we should have a separate RPC to release global result partitions
> which might come from the RM.*
>
> We can certainly add a separate RPC method for explicitly releasing global
> partitions.
> You are correct that the JM should not be able to release those, just like
> the RM should not be able to release non-global partitions.
>
> *Till: Once the JM has obtained the required slots to run a job, it no
> longer needs to communicate with the RM. Hence, a lost RM connection won't
> interfere with the job. I would like to keep it like this by letting the TE
> announce global result partitions to the RM and not to introduce another
> communication roundtrip.*
>
> Agreed, this is a nice property to retain.
>
> *Till: How big do you expect the payload to become?*
>
> I don't know, which is precisely why I want to be cautious about it.
> The last time I made a similar assumption I didn't expect anyone to have
> hundreds of thousands of metrics on a single TM, which turned out to be
> wrong. I wouldn't exclude the possibility of a similar number of partitions
> being hosted on a single TE.
>
> One problem we have to solve with the heartbeat-based approach is that
> partitions may be lost without the TE noticing, due to disk failures or
> external delete operations.
> Currently, for scheduling purposes we rely on information stored in the JM,
> and update said information if a job fails due to a missing partition.
> However, IIRC the JM is informed about this via an exception that is thrown
> by the consumer of said partition, not the producer. As far as the producing
> TM is concerned, it is still hosting that partition.
> This means we have to forward errors for missing partitions from the
> network stack on the producer's side to the TE, so that it can inform the
> RM about it.

Yes, I think you are right Chesnay. This would also be a good addition for
the local result partitions.

Cheers,
Till

> On 02/10/2019 16:21, Till Rohrmann wrote:
> > Thanks for addressing our comments Chesnay. See some comments inline.
> >
> > On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <ches...@apache.org> wrote:
> > >
> > > Thank you for your comments; I've aggregated them a bit and added
> > > comments to each of them.
> > >
> > > 1) Concept name (proposal: persistent)
> > >
> > > I agree that "global" is rather undescriptive, particularly so since we
> > > never had a notion of "local" partitions.
> > > I'm not a fan of "persistent", as to me this always implies reliable
> > > long-term storage, which as I understand we aren't shooting for here.
> > > I was thinking of "cached" partitions.
> > > To Zhijiang's point, we should of course make the naming consistent
> > > everywhere.
> > >
> > > 2) Naming of last parameter of TE#releasePartitions (proposal:
> > > partitionsToRetain / partitionsToPersistent)
> > >
> > > I can see where you're coming from ("promote" is somewhat abstract), but
> > > I think both suggestions have downsides.
> > >
> > > "partitionsToPersistent" to me implies an additional write operation to
> > > somewhere, but we aren't doing that.
> "partitionsToRetain" kind of results in a redundancy with the other > argument since retaining is the opposite to releasing a partition; if I > want to retain a partition, why am I not just excluding it from the set > to release? > > I quite like "promote" personally; we fundamentally change how the > lifecycle for these partitions work, and introducing new keywords isn't > a inherently a bad choice. > > 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions; > Note: addition of "OrPromote" is dependent on 2) ) > > Good point. > > 4) /Till: I'm not sure whether partitionsToRelease should contain a// > //global/persistent result partition id. I always thought that the user > will// > //be responsible for managing the lifecycle of a global/persistent// > //result partition./ > > @Till Please elaborate; which method/argument are you referring to? > > > In the FLIP you wrote "The set of partitions to release may contain local > and/or global partitions; the promotion set must only refer to local > partitions." to describe the `releasePartitions`. I think the JM should > never be in the situation to release a global partition. Moreover, I > believe we should have a separate RPC to release global result partitions > which might come from the RM. > > > 4)/Dedicated PartitionTable for global partitions/ > > Since there is only one RM for each TE a PartitionTable is unnecessary; > a simple set will suffice. > Alternatively, we could introduce such a dedicated set into the > PartitionTable to keep these data-structures close. > > 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs > retain global partitions for successful jobs"/ > > Will fix it. > > 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and > expected to interactive with JM before. Now the RM also needs to > interactive with ShuffleMaster to release global partitions. Then it > might be better to move ShuffleMaster outside of JM, and the lifecycle > of ShuffleMaster should be consistent with RM./ > > Yes, I alluded to this in the FLIP but should've been more explicit; the > shuffle master must outlive the JM. This is somewhat tricky when > considering the future a bit; if we assume that different jobs or even a > single one can use different shuffle services, then we need a way to > associate the partitions with the corresponding shuffle master. This > will likely require the introduction of a ShuffleMasterID that is > included in the ShuffleDescriptor. > > 7) Handover > > /Till: The handover logic between the JM and the RM for the > global/persistent// > //result partitions seems a bit brittle to me. What will happen if the JM// > //cannot reach the RM? I think it would be better if the TM announces the// > //global/persistent result partitions to the RM via its heartbeats. That > way// > //we don't rely on an established connection between the JM and RM and we// > //keep the TM as the ground of truth. Moreover, the RM should simply > forward// > //the release calls to the TM without much internal logic./ > > As for your question, if the JM cannot reach the RM the handover will > fail, the JM will likely shutdown without promoting any partition and > the TE will release all partitions. > What is the defined behavior for the JM in case of the RM disconnect > after a job has finished? Does it always/sometimes/never shutdown > with/-out communicating the result to the client / updating HA data; > or simply put, does the JM behave to the user as if nothing has happened > in all cases? 
> > Once the JM has obtained the required slots to run a job, it no longer
> > needs to communicate with the RM. Hence, a lost RM connection won't
> > interfere with the job. I would like to keep it like this by letting the
> > TE announce global result partitions to the RM and not to introduce
> > another communication roundtrip.
> >
> > > A heartbeat-based approach is useful and can alleviate some failure
> > > cases (see below), but we need to make sure we don't exceed the akka
> > > framesize or otherwise interfere with the heartbeat mechanism (like we
> > > did with metrics in the past). Ideally we would only submit updates to
> > > the partition set (added/removed partitions), but I'm not sure if the
> > > heartbeats are reliable enough for this to work.
> >
> > How big do you expect the payload to become?
> >
> > > 8. Failure cases:
> > > /Becket:
> > > a) The TEs may remove the result partition while the RM does not know.
> > > In this case, the client will receive a runtime error and submit the
> > > full DAG to recompute the missing result partition. In this case, RM
> > > should release the incomplete global partition. How would RM be
> > > notified to do that?
> > > b) Is it possible the RM loses global partition metadata while the TE
> > > still hosts the data? For example, RM deletes the global partition
> > > entry while the release partition call to TE failed.
> > > c) What would happen if the JM fails before the global partitions are
> > > registered to RM? Are users exposed to a resource leak if JM does not
> > > have HA?
> > > d) What would happen if the RM fails? Will TE release the partitions by
> > > themselves?/
> > >
> > > 1.a) This is a good question that I haven't considered. This will
> > > likely require a heartbeat-like report of available partitions.
> >
> > The heartbeat-based synchronization approach seems to crystallize as the
> > way to go forward with this FLIP.
> >
> > > 1.b) RM should only delete entries if it received an ack from the TE;
> > > otherwise we could easily end up leaking partitions. I believe I forgot
> > > to write this down.
> > > 1.c) As described in the FLIP, the handoff to the RM must occur before
> > > partitions are promoted.
> > > If the JM fails during the handoff, then the TE will clean up all
> > > partitions since it lost the connection to the JM, and partitions
> > > weren't promoted yet.
> > > If the JM fails after the handoff but before the promotion, same as
> > > above. The RM would contain invalid entries in this case; see 1.a).
> > > If the JM fails after the handoff and the promotion, we don't leak
> > > anything since the RM is now fully responsible.
> > > 1.d) Yes; if the connection to the RM is disrupted, the TE will clean
> > > up all global partitions, similar to how it cleans up all partitions
> > > associated with a given job if the connection to the corresponding JM
> > > is disrupted.
> > >
> > > 9. /Becket: It looks like the TE should be the source of truth of the
> > > result partition existence. Does it have to distinguish between global
> > > and local result partitions? If TE does not need to distinguish them,
> > > it seems that the releasePartition() method in TE could just provide
> > > the list of partitions to release, without the partitions to promote./
> > >
> > > The promotion is a hard requirement, as this is the signal to the TE
> > > that this partition is no longer bound to the life-cycle of a job.
> > > Without the promotion the TE would delete the partition once the JM has
> > > shut down; this is a safety net to ensure cleanup of partitions in case
> > > of a disconnect.
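
Just to make sure we mean the same mechanics here, below is a rough,
self-contained sketch of the TE-side bookkeeping for the release/promote
split as I read it. It is illustrative only: plain strings stand in for
JobID/ResultPartitionID, and all class and method names are placeholders,
not the actual FLIP-67 API.

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative TE-side bookkeeping for job-scoped vs. promoted partitions. */
public class PartitionPromotionSketch {

    // Partitions still bound to the life-cycle of the producing job.
    private final Map<String, Set<String>> jobPartitions = new HashMap<>();

    // Promoted ("global") partitions; they survive the job and a lost JM connection.
    private final Set<String> globalPartitions = new HashSet<>();

    /** Called when a task of the given job produces a result partition. */
    public void registerJobPartition(String jobId, String partitionId) {
        jobPartitions.computeIfAbsent(jobId, id -> new HashSet<>()).add(partitionId);
    }

    /** Called by the JM when a job finishes: release some partitions, promote the rest. */
    public void releaseOrPromotePartitions(
            String jobId,
            Collection<String> partitionsToRelease,
            Collection<String> partitionsToPromote) {

        Set<String> tracked = jobPartitions.getOrDefault(jobId, new HashSet<>());

        for (String partition : partitionsToRelease) {
            if (tracked.remove(partition)) {
                deleteData(partition);
            }
        }

        for (String partition : partitionsToPromote) {
            // Promotion is pure bookkeeping: no data is written anywhere else.
            if (tracked.remove(partition)) {
                globalPartitions.add(partition);
            }
        }

        if (tracked.isEmpty()) {
            jobPartitions.remove(jobId);
        }
    }

    /** Safety net: a lost JM connection only wipes partitions that were never promoted. */
    public void onJobMasterDisconnect(String jobId) {
        Set<String> tracked = jobPartitions.remove(jobId);
        if (tracked != null) {
            tracked.forEach(this::deleteData);
        }
        // globalPartitions are deliberately left untouched; the RM manages them from here on.
    }

    private void deleteData(String partition) {
        System.out.println("deleting data of partition " + partition);
    }
}

In this model the promote set only moves entries from the job-scoped table
into the global set; no data is rewritten, and a later JM disconnect no
longer touches the promoted partitions.
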
> > > 10. /In the current design, RM should be able to release result
> > > partitions using ShuffleService. Will RM do this by sending RPC to the
> > > TEs? Or will the RM do it by itself?/
> > >
> > > The RM will send a release call to each TM and issue a release call to
> > > the ShuffleMaster, just like the JobMaster handles partition releases.
> > >
> > > 11. /Becket: How do we plan to handle the case when there are different
> > > shuffle services in the same Flink cluster? For example, a shared
> > > standalone cluster./
> > >
> > > This case is not considered; there are so many changes necessary in
> > > other parts of the runtime that we would jump the gun in addressing it
> > > here.
> > > Ultimately though, I would think that the addition of a shuffle master
> > > instance ID and a shuffle service identifier should suffice.
> > >
> > > The identifier is used in subsequent jobs to load the appropriate
> > > shuffle service for a given partition (think of it like a class name),
> > > while the shuffle master instance ID is used to differentiate between
> > > the different shuffle master instances running in the cluster (which
> > > partitions have to be associated with so we can issue the correct
> > > release calls).
> > >
> > > 12. /Becket: Minor: usually REST API uses `?` to pass the parameters.
> > > Is there a reason we use `:` instead?/
> > >
> > > That's netty syntax for path parameters.
> > >
> > > On 30/09/2019 08:34, Becket Qin wrote:
> > >
> > > Forgot to say that I agree with Till that it seems a good idea to let
> > > TEs register the global partitions to the RM instead of letting JM do
> > > it. This simplifies quite a few things.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <becket....@gmail.com>
> > > wrote:
> > >
> > > Hi Chesnay,
> > >
> > > Thanks for the proposal. My understanding of the entire workflow, step
> > > by step, is as follows:
> > >
> > > - JM maintains the local and global partition metadata when the tasks
> > > run to create result partitions. The tasks themselves do not
> > > distinguish between local / global partitions. Only the JM knows that.
> > > - JM releases the local partitions as the job executes. When a job
> > > finishes successfully, JM registers the global partitions to the RM.
> > > The global partition IDs are set on the client instead of randomly
> > > generated, so the client can release global partitions using them. (It
> > > would be good to have some human-readable string associated with the
> > > global result partitions.)
> > > - Client issues REST calls to list / release global partitions.
> > >
> > > A few thoughts / questions below:
> > >
> > > 1. Failure cases:
> > >    * The TEs may remove the result partition while the RM does not
> > > know. In this case, the client will receive a runtime error and submit
> > > the full DAG to recompute the missing result partition. In this case,
> > > RM should release the incomplete global partition. How would RM be
> > > notified to do that?
> > >    * Is it possible the RM loses global partition metadata while the TE
> > > still hosts the data? For example, RM deletes the global partition
> > > entry while the release partition call to TE failed.
> > >    * What would happen if the JM fails before the global partitions are
> > > registered to RM? Are users exposed to a resource leak if JM does not
> > > have HA?
> > >    * What would happen if the RM fails? Will TE release the partitions
> > > by themselves?
> > >
> > > 2. It looks like the TE should be the source of truth of the result
> > > partition existence. Does it have to distinguish between global and
> > > local result partitions?
> > > If TE does not need to distinguish them, it seems that the
> > > releasePartition() method in TE could just provide the list of
> > > partitions to release, without the partitions to promote.
> > >
> > > 3. In the current design, RM should be able to release result
> > > partitions using ShuffleService. Will RM do this by sending RPC to the
> > > TEs? Or will the RM do it by itself?
> > >
> > > 4. How do we plan to handle the case when there are different shuffle
> > > services in the same Flink cluster? For example, a shared standalone
> > > cluster.
> > >
> > > 5. Minor: usually REST API uses `?` to pass the parameters. Is there a
> > > reason we use `:` instead?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Tue, Sep 17, 2019 at 3:22 AM zhijiang
> > > <wangzhijiang...@aliyun.com.invalid> wrote:
> > >
> > > Thanks Chesnay for this FLIP and sorry for getting to it a bit late on
> > > my side.
> > >
> > > I also have some concerns similar to those Till already raised.
> > >
> > > 1. The consistent terminology in different components. On the JM side,
> > > PartitionTracker#getPersistedBlockingPartitions is defined for getting
> > > global partitions. And on the RM side, we define the corresponding
> > > method #registerGlobalPartitions for handing over the partitions from
> > > the JM. I think it is better to unify the terminology across components
> > > for a better understanding of the semantics. Concerning whether to use
> > > global or persistent, I prefer the "global" term personally, because it
> > > describes the scope of the partition clearly, while "persistent" is
> > > more about how the partition is stored, i.e. an implementation detail.
> > > In other words, a global partition might also be cached in the memory
> > > of the TE and does not have to be persisted to files as far as the
> > > semantics are concerned; memory vs. persistent files is just an
> > > implementation choice.
> > >
> > > 2. On the TE side, we might rename the method #releasePartitions to
> > > #releaseOrPromotePartitions, which describes the function precisely and
> > > is consistent with
> > > PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
> > >
> > > 3. I very much agree with Till's suggestions of a global PartitionTable
> > > on the TE side and of sticking to the TE's heartbeat report to the RM
> > > for global partitions.
> > >
> > > 4. Considering ShuffleMaster, it was built inside JM and expected to
> > > interact with JM before. Now the RM also needs to interact with
> > > ShuffleMaster to release global partitions. Then it might be better to
> > > move ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster
> > > should be consistent with RM.
> > >
> > > 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
> > > partitions for successful jobs"
> > >
> > > Best,
> > > Zhijiang
> > >
> > > ------------------------------------------------------------------
> > > From: Till Rohrmann <trohrm...@apache.org>
> > > Send Time: Tue, Sep 10, 2019 10:10
> > > To: dev <dev@flink.apache.org>
> > > Subject: Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> > >
> > > Thanks Chesnay for drafting the FLIP and starting this discussion.
> > >
> > > I have a couple of comments:
> > >
> > > * I know that I've also coined the terms global/local result partition,
> > > but maybe it is not the perfect name. Maybe we could rethink the
> > > terminology and call them persistent result partitions?
> > > * Nit: I would call the last parameter of void releasePartitions(JobID
> > > jobId, Collection<ResultPartitionID> partitionsToRelease,
> > > Collection<ResultPartitionID> partitionsToPromote) either
> > > partitionsToRetain or partitionsToPersistent.
> > > * I'm not sure whether partitionsToRelease should contain a
> > > global/persistent result partition id. I always thought that the user
> > > will be responsible for managing the lifecycle of a global/persistent
> > > result partition.
> > > * Instead of extending the PartitionTable to be able to store
> > > global/persistent and local/transient result partitions, I would rather
> > > introduce a global PartitionTable to store the global/persistent result
> > > partitions explicitly. I think there is a benefit in making things as
> > > explicit as possible.
> > > * The handover logic between the JM and the RM for the global/persistent
> > > result partitions seems a bit brittle to me. What will happen if the JM
> > > cannot reach the RM? I think it would be better if the TM announces the
> > > global/persistent result partitions to the RM via its heartbeats. That
> > > way we don't rely on an established connection between the JM and RM and
> > > we keep the TM as the ground of truth. Moreover, the RM should simply
> > > forward the release calls to the TM without much internal logic.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org>
> > > wrote:
> > >
> > > Hello,
> > >
> > > FLIP-36 (interactive programming)
> > > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
> > > proposes a new programming paradigm where jobs are built incrementally
> > > by the user.
> > >
> > > To support this in an efficient manner I propose to extend the partition
> > > life-cycle to support the notion of /global partitions/, which are
> > > partitions that can exist beyond the life-time of a job.
> > >
> > > These partitions could then be re-used by subsequent jobs in a fairly
> > > efficient manner, as they don't have to be persisted to an external
> > > storage first and consuming tasks could be scheduled to exploit
> > > data-locality.
> > >
> > > The FLIP outlines the required changes on the JobMaster, TaskExecutor
> > > and ResourceManager to support this from a life-cycle perspective.
> > >
> > > This FLIP does /not/ concern itself with the /usage/ of global
> > > partitions, including client-side APIs, job-submission, scheduling and
> > > reading said partitions; these are all follow-ups that will either be
> > > part of FLIP-36 or spliced out into separate FLIPs.
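
PS: for my own understanding, here is a rough sketch of the heartbeat-based
synchronization this thread seems to be converging on, with the TE as the
source of truth for which global partitions still exist. Again illustrative
only: plain strings stand in for the real ID types, and the names are
placeholders, not actual Flink classes.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative RM-side view of the global partitions reported by each TE. */
public class GlobalPartitionSyncSketch {

    // TaskExecutor id -> global partitions it reported in its last heartbeat.
    private final Map<String, Set<String>> hostedGlobalPartitions = new ConcurrentHashMap<>();

    /** Heartbeat payload: the full set of global partitions the TE currently hosts. */
    public void onTaskExecutorHeartbeat(String taskExecutorId, Set<String> reportedPartitions) {
        Set<String> previouslyKnown =
                hostedGlobalPartitions.put(taskExecutorId, new HashSet<>(reportedPartitions));

        if (previouslyKnown != null) {
            // Partitions that vanished without an explicit release (disk failure,
            // external deletion, ...) are dropped from the RM's bookkeeping.
            previouslyKnown.removeAll(reportedPartitions);
            previouslyKnown.forEach(this::forgetPartition);
        }
    }

    /** If the TE connection is lost entirely, all of its global partitions are gone. */
    public void onTaskExecutorDisconnected(String taskExecutorId) {
        Set<String> lost = hostedGlobalPartitions.remove(taskExecutorId);
        if (lost != null) {
            lost.forEach(this::forgetPartition);
        }
    }

    private void forgetPartition(String partitionId) {
        System.out.println("global partition " + partitionId + " is no longer available");
    }
}

The reconciliation step is what would cover Becket's failure case a):
anything a TE stops reporting (disk failure, external deletion) is dropped
on the RM side without requiring an explicit release call.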