[ https://issues.apache.org/jira/browse/FLINK-4348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15431928#comment-15431928 ]
ASF GitHub Bot commented on FLINK-4348:
---------------------------------------

Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2389#discussion_r75789222

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
    @@ -94,26 +130,218 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) {
         * @return Slot assignment
         */
        @RpcMethod
    -   public SlotAssignment requestSlot(SlotRequest slotRequest) {
    -       System.out.println("SlotRequest: " + slotRequest);
    -       return new SlotAssignment();
    +   public AcknowledgeSlotRequest requestSlot(SlotRequest slotRequest) {
    +       // TODO slotManager.requestSlot(slotRequest);
    +       return new AcknowledgeSlotRequest(slotRequest.getAllocationID());
        }

        /**
    -    *
    -    * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    -    * @param taskExecutorAddress The address of the TaskExecutor that registers
    -    * @param resourceID The resource ID of the TaskExecutor that registers
    +    * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    +    * @param taskExecutorAddress The address of the TaskExecutor that registers
    +    * @param resourceID The resource ID of the TaskExecutor that registers
         *
         * @return The response by the ResourceManager.
         */
        @RpcMethod
    -   public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor(
    -       UUID resourceManagerLeaderId,
    -       String taskExecutorAddress,
    -       ResourceID resourceID) {
    +   public Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor(
    +       final UUID resourceManagerLeaderId,
    +       final String taskExecutorAddress,
    +       final ResourceID resourceID
    +   ) {
    +       log.info("received register from taskExecutor {}, address {}", resourceID, taskExecutorAddress);
    +       Future<TaskExecutorGateway> taskExecutorFuture =
    +           getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
    +
    +       return taskExecutorFuture.map(new Mapper<TaskExecutorGateway, org.apache.flink.runtime.rpc.registration.RegistrationResponse>() {
    +           @Override
    +           public org.apache.flink.runtime.rpc.registration.RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) {
    +               // decline registration if resourceManager cannot connect to the taskExecutor using the given address
    +               if(taskExecutorGateway == null) {
    --- End diff --

    With getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class), taskExecutorGateway is currently null if the address is invalid.


> Implement communication from ResourceManager to TaskManager
> -----------------------------------------------------------
>
>                 Key: FLINK-4348
>                 URL: https://issues.apache.org/jira/browse/FLINK-4348
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: zhangjing
>
> There are three main interactions initiated by the RM towards the TM:
> * Heartbeat: the RM uses heartbeats to sync with the TM's slot status.
> * SlotRequest: when the RM decides to assign a slot to the JM, it should first
> send a slot request to the TM. The TM can either accept or reject this request.
> * FailureNotify: in some corner cases, the TM is marked as invalid by the
> cluster manager master (e.g. the YARN master) without the TM itself realizing it.
> The RM should send a failure notification to the TM so that the TM can terminate itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
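
To illustrate the review comment about a null gateway: below is a minimal, self-contained sketch of declining a registration when the connect step yields null. It is not the code from the pull request. It uses the JDK's CompletableFuture rather than Flink's Future/Mapper, and every name in it (RegistrationSketch, Gateway, Response, fakeConnect) is a hypothetical stand-in chosen for illustration. It also assumes, as the comment describes, that an invalid address results in a null gateway rather than a failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class RegistrationSketch {

    /** Hypothetical stand-in for TaskExecutorGateway. */
    interface Gateway {}

    /** Hypothetical stand-in for RegistrationResponse. */
    static final class Response {
        final boolean accepted;
        final String reason;

        Response(boolean accepted, String reason) {
            this.accepted = accepted;
            this.reason = reason;
        }
    }

    /**
     * Hypothetical connect function: completes with a gateway for addresses
     * that start with "akka://", and with null for anything else.
     * Assumption: an invalid address yields null, not an exception.
     */
    static CompletableFuture<Gateway> fakeConnect(String address) {
        Gateway gateway = address.startsWith("akka://") ? new Gateway() {} : null;
        return CompletableFuture.completedFuture(gateway);
    }

    /** Maps the connect result to a response, declining when the gateway is null. */
    static CompletableFuture<Response> registerTaskExecutor(
            String address,
            Function<String, CompletableFuture<Gateway>> connect) {
        return connect.apply(address).thenApply(gateway -> {
            if (gateway == null) {
                // Decline: the given address could not be resolved to a gateway.
                return new Response(false, "cannot connect to " + address);
            }
            return new Response(true, "registered " + address);
        });
    }

    public static void main(String[] args) {
        Response ok = registerTaskExecutor("akka://tm-1", RegistrationSketch::fakeConnect).join();
        Response declined = registerTaskExecutor("bogus", RegistrationSketch::fakeConnect).join();
        System.out.println(ok.accepted + " / " + ok.reason);
        System.out.println(declined.accepted + " / " + declined.reason);
    }
}
```

Passing the connect step in as a function keeps the null check testable without a running RPC service. If connect instead failed the future for an invalid address, the decline path would need an exception handler rather than a null check.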