Forgot to say that I agree with Till that it seems like a good idea to let
the TEs register the global partitions with the RM instead of letting the JM
do it. This simplifies quite a few things.

Thanks,

Jiangjie (Becket) Qin

On Sun, Sep 29, 2019 at 11:25 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Chesnay,
>
> Thanks for the proposal. My understanding of the entire workflow step by
> step is as follows:
>
>    - The JM maintains the local and global partition metadata when the
> tasks run and create result partitions. The tasks themselves do not
> distinguish between local / global partitions; only the JM knows that.
>    - The JM releases the local partitions as the job executes. When a job
> finishes successfully, the JM registers the global partitions with the RM.
> The global partition IDs are set on the client side instead of being
> randomly generated, so the client can later release global partitions using
> them. (It would be good to have some human-readable string associated with
> the global result partitions.)
>    - The client issues REST calls to list / release global partitions;
> a rough sketch of how I read this flow follows below.
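>
> The sketch shows the RM-side surface implied by the steps above. The
> interface and method names are my own illustration rather than the FLIP's
> proposed API (only #registerGlobalPartitions appears in the discussion
> below), so please treat it as a sketch only:
>
>   import java.util.Collection;
>
>   import org.apache.flink.api.common.JobID;
>   import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
>
>   /** Hypothetical RM-side view of global partitions; illustration only. */
>   interface GlobalPartitionRegistry {
>
>       // Called once a job finishes successfully (by the JM in the current
>       // FLIP, or by the TEs if we go with the alternative discussed here).
>       void registerGlobalPartitions(
>           JobID jobId, Collection<ResultPartitionID> partitions);
>
>       // Backing calls for the client-facing REST endpoints.
>       Collection<ResultPartitionID> listGlobalPartitions();
>
>       void releaseGlobalPartitions(Collection<ResultPartitionID> partitions);
>   }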
>
> A few thoughts / questions below:
> 1. Failure cases:
>           * The TEs may remove a result partition without the RM knowing
> about it. In that case, the client will receive a runtime error and submit
> the full DAG to recompute the missing result partition, and the RM should
> release the now incomplete global partition. How would the RM be notified
> to do that?
>           * Is it possible that the RM loses global partition metadata
> while the TE still hosts the data? For example, the RM deletes the global
> partition entry but the release-partition call to the TE fails.
>           * What would happen if the JM fails before the global partitions
> are registered with the RM? Are users exposed to a resource leak if the JM
> does not have HA?
>           * What would happen if the RM fails? Will the TEs release the
> partitions by themselves?
>
> 2. It looks like the TE should be the source of truth for result partition
> existence. Does it have to distinguish between global and local result
> partitions? If the TE does not need to distinguish them, it seems that the
> releasePartitions() method on the TE could just take the list of partitions
> to release, without the partitions to promote.
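>
> In other words, just to sketch what I mean (not a concrete proposal), the
> TE-side method could shrink to something like:
>
>   void releasePartitions(
>       JobID jobId, Collection<ResultPartitionID> partitionsToRelease);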
>
> 3. In the current design, the RM should be able to release result
> partitions using the ShuffleService. Will the RM do this by sending RPCs to
> the TEs, or will the RM do it by itself?
>
> 4. How do we plan to handle the case where there are different shuffle
> services in the same Flink cluster, for example a shared standalone
> cluster?
>
> 5. Minor: usually a REST API uses `?` to pass parameters. Is there a
> reason we use `:` instead?
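>
> To illustrate what I mean, with made-up paths that are not necessarily the
> ones from the FLIP:
>
>   DELETE /partitions?partitionId=<id>     <- parameters passed via '?'
>   DELETE /partitions/:partitionid         <- the ':' notation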
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Sep 17, 2019 at 3:22 AM zhijiang
> <wangzhijiang...@aliyun.com.invalid> wrote:
>
>>
>> Thanks Chesnay for this FLIP, and sorry for getting to it a bit late on my
>> side.
>>
>> I also have some similar concerns which Till already proposed before.
>>
>> 1. Consistent terminology across the different components. On the JM side,
>> PartitionTracker#getPersistedBlockingPartitions is defined for getting
>> global partitions, while on the RM side we define the method
>> #registerGlobalPartitions for handing the partitions over from the JM. I
>> think it is better to unify the term across components for a better
>> understanding of the semantics. Concerning whether to use "global" or
>> "persistent", I personally prefer the "global" term, because it clearly
>> describes the scope of the partition, whereas "persistent" is more about
>> how the partition is stored, which is an implementation detail. In other
>> words, a global partition might also be cached in the memory of the TE and
>> does not have to be persisted to files from a semantic point of view;
>> memory versus persistent files is just an implementation choice.
>>
>> 2. On the TE side, we might rename the method #releasePartitions to
>> #releaseOrPromotePartitions, which describes the function more precisely
>> and keeps it consistent with
>> PartitionTracker#stopTrackingAndReleaseOrPromotePartitionsFor().
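>>
>> That is, roughly (reusing the signature Till quotes below, only with the
>> method name changed):
>>
>>   void releaseOrPromotePartitions(
>>       JobID jobId,
>>       Collection<ResultPartitionID> partitionsToRelease,
>>       Collection<ResultPartitionID> partitionsToPromote);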
>>
>> 3. I very much agree with Till's suggestions of a separate global
>> PartitionTable on the TE side and of relying on the TE's heartbeat to
>> report global partitions to the RM.
>>
>> 4. Considering the ShuffleMaster, it was previously built inside the JM
>> and only expected to interact with the JM. Now the RM also needs to
>> interact with the ShuffleMaster to release global partitions, so it might
>> be better to move the ShuffleMaster outside of the JM and make its
>> lifecycle consistent with that of the RM.
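>>
>> As a rough sketch of what I have in mind (assuming the RM simply holds a
>> ShuffleMaster reference and reuses the existing
>> ShuffleMaster#releasePartitionExternally entry point; adjust if the actual
>> method differs):
>>
>>   import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
>>   import org.apache.flink.runtime.shuffle.ShuffleMaster;
>>
>>   /** Illustration only: RM-side component owning the ShuffleMaster. */
>>   final class GlobalPartitionReleaser {
>>
>>       private final ShuffleMaster<?> shuffleMaster;
>>
>>       GlobalPartitionReleaser(ShuffleMaster<?> shuffleMaster) {
>>           this.shuffleMaster = shuffleMaster;
>>       }
>>
>>       void release(ShuffleDescriptor descriptor) {
>>           shuffleMaster.releasePartitionExternally(descriptor);
>>       }
>>   }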
>>
>> 5. Nit: TM -> TE in the "Proposed Changes" section: "TMs retain global
>> partitions for successful jobs".
>>
>> Best,
>> Zhijiang
>>
>>
>> ------------------------------------------------------------------
>> From:Till Rohrmann <trohrm...@apache.org>
>> Send Time: Tuesday, September 10, 2019 10:10
>> To:dev <dev@flink.apache.org>
>> Subject:Re: [DISCUSS] FLIP-67: Global partitions lifecycle
>>
>> Thanks Chesnay for drafting the FLIP and starting this discussion.
>>
>> I have a couple of comments:
>>
>> * I know that I've also coined the terms global/local result partition, but
>> maybe they are not the perfect names. Maybe we could rethink the terminology
>> and call them persistent result partitions?
>> * Nit: I would call the last parameter of void releasePartitions(JobID
>> jobId, Collection<ResultPartitionID> partitionsToRelease,
>> Collection<ResultPartitionID> partitionsToPromote) either
>> partitionsToRetain or partitionsToPersistent.
>> * I'm not sure whether partitionsToRelease should contain a
>> global/persistent result partition id. I always thought that the user would
>> be responsible for managing the lifecycle of a global/persistent
>> result partition.
>> * Instead of extending the PartitionTable to be able to store both
>> global/persistent and local/transient result partitions, I would rather
>> introduce a separate global PartitionTable to store the global/persistent
>> result partitions explicitly (see the sketch at the end of this list). I
>> think there is a benefit in making things as explicit as possible.
>> * The handover logic between the JM and the RM for the global/persistent
>> result partitions seems a bit brittle to me. What will happen if the JM
>> cannot reach the RM? I think it would be better if the TM announced the
>> global/persistent result partitions to the RM via its heartbeats. That way
>> we don't rely on an established connection between the JM and the RM, and
>> we keep the TM as the source of truth. Moreover, the RM should simply
>> forward the release calls to the TM without much internal logic.
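>>
>> To make the PartitionTable and heartbeat points a bit more concrete, here
>> is a minimal sketch that is independent of the actual PartitionTable
>> implementation (the class and method names are illustrative only):
>>
>>   import java.util.Collection;
>>   import java.util.HashSet;
>>   import java.util.Map;
>>   import java.util.Set;
>>   import java.util.concurrent.ConcurrentHashMap;
>>
>>   import org.apache.flink.api.common.JobID;
>>   import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
>>
>>   /** Illustration only: separate tables for local and global partitions. */
>>   final class TaskExecutorPartitionTables {
>>
>>       private final Map<JobID, Set<ResultPartitionID>> localPartitions =
>>           new ConcurrentHashMap<>();
>>       private final Map<JobID, Set<ResultPartitionID>> globalPartitions =
>>           new ConcurrentHashMap<>();
>>
>>       void startTrackingLocalPartition(JobID jobId, ResultPartitionID id) {
>>           localPartitions
>>               .computeIfAbsent(jobId, ignored -> new HashSet<>())
>>               .add(id);
>>       }
>>
>>       /** Promotion moves entries from the local to the global table. */
>>       void promotePartitions(
>>               JobID jobId, Collection<ResultPartitionID> toPromote) {
>>           Set<ResultPartitionID> local =
>>               localPartitions.getOrDefault(jobId, new HashSet<>());
>>           local.removeAll(toPromote);
>>           globalPartitions
>>               .computeIfAbsent(jobId, ignored -> new HashSet<>())
>>               .addAll(toPromote);
>>       }
>>
>>       /** What the TE would announce to the RM via its heartbeat payload. */
>>       Set<ResultPartitionID> globalPartitionsToReport() {
>>           Set<ResultPartitionID> report = new HashSet<>();
>>           globalPartitions.values().forEach(report::addAll);
>>           return report;
>>       }
>>   }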
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 3:16 PM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>> > Hello,
>> >
>> > FLIP-36 (interactive programming)
>> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink>
>> > proposes a new programming paradigm where jobs are built incrementally
>> > by the user.
>> >
>> > To support this in an efficient manner I propose to extend the partition
>> > life-cycle to support the notion of /global partitions/, which are
>> > partitions that can exist beyond the life-time of a job.
>> >
>> > These partitions could then be re-used by subsequent jobs in a fairly
>> > efficient manner, as they don't have to be persisted to external storage
>> > first, and consuming tasks could be scheduled to exploit data-locality.
>> >
>> > The FLIP outlines the required changes on the JobMaster, TaskExecutor
>> > and ResourceManager to support this from a life-cycle perspective.
>> >
>> > This FLIP does /not/ concern itself with the /usage/ of global
>> > partitions, including client-side APIs, job submission, scheduling and
>> > reading said partitions; these are all follow-ups that will either be
>> > part of FLIP-36 or split out into separate FLIPs.
>> >
>> >
>>
>>
