On Fri, Oct 4, 2019 at 12:37 PM Chesnay Schepler <ches...@apache.org> wrote:

> Till: In the FLIP you wrote "The set of partitions to release may contain
> local and/or global partitions; the promotion set must only refer to local
> partitions." to describe the `releasePartitions`. I think the JM should
> never be in the situation to release a global partition. Moreover, I
> believe we should have a separate RPC to release global result partitions
> which might come from the RM.
>
> We can certainly add a separate RPC method for explicitly releasing global 
> partitions.
> You are correct that the JM should not be able to release those, just like 
> the RM should not be able to release non-global partitions.
> Till: Once the JM has obtained the required slots to run a job, it no longer
> needs to communicate with the RM. Hence, a lost RM connection won't
> interfere with the job. I would like to keep it like this by letting the TE
> announce global result partitions to the RM and not to introduce another
> communication roundtrip.
>
> Agreed, this is a nice property to retain.
>
> Till: How big do you expect the payload to become?
>
> I don't know, which is precisely why I want to be cautious about it.
> The last time I made a similar assumption I didn't expect anyone to have 
> hundreds of thousands of metrics on a single TM, which turned out to be wrong.
> I wouldn't exclude the possibility of a similar number of partitions being 
> hosted on a single TE.
>
>
> One problem we have to solve with the heartbeat-based approach is that 
> partitions may be lost without the TE noticing, due to disk-failures or 
> external delete operations.
> Currently, for scheduling purposes we rely on information stored in the JM, 
> and update said information if a job fails due to a missing partition. 
> However, IIRC the JM is informed about this via an exception thrown by
> the consumer of said partition, not the producer. As far as the producing TM
> is concerned, it is still hosting that partition.
> This means we have to forward errors for missing partitions from the network 
> stack on the producers side to the TE, so that it can inform the RM about it.
>
>
Yes, I think you are right Chesnay. This would also be a good addition for
the local result partitions.
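
To make this concrete, here is a rough sketch of such a producer-side hook;
every name below is hypothetical and not an existing Flink API:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for Flink's ResultPartitionID.
final class ResultPartitionID {}

// Hypothetical callback the producer-side network stack invokes when it notices
// that a partition it should be hosting has disappeared (disk failure, external delete).
interface PartitionLossListener {
    void onPartitionLost(ResultPartitionID partition, Throwable cause);
}

// Sketch of the TaskExecutor side: drop the local bookkeeping and, for promoted
// (global) partitions, report the loss to the ResourceManager so it can prune its view.
final class ProducerSidePartitionLossHandler implements PartitionLossListener {

    interface GlobalPartitionReporter {
        void reportLostGlobalPartition(ResultPartitionID partition, Throwable cause);
    }

    private final Set<ResultPartitionID> promotedPartitions = ConcurrentHashMap.newKeySet();
    private final GlobalPartitionReporter reporter; // hypothetical RM-facing reporter

    ProducerSidePartitionLossHandler(GlobalPartitionReporter reporter) {
        this.reporter = reporter;
    }

    @Override
    public void onPartitionLost(ResultPartitionID partition, Throwable cause) {
        if (promotedPartitions.remove(partition)) {
            reporter.reportLostGlobalPartition(partition, cause);
        }
        // For job-bound (local) partitions the same hook could notify the JM,
        // instead of relying solely on the consumer-side exception.
    }
}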

Cheers,
Till

>  On 02/10/2019 16:21, Till Rohrmann wrote:
>
> Thanks for addressing our comments Chesnay. See some comments inline.
>
> On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <ches...@apache.org> wrote:
>
>
> Thank you for your comments; I've aggregated them a bit and added
> comments to each of them.
>
> 1) Concept name (proposal: persistent)
>
> I agree that "global" is rather undescriptive, particularly so since we
> never had a notion of "local" partitions.
> I'm not a fan of "persistent", as to me this always implies reliable
> long-term storage, which as I understand we aren't shooting for here.
>
> I was thinking of "cached" partitions.
>
> To Zhijiang's point, we should of course make the naming consistent
> everywhere.
>
> 2) Naming of last parameter of TE#releasePartitions (proposal:
> partitionsToRetain / partitionsToPersistent)
>
> I can see where you're coming from ("promote" is somewhat abstract), but
> I think both suggestions have downsides.
>
> "partitionsToPersistent" to me implies an additional write operation to
> somewhere, but we aren't doing that.
> "partitionsToRetain" kind of results in a redundancy with the other
> argument since retaining is the opposite to releasing a partition; if I
> want to retain a partition, why am I not just excluding it from the set
> to release?
>
> I quite like "promote" personally; we fundamentally change how the
> lifecycle for these partitions work, and introducing new keywords isn't
> a inherently a bad choice.
>
> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
> Note: addition of "OrPromote" is dependent on 2) )
>
> Good point.
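>
> For reference, combining 2) and 3), the TE-facing method as drafted in the
> FLIP would then look roughly like this (signature adapted from the FLIP;
> treat it as a sketch rather than the final API):
>
> import java.util.Collection;
>
> // Stand-ins for Flink's JobID and ResultPartitionID.
> final class JobID {}
> final class ResultPartitionID {}
>
> interface TaskExecutorPartitionGateway {
>     /**
>      * Releases the given partitions and promotes the given job-bound partitions
>      * to global ones in one call. The two collections are expected to be disjoint;
>      * promoted partitions are no longer bound to the life-cycle of the job.
>      */
>     void releaseOrPromotePartitions(
>             JobID jobId,
>             Collection<ResultPartitionID> partitionsToRelease,
>             Collection<ResultPartitionID> partitionsToPromote);
> }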
>
> 4) Till: I'm not sure whether partitionsToRelease should contain a
> global/persistent result partition id. I always thought that the user will
> be responsible for managing the lifecycle of a global/persistent
> result partition.
>
> @Till Please elaborate; which method/argument are you referring to?
>
>
> In the FLIP you wrote "The set of partitions to release may contain local
> and/or global partitions; the promotion set must only refer to local
> partitions." to describe the `releasePartitions`. I think the JM should
> never be in the situation to release a global partition. Moreover, I
> believe we should have a separate RPC to release global result partitions
> which might come from the RM.
>
>
> 4) Dedicated PartitionTable for global partitions
>
> Since there is only one RM for each TE a PartitionTable is unnecessary;
> a simple set will suffice.
> Alternatively, we could introduce such a dedicated set into the
> PartitionTable to keep these data-structures close.
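>
> A minimal sketch of what that bookkeeping could look like (all names are
> hypothetical, types simplified):
>
> import java.util.Collection;
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Stand-ins for Flink's JobID and ResultPartitionID.
> final class JobID {}
> final class ResultPartitionID {}
>
> // Job-bound partitions stay keyed by JobID (as in the existing PartitionTable),
> // while promoted/global partitions live in a flat set, since there is only a
> // single RM per TaskExecutor.
> final class TaskExecutorPartitionBookkeeping {
>     private final Map<JobID, Set<ResultPartitionID>> jobBoundPartitions = new ConcurrentHashMap<>();
>     private final Set<ResultPartitionID> globalPartitions = ConcurrentHashMap.newKeySet();
>
>     void startTracking(JobID job, ResultPartitionID partition) {
>         jobBoundPartitions.computeIfAbsent(job, k -> ConcurrentHashMap.newKeySet()).add(partition);
>     }
>
>     // Promotion moves a partition out of the job-bound table; from then on its
>     // life-cycle is owned by the RM rather than the JM.
>     void promote(JobID job, Collection<ResultPartitionID> toPromote) {
>         Set<ResultPartitionID> tracked = jobBoundPartitions.get(job);
>         if (tracked == null) {
>             return;
>         }
>         for (ResultPartitionID partition : toPromote) {
>             if (tracked.remove(partition)) {
>                 globalPartitions.add(partition);
>             }
>         }
>     }
>
>     Set<ResultPartitionID> getGlobalPartitions() {
>         return globalPartitions;
>     }
> }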
>
> 5) Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
> retain global partitions for successful jobs"
>
> Will fix it.
>
> 6) Zhijiang: Considering ShuffleMaster, it was built inside JM and
> expected to interact with JM before. Now the RM also needs to
> interact with ShuffleMaster to release global partitions. Then it
> might be better to move ShuffleMaster outside of JM, and the lifecycle
> of ShuffleMaster should be consistent with RM.
>
> Yes, I alluded to this in the FLIP but should've been more explicit; the
> shuffle master must outlive the JM. This is somewhat tricky when
> considering the future a bit; if we assume that different jobs or even a
> single one can use different shuffle services, then we need a way to
> associate the partitions with the corresponding shuffle master. This
> will likely require the introduction of a ShuffleMasterID that is
> included in the ShuffleDescriptor.
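>
> Purely as an illustration (none of these types exist today), the association
> could look like:
>
> import java.io.Serializable;
> import java.util.UUID;
>
> // Hypothetical identifier of a concrete ShuffleMaster instance in the cluster.
> final class ShuffleMasterID implements Serializable {
>     private final UUID value = UUID.randomUUID();
> }
>
> // Sketch of a ShuffleDescriptor that carries the owning shuffle master, so that
> // release calls for a partition can be routed to the instance that created it.
> interface ShuffleDescriptorWithOwner extends Serializable {
>     ShuffleMasterID getShuffleMasterID();
> }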
>
> 7) Handover
>
> Till: The handover logic between the JM and the RM for the global/persistent
> result partitions seems a bit brittle to me. What will happen if the JM
> cannot reach the RM? I think it would be better if the TM announces the
> global/persistent result partitions to the RM via its heartbeats. That way
> we don't rely on an established connection between the JM and RM and we
> keep the TM as the ground of truth. Moreover, the RM should simply forward
> the release calls to the TM without much internal logic.
>
> As for your question, if the JM cannot reach the RM the handover will
> fail, the JM will likely shut down without promoting any partition and
> the TE will release all partitions.
> What is the defined behavior for the JM in case of an RM disconnect
> after a job has finished? Does it always/sometimes/never shut down
> with/without communicating the result to the client / updating HA data;
> or simply put, does the JM behave to the user as if nothing has happened
> in all cases?
>
>
> Once the JM has obtained the required slots to run a job, it no longer
> needs to communicate with the RM. Hence, a lost RM connection won't
> interfere with the job. I would like to keep it like this by letting the TE
> announce global result partitions to the RM and not to introduce another
> communication roundtrip.
>
>
> A heartbeat-based approach is useful and can alleviate some failure
> cases (see below); but we need to make sure we don't exceed the Akka
> framesize or otherwise interfere with the heartbeat mechanism (like we
> did with metrics in the past). Ideally we would only submit updates to
> the partition set (added/removed partitions), but I'm not sure if the
> heartbeats are reliable enough for this to work.
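>
> As a rough sketch (hypothetical classes), the heartbeat payload could carry
> either the full set of hosted global partitions or only a delta:
>
> import java.io.Serializable;
> import java.util.Collection;
>
> // Stand-in for Flink's ResultPartitionID.
> final class ResultPartitionID implements Serializable {}
>
> // Sketch of a TE -> RM heartbeat payload. Reporting the full set is simple but grows
> // with the number of hosted global partitions; reporting only a delta stays small but
> // assumes the heartbeat channel never drops an update.
> final class GlobalPartitionReport implements Serializable {
>     final Collection<ResultPartitionID> added;   // promoted since the last report
>     final Collection<ResultPartitionID> removed; // released or lost since the last report
>
>     GlobalPartitionReport(
>             Collection<ResultPartitionID> added, Collection<ResultPartitionID> removed) {
>         this.added = added;
>         this.removed = removed;
>     }
> }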
>
>
> How big do you expect the payload to become?
>
>
> 8. Failure cases:
> Becket:
> a) The TEs may remove the result partition while the RM does not
> know. In this case, the client will receive a runtime error and submit the
> full DAG to recompute the missing result partition. In this case, RM should
> release the incomplete global partition. How would RM be notified to do
> that?
> b) Is it possible the RM loses global partition metadata while
> the TE still hosts the data? For example, RM deletes the global partition
> entry while the release partition call to TE failed.
> c) What would happen if the JM fails before the global partitions
> are registered to RM? Are users exposed to resource leak if JM does not
> have HA?
> d) What would happen if the RM fails? Will TE release the
> partitions by themselves?
>
> 1.a) This is a good question that I haven't considered. This will likely
> require a heartbeat-like report of available partitions.
>
>
> The heartbeat-based synchronization approach seems to crystallize as the way
> to go forward with this FLIP.
>
>
>
> 1.b) RM should only delete entries if it received an ack from the TE;
> otherwise we could easily end up leaking partitions. I believe I forgot
> to write this down.
> 1.c) As described in the FLIP the handoff to the RM must occur before
> partitions are promoted.
>      If the JM fails during the handoff then the TE will clean up all
> partitions since it lost the connection to the JM, and partitions
> weren't promoted yet.
>      If the JM fails after the handoff but before the promotion, same as
> above. The RM would contain invalid entries in this case; see 1.a).
>      If the JM fails after the handoff and promotion, we don't leak
> anything since the RM is now fully responsible.
> 1.d) Yes; if the connection to the RM is disrupted the TE will clean up
> all global partitions, similar to how it cleans up all partitions
> associated with a given job if the connection to the corresponding JM is
> disrupted.
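>
> Sketched out with hypothetical names, the two safety nets (JM disconnect for
> job-bound partitions, RM disconnect for global ones) mirror each other:
>
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Stand-ins for Flink's JobID and ResultPartitionID.
> final class JobID {}
> final class ResultPartitionID {}
>
> final class PartitionCleanupSafetyNets {
>     private final Map<JobID, Set<ResultPartitionID>> jobBoundPartitions = new ConcurrentHashMap<>();
>     private final Set<ResultPartitionID> globalPartitions = ConcurrentHashMap.newKeySet();
>
>     // Existing behaviour: losing the JM connection releases everything still bound to that job.
>     void onJobMasterDisconnected(JobID job) {
>         Set<ResultPartitionID> partitions = jobBoundPartitions.remove(job);
>         if (partitions != null) {
>             partitions.forEach(this::release);
>         }
>     }
>
>     // Proposed analogue (1.d): losing the RM connection releases all promoted partitions,
>     // since the RM is the only component left that could ever order their release.
>     void onResourceManagerDisconnected() {
>         globalPartitions.forEach(this::release);
>         globalPartitions.clear();
>     }
>
>     private void release(ResultPartitionID partition) {
>         // the actual release goes through the shuffle environment; omitted here
>     }
> }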
>
> 9. Becket: It looks that TE should be the source of truth of the result
> partition existence. Does it have to distinguish between global and local
> result partitions? If TE does not need to distinguish them, it seems the
> releasePartition() method in TE could just provide the list of partitions
> to release, without the partitions to promote.
>
> The promotion is a hard requirement, as this is the signal to the TE
> that this partition is no longer bound to the life-cycle of a job.
> Without the promotion the TE would delete the partition once the JM has
> shut down; this is a safety net to ensure cleanup of partitions in case
> of a disconnect.
>
> 10. Becket: In the current design, RM should be able to release result
> partitions using ShuffleService. Will RM do this by sending RPC to the TEs?
> Or will the RM do it by itself?
>
> The RM will send a release call to each TM and issue a release call to
> the ShuffleMaster, just like the JobMaster handles partition releases.
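>
> A sketch of that path with hypothetical gateways, also folding in the ack
> requirement from 1.b above (the RM only drops its bookkeeping entry once the
> TE has acknowledged the release):
>
> import java.util.Map;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Stand-in for Flink's ResultPartitionID.
> final class ResultPartitionID {}
>
> interface TaskExecutorGateway {
>     CompletableFuture<Void> releaseGlobalPartition(ResultPartitionID partition);
> }
>
> interface ShuffleMasterGateway {
>     void releasePartitionExternally(ResultPartitionID partition);
> }
>
> final class ResourceManagerPartitionRegistry {
>     private final Map<ResultPartitionID, TaskExecutorGateway> hostingTaskExecutors = new ConcurrentHashMap<>();
>     private final ShuffleMasterGateway shuffleMaster;
>
>     ResourceManagerPartitionRegistry(ShuffleMasterGateway shuffleMaster) {
>         this.shuffleMaster = shuffleMaster;
>     }
>
>     CompletableFuture<Void> releaseGlobalPartition(ResultPartitionID partition) {
>         TaskExecutorGateway hostingTaskExecutor = hostingTaskExecutors.get(partition);
>         if (hostingTaskExecutor == null) {
>             return CompletableFuture.completedFuture(null);
>         }
>         return hostingTaskExecutor
>                 .releaseGlobalPartition(partition)
>                 .thenRun(() -> {
>                     // only forget the partition after the TE acknowledged the release
>                     hostingTaskExecutors.remove(partition);
>                     shuffleMaster.releasePartitionExternally(partition);
>                 });
>     }
> }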
>
> 11. Becket: How do we plan to handle the case when there are different
> shuffle services in the same Flink cluster? For example, a shared standalone
> cluster.
>
> This case is not considered; there are so many changes necessary in
> other parts of the runtime that we would jump the gun in addressing it
> here.
> Ultimately though, I would think that the addition of a shuffle
> master instance ID and shuffle service identifier should suffice.
>
> The identifier is used in subsequent jobs to load the appropriate
> shuffle service for a given partition (think of it like a class name),
> while the shuffle master instance ID is used to differentiate between
> the different shuffle master instances running in the cluster (with
> which partitions have to be associated so we can issue the correct
> release calls).
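>
> Illustrated with hypothetical types (neither exists today), a registered
> global partition would then be associated with both pieces of information:
>
> import java.io.Serializable;
> import java.util.UUID;
>
> // Which shuffle service *implementation* to load when a later job reads the
> // partition ("think of it like a class name").
> final class ShuffleServiceIdentifier implements Serializable {
>     final String factoryClassName;
>
>     ShuffleServiceIdentifier(String factoryClassName) {
>         this.factoryClassName = factoryClassName;
>     }
> }
>
> // Which running shuffle master *instance* owns the partition, so release calls
> // can be routed to the correct instance.
> final class ShuffleMasterInstanceID implements Serializable {
>     final UUID value = UUID.randomUUID();
> }
>
> final class GlobalPartitionRegistration implements Serializable {
>     final ShuffleServiceIdentifier shuffleService;
>     final ShuffleMasterInstanceID shuffleMasterInstance;
>
>     GlobalPartitionRegistration(
>             ShuffleServiceIdentifier shuffleService, ShuffleMasterInstanceID shuffleMasterInstance) {
>         this.shuffleService = shuffleService;
>         this.shuffleMasterInstance = shuffleMasterInstance;
>     }
> }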
>
> 12. Becket: Minor: usually REST API uses `?` to pass the parameters. Is
> there a reason we use `:` instead?
>
> That's Netty syntax for path parameters: a `:` segment such as `:partitionid`
> in a path template declares a path parameter, whereas `?partitionid=...` would
> be a query parameter.
>
> On 30/09/2019 08:34, Becket Qin wrote:
>
> Forgot to say that I agree with Till that it seems a good idea to let TEs
> register the global partitions to the RM instead of letting JM do it. This
> simplifies quite a few things.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <becket....@gmail.com>
> wrote:
>
> Hi Chesnay,
>
> Thanks for the proposal. My understanding of the entire workflow step by
> step is as follows:
>
>     - JM maintains the local and global partition metadata when the task
> runs to create result partitions. The tasks themselves do not distinguish
> between local / global partitions. Only the JM knows that.
>     - JM releases the local partitions as the job executes. When a job
> finishes successfully, JM registers the global partitions to the RM. The
> global partition IDs are set on the client instead of randomly generated,
> so the client can release global partitions using them. (It would be good
> to have some human readable string associated with the global result
> partitions).
>     - Client issues REST call to list / release global partitions.
>
> A few thoughts / questions below:
> 1. Failure cases:
>            * The TEs may remove the result partition while the RM does not
> know. In this case, the client will receive a runtime error and submit the
> full DAG to recompute the missing result partition. In this case, RM should
> release the incomplete global partition. How would RM be notified to do
> that?
>            * Is it possible the RM loses global partition metadata while
> the TE still hosts the data? For example, RM deletes the global partition
> entry while the release partition call to TE failed.
>            * What would happen if the JM fails before the global partitions
> are registered to RM? Are users exposed to resource leak if JM does not
> have HA?
>            * What would happen if the RM fails? Will TE release the
> partitions by themselves?
>
> 2. It looks that TE should be the source of truth of the result partition
> existence. Does it have to distinguish between global and local result
> partitions? If TE does not need to distinguish them, it seems the
> releasePartition() method in TE could just provide the list of partitions
> to release, without the partitions to promote.
>
> 3. In the current design, RM should be able to release result
> partitions using ShuffleService. Will RM do this by sending RPC to the TEs?
> Or will the RM do it by itself?
>
> 4. How do we plan to handle the case when there are different shuffle
> services in the same Flink cluster? For example, a shared standalone
> cluster.
>
> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a
> reason we use `:` instead?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Sep 17, 2019 at 3:22 AM zhijiang <wangzhijiang...@aliyun.com.invalid>
> wrote:
>
>
> Thanks Chesnay for this FLIP and sorry for getting to it a bit late on my
> side.
>
> I also have some similar concerns which Till already proposed before.
>
> 1. The consistent terminology in different components. On JM side,
> PartitionTracker#getPersistedBlockingPartitions is defined for getting
> global partitions. And on RM side, we define the method
> #registerGlobalPartitions correspondingly for handing over the partitions
> from JM. I think it is better to unify the term in different components
> for better understanding of the semantics. Concerning whether to use global
> or persistent, I prefer the "global" term personally, because it describes
> the scope of the partition clearly, and "persistent" is more like the
> partition storing way or an implementation detail. In other words, the
> global partition might also be cached in memory of the TE, and does not
> have to be persisted into files from the semantic requirements. Whether
> memory or a persistent file is just the implementation choice.
>
> 2. On TE side, we might rename the method #releasePartitions to
> #releaseOrPromotePartitions which describes the function precisely and
> keeps consistent with
> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
>
> 3. I very much agree with Till's suggestions of a global PartitionTable on
> the TE side and sticking to the TE's heartbeat report to the RM for global
> partitions.
>
> 4. Considering ShuffleMaster, it was built inside JM and expected to
> interact with JM before. Now the RM also needs to interact with
> ShuffleMaster to release global partitions. Then it might be better to move
> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should be
> consistent with RM.
>
> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
> partitions for successful jobs"
>
> Best,
> Zhijiang
>
>
> ------------------------------------------------------------------
> From: Till Rohrmann <trohrm...@apache.org>
> Send Time: Tuesday, September 10, 2019, 10:10
> To: dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>
> Thanks Chesnay for drafting the FLIP and starting this discussion.
>
> I have a couple of comments:
>
> * I know that I've also coined the terms global/local result partition but
> maybe it is not the perfect name. Maybe we could rethink the terminology
> and call them persistent result partitions?
> * Nit: I would call the last parameter of void releasePartitions(JobID
> jobId, Collection<ResultPartitionID> partitionsToRelease,
> Collection<ResultPartitionID> partitionsToPromote) either
> partitionsToRetain or partitionsToPersistent.
> * I'm not sure whether partitionsToRelease should contain a
> global/persistent result partition id. I always thought that the user will
> be responsible for managing the lifecycle of a global/persistent
> result partition.
> * Instead of extending the PartitionTable to be able to store
> global/persistent and local/transient result partitions, I would rather
> introduce a global PartitionTable to store the global/persistent result
> partitions explicitly. I think there is a benefit in making things as
> explicit as possible.
> * The handover logic between the JM and the RM for the global/persistent
> result partitions seems a bit brittle to me. What will happen if the JM
> cannot reach the RM? I think it would be better if the TM announces the
> global/persistent result partitions to the RM via its heartbeats. That way
> we don't rely on an established connection between the JM and RM and we
> keep the TM as the ground of truth. Moreover, the RM should simply forward
> the release calls to the TM without much internal logic.
>
> Cheers,
> Till
>
> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>
> Hello,
>
> FLIP-36 (interactive programming)
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
> proposes a new programming paradigm where jobs are built incrementally
> by the user.
>
> To support this in an efficient manner I propose to extend partition
> life-cycle to support the notion of /global partitions/, which are
> partitions that can exist beyond the life-time of a job.
>
> These partitions could then be re-used by subsequent jobs in a fairly
> efficient manner, as they don't have to be persisted to an external storage
> first and consuming tasks could be scheduled to exploit data-locality.
>
> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> and ResourceManager to support this from a life-cycle perspective.
>
> This FLIP does /not/ concern itself with the /usage/ of global
> partitions, including client-side APIs, job-submission, scheduling and
> reading said partitions; these are all follow-ups that will either be
> part of FLIP-36 or spliced out into separate FLIPs.
>
>
>
>
>
