Thanks for addressing our comments Chesnay. See some comments inline.

On Wed, Oct 2, 2019 at 4:07 PM Chesnay Schepler <ches...@apache.org> wrote:

> Thank you for your comments; I've aggregated them a bit and added
> comments to each of them.
>
> 1) Concept name (proposal: persistent)
>
> I agree that "global" is rather undescriptive, particularly so since we
> never had a notion of "local" partitions.
> I'm not a fan of "persistent"; as to me this always implies reliable
> long-term storage which as I understand we aren't shooting for here.
>
> I was thinking of "cached" partitions.
>
> To Zhijiangs point, we should of course make the naming consistent
> everywhere.
>
> 2) Naming of last parameter of TE#releasePartitions (proposal:
> partitionsToRetain / partitionsToPersistent)
>
> I can see where you're coming from ("promote" is somewhat abstract), but
> I think both suggestions have downsides.
>
> "partitionsToPersistent" to me implies an additional write operation to
> somewhere, but we aren't doing that.
> "partitionsToRetain" kind of results in a redundancy with the other
> argument since retaining is the opposite to releasing a partition; if I
> want to retain a partition, why am I not just excluding it from the set
> to release?
>
> I quite like "promote" personally; we fundamentally change how the
> lifecycle for these partitions work, and introducing new keywords isn't
> a inherently a bad choice.
>
> 3) Naming of TE#releasePartitions (proposal: releaseOrPromotePartitions;
> Note: addition of "OrPromote" is dependent on 2) )
>
> Good point.
>
> 4) /Till: I'm not sure whether partitionsToRelease should contain a//
> //global/persistent result partition id. I always thought that the user
> will//
> //be responsible for managing the lifecycle of a global/persistent//
> //result partition./
>
> @Till Please elaborate; which method/argument are you referring to?
>

In the FLIP you wrote "The set of partitions to release may contain local
and/or global partitions; the promotion set must only refer to local
partitions." to describe the `releasePartitions`. I think the JM should
never be in the situation to release a global partition. Moreover, I
believe we should have a separate RPC to release global result partitions
which might come from the RM.

>
> 4)/Dedicated PartitionTable for global partitions/
>
> Since there is only one RM for each TE a PartitionTable is unnecessary;
> a simple set will suffice.
> Alternatively, we could introduce such a dedicated set into the
> PartitionTable to keep these data-structures close.
>
> 5) /Zhijiang: Nit: TM->TE in the section of Proposed Changes: "TMs
> retain global partitions for successful jobs"/
>
> Will fix it.
>
> 6) /Zhijiang: Considering ShuffleMaster, it was built inside JM and
> expected to interactive with JM before. Now the RM also needs to
> interactive with ShuffleMaster to release global partitions. Then it
> might be better to move ShuffleMaster outside of JM, and the lifecycle
> of ShuffleMaster should be consistent with RM./
>
> Yes, I alluded to this in the FLIP but should've been more explicit; the
> shuffle master must outlive the JM. This is somewhat tricky when
> considering the future a bit; if we assume that different jobs or even a
> single one can use different shuffle services, then we need a way to
> associate the partitions with the corresponding shuffle master. This
> will likely require the introduction of a ShuffleMasterID that is
> included in the ShuffleDescriptor.
>
> 7) Handover
>
> /Till: The handover logic between the JM and the RM for the
> global/persistent//
> //result partitions seems a bit brittle to me. What will happen if the JM//
> //cannot reach the RM? I think it would be better if the TM announces the//
> //global/persistent result partitions to the RM via its heartbeats. That
> way//
> //we don't rely on an established connection between the JM and RM and we//
> //keep the TM as the ground of truth. Moreover, the RM should simply
> forward//
> //the release calls to the TM without much internal logic./
>
> As for your question, if the JM cannot reach the RM the handover will
> fail, the JM will likely shutdown without promoting any partition and
> the TE will release all partitions.
> What is the defined behavior for the JM in case of the RM disconnect
> after a job has finished? Does it always/sometimes/never shutdown
> with/-out communicating the result to the client / updating HA data;
> or simply put, does the JM behave to the user as if nothing has happened
> in all cases?
>

Once the JM has obtained the required slots to run a job, it no longer
needs to communicate with the RM. Hence, a lost RM connection won't
interfere with the job. I would like to keep it like this by letting the TE
announce global result partitions to the RM and not to introduce another
communication roundtrip.

>
> A heartbeat-based approach is useful and can alleviate some failure
> cases (see below); but we need to make sure we don't exceed the akka
> framesize or otherwise interfere with the heartbeat mechanism (like we
> did with metrics in the past). Ideally we would only submit updates to
> the partition set (added/removed partitions), but I'm not sure if the
> heartbeats are reliable enough for this to work.
>

How big do you expect the payload to become?

>
> 8. Failure cases:
> /Becket:/
> /a) The TEs may remove the result partition while the RM does not//
> //know. In this case, the client will receive a runtime error and submit
> the//
> //full DAG to recompute the missing result partition. In this case, RM
> should//
> //release the incomplete global partition. How would RM be notified to do//
> //that?//
> //b) Is it possible the RM looses global partition metadata while//
> //the TE still host the data? For example, RM deletes the global
> partition//
> //entry while the release partition call to TE failed.//
> //c) What would happen if the JM fails before the global partitions//
> //are registered to RM? Are users exposed to resource leak if JM does not//
> //have HA?//
> //d) What would happen if the RM fails? Will TE release the//
> //partitions by themselves?/
>
> 1.a) This is a good question that I haven't considered. This will likely
> require a heartbeat-like report of available partitions.
>

The hearbeat based synchronization approach seems to crystalize as the way
to go forward with this FLIP.


> 1.b) RM should only delete entries if it received an ack from the TE;
> otherwise we could easily end up leaking partitions. I believe I forgot
> writing this down.
> 1.c) As described in the FLIP the handoff to the RM must occur before
> partitions are promoted.
>      If the JM fails during the handoff then the TE will cleanup all
> partitions since it lost the connection to the JM, and partitions
> weren't promoted yet.
>      If the JM fails after the handoff but before the promotion, same as
> above. The RM would contain invalid entries in this case; see 1.a) .
>      If the JM fails after the handoff and promotion partitions we don't
> leak anything since the RM is now fully responsible.
> 1.d) yes; if the connection to the RM is disrupted the TE will cleanup
> all global partitions, similar to how it cleans up all partitions
> associated with a given job if the connection to the corresponding JM is
> disrupted.
>
> 9. /Becket: It looks that TE should be the source of truth of the result
> partition//
> //existence. Does it have to distinguish between global and local result//
> //partitions? If TE does not need to distinguish them, it seems the the//
> //releasePartition() method in TE could just provide the list of
> partitions//
> //to release, without the partitions to promote./
>
> The promotion is a hard requirement, as this is the signal to the TE
> that this partition is no longer bound to the life-cycle of a job.
> Without the promotion the TE would delete the partition once the JM has
> shutdown; this is a safety net to ensure cleanup of partitions in case
> of a disconnect.
>
> 10. /In the current design, RM should be able to release result//
> //partitions using ShuffleService. Will RM do this by sending RPC to the
> TEs?//
> //Or will the RM do it by itself?/
>
> The RM will send a release call to each TM and issue a release call to
> the ShuffleMaster, just like the JobMaster handles partition releases.
>
> 11. /Becket: How do we plan to handle the case when there are different
> shuffle//
> //services in the same Flink cluster? For example, a shared standalone//
> //cluster./
>
> This case is not considered; there are so many changes necessary in
> other parts of the runtime that we would jump the gun in addressing it
> here.
> Ultimately though, I would think that that the addition of a shuffle
> master instance ID and shuffle service identifier should suffice.
>
> The identifier is used in subsequent jobs to load the appropriate
> shuffle service for a given partition (think of it like a class name),
> while the shuffle master instance ID is used to differentiate between
> the different shuffle master instances running in the cluster (which
> partitions have to be associated with so we can issue the correct
> release calls).
>
> 12. /Becket: Minor: usually REST API uses `?` to pass the parameters. Is
> there a//
> //reason we use `:` instead?/
>
> That's netty syntax for path parameters.
>
> On 30/09/2019 08:34, Becket Qin wrote:
> > Forgot to say that I agree with Till that it seems a good idea to let TEs
> > register the global partitions to the RM instead of letting JM do it.
> This
> > simplifies quite a few things.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <becket....@gmail.com>
> wrote:
> >
> >> Hi Chesnay,
> >>
> >> Thanks for the proposal. My understanding of the entire workflow step by
> >> step is following:
> >>
> >>     - JM maintains the local and global partition metadata when the task
> >> runs to create result partitions. The tasks themselves does not
> distinguish
> >> between local / global partitions. Only the JM knows that.
> >>     - JM releases the local partitions as the job executes. When a job
> >> finishes successfully, JM registers the global partitions to the RM. The
> >> global partition IDs are set on the client instead of randomly
> generated,
> >> so the client can release global partitions using them. (It would be
> good
> >> to have some human readable string associated with the global result
> >> partitions).
> >>     - Client issues REST call to list / release global partitions.
> >>
> >> A few thoughts / questions below:
> >> 1. Failure cases:
> >>            * The TEs may remove the result partition while the RM does
> not
> >> know. In this case, the client will receive a runtime error and submit
> the
> >> full DAG to recompute the missing result partition. In this case, RM
> should
> >> release the incomplete global partition. How would RM be notified to do
> >> that?
> >>            * Is it possible the RM looses global partition metadata
> while
> >> the TE still host the data? For example, RM deletes the global partition
> >> entry while the release partition call to TE failed.
> >>            * What would happen if the JM fails before the global
> partitions
> >> are registered to RM? Are users exposed to resource leak if JM does not
> >> have HA?
> >>            * What would happen if the RM fails? Will TE release the
> >> partitions by themselves?
> >>
> >> 2. It looks that TE should be the source of truth of the result
> partition
> >> existence. Does it have to distinguish between global and local result
> >> partitions? If TE does not need to distinguish them, it seems the the
> >> releasePartition() method in TE could just provide the list of
> partitions
> >> to release, without the partitions to promote.
> >>
> >> 3. In the current design, RM should be able to release result
> >> partitions using ShuffleService. Will RM do this by sending RPC to the
> TEs?
> >> Or will the RM do it by itself?
> >>
> >> 4. How do we plan to handle the case when there are different shuffle
> >> services in the same Flink cluster? For example, a shared standalone
> >> cluster.
> >>
> >> 5. Minor: usually REST API uses `?` to pass the parameters. Is there a
> >> reason we use `:` instead?
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Tue, Sep 17, 2019 at 3:22 AM zhijiang
> >> <wangzhijiang...@aliyun.com.invalid> wrote:
> >>
> >>> Thanks Chesnay for this FLIP and sorry for touching it a bit delay on
> my
> >>> side.
> >>>
> >>> I also have some similar concerns which Till already proposed before.
> >>>
> >>> 1. The consistent terminology in different components. On JM side,
> >>> PartitionTracker#getPersistedBlockingPartitions is defined for getting
> >>> global partitions. And on RM side, we define the method of
> >>> #registerGlobalPartitions correspondingly for handover the partitions
> from
> >>> JM. I think it is better to unify the term in different components for
> for
> >>> better understanding the semantic. Concering whether to use global or
> >>> persistent, I prefer the "global" term personally. Because it
> describes the
> >>> scope of partition clearly, and the "persistent" is more like the
> partition
> >>> storing way or implementation detail. In other words, the global
> partition
> >>> might also be cached in memory of TE, not must persist into files from
> >>> semantic requirements. Whether memory or persistent file is just the
> >>> implementation choice.
> >>>
> >>> 2. On TE side, we might rename the method #releasePartitions to
> >>> #releaseOrPromotePartitions which describes the function precisely and
> >>> keeps consistent with
> >>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
> >>>
> >>> 3. Very agree with Till's suggestions of global PartitionTable on TE
> side
> >>> and sticking to TE's heartbeat report to RM for global partitions.
> >>>
> >>> 4. Considering ShuffleMaster, it was built inside JM and expected to
> >>> interactive with JM before. Now the RM also needs to interactive with
> >>> ShuffleMaster to release global partitions. Then it might be better to
> move
> >>> ShuffleMaster outside of JM, and the lifecycle of ShuffleMaster should
> be
> >>> consistent with RM.
> >>>
> >>> 5. Nit: TM->TE in the section of Proposed Changes: "TMs retain global
> >>> partitions for successful jobs"
> >>>
> >>> Best,
> >>> Zhijiang
> >>>
> >>>
> >>> ------------------------------------------------------------------
> >>> From:Till Rohrmann <trohrm...@apache.org>
> >>> Send Time:2019年9月10日(星期二) 10:10
> >>> To:dev <dev@flink.apache.org>
> >>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
> >>>
> >>> Thanks Chesnay for drafting the FLIP and starting this discussion.
> >>>
> >>> I have a couple of comments:
> >>>
> >>> * I know that I've also coined the terms global/local result partition
> but
> >>> maybe it is not the perfect name. Maybe we could rethink the
> terminology
> >>> and call them persistent result partitions?
> >>> * Nit: I would call the last parameter of void releasePartitions(JobID
> >>> jobId, Collection<ResultPartitionID> partitionsToRelease,
> >>> Collection<ResultPartitionID> partitionsToPromote) either
> >>> partitionsToRetain or partitionsToPersistent.
> >>> * I'm not sure whether partitionsToRelease should contain a
> >>> global/persistent result partition id. I always thought that the user
> will
> >>> be responsible for managing the lifecycle of a global/persistent
> >>> result partition.
> >>> * Instead of extending the PartitionTable to be able to store
> >>> global/persistent and local/transient result partitions, I would rather
> >>> introduce a global PartitionTable to store the global/persistent result
> >>> partitions explicitly. I think there is a benefit in making things as
> >>> explicit as possible.
> >>> * The handover logic between the JM and the RM for the
> global/persistent
> >>> result partitions seems a bit brittle to me. What will happen if the JM
> >>> cannot reach the RM? I think it would be better if the TM announces the
> >>> global/persistent result partitions to the RM via its heartbeats. That
> way
> >>> we don't rely on an established connection between the JM and RM and we
> >>> keep the TM as the ground of truth. Moreover, the RM should simply
> forward
> >>> the release calls to the TM without much internal logic.
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org>
> >>> wrote:
> >>>
> >>>> Hello,
> >>>>
> >>>> FLIP-36 (interactive programming)
> >>>> <
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >>>>
> >>>> proposes a new programming paradigm where jobs are built incrementally
> >>>> by the user.
> >>>>
> >>>> To support this in an efficient manner I propose to extend partition
> >>>> life-cycle to support the notion of /global partitions/, which are
> >>>> partitions that can exist beyond the life-time of a job.
> >>>>
> >>>> These partitions could then be re-used by subsequent jobs in a fairly
> >>>> efficient manner, as they don't have to persisted to an external
> storage
> >>>> first and consuming tasks could be scheduled to exploit data-locality.
> >>>>
> >>>> The FLIP outlines the required changes on the JobMaster, TaskExecutor
> >>>> and ResourceManager to support this from a life-cycle perspective.
> >>>>
> >>>> This FLIP does /not/ concern itself with the /usage/ of global
> >>>> partitions, including client-side APIs, job-submission, scheduling and
> >>>> reading said partitions; these are all follow-ups that will either be
> >>>> part of FLIP-36 or spliced out into separate FLIPs.
> >>>>
> >>>>
> >>>
>
>

Reply via email to