[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15430700#comment-15430700 ]
ASF GitHub Bot commented on FLINK-4348: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2389#discussion_r75671552 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java --- @@ -18,8 +18,78 @@ package org.apache.flink.runtime.rpc.resourcemanager; +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + import java.io.Serializable; -public class SlotRequest implements Serializable{ +/** + * Slot allocation request from jobManager to resourceManager + */ +public class SlotRequest implements Serializable { private static final long serialVersionUID = -6586877187990445986L; + + /** jobId to identify which job send the request */ + private final JobID jobID; + + /** allocationId to identify slot allocation, created by JobManager when requesting a sot */ + private final AllocationID allocationID; + + /** the resource profile of the desired slot */ + private final ResourceProfile profile; + + public SlotRequest(JobID jobID, AllocationID allocationID) { + this(jobID, allocationID, null); + } + + public SlotRequest(JobID jobID, AllocationID allocationID, ResourceProfile profile) { + this.jobID = checkNotNull(jobID, "jobID cannot be null"); + this.allocationID = checkNotNull(allocationID, "allocationID cannot be null"); + this.profile = checkNotNull(profile, "profile cannot be null"); + } + + public ResourceProfile getProfile() { + return profile; + } + + public AllocationID getAllocationID() { + return allocationID; + } + + public JobID getJobID() { + return jobID; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotRequest that 
= (SlotRequest) o; + + if (!jobID.equals(that.jobID)) { + return false; + } + if (!allocationID.equals(that.allocationID)) { + return false; + } + return profile.equals(that.profile); + + } + + @Override + public int hashCode() { + int result = jobID.hashCode(); + result = 31 * result + allocationID.hashCode(); + result = 31 * result + profile.hashCode(); + return result; --- End diff -- This can be simplified to `return Objects.hash(jobID, allocationID, profile);`. > Implement communication from ResourceManager to TaskManager > ----------------------------------------------------------- > > Key: FLINK-4348 > URL: https://issues.apache.org/jira/browse/FLINK-4348 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management > Reporter: Kurt Young > Assignee: zhangjing > > There are three main kinds of communication initiated from RM to TM: > * Heartbeat: RM uses heartbeats to sync with TM's slot status. > * SlotRequest: when RM decides to assign a slot to JM, it should first send a > request to TM for the slot. TM can either accept or reject this request. > * FailureNotify: in some corner cases, TM will be marked as invalid by the > cluster manager master (e.g. the YARN master) without TM itself realizing it. RM > should send a failure notification to TM so that TM can terminate itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)