vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover URL: https://github.com/apache/samza/pull/903#discussion_r256082808
########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java ########## @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest request) { state.expiredPreferredHostRequests.incrementAndGet(); } } -} + + // Method to run a container on the given resource if it meets all standby constraints. If not, we re-request resource + // for the container (similar to the case when we re-request for a launch-fail or request expiry). + private boolean checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost, + SamzaResource samzaResource, SamzaApplicationState state) { + + // If standby tasks are not enabled run streamprocessor and return true + if (!new JobConfig(config).getStandbyTasksEnabled()) { + runStreamProcessor(request, preferredHost); + return true; + } + + String containerID = request.getContainerID(); + + if (checkStandbyConstraints(request, samzaResource, state)) { + // This resource can be used to launch this container + log.info("Running container {} on preferred host {} meets standby constraints, launching on {}", containerID, + preferredHost, samzaResource.getHost()); + runStreamProcessor(request, preferredHost); + state.successfulStandbyAllocations.incrementAndGet(); + return true; + } else { + // This resource cannot be used to launch this container, so we treat it like a launch fail, and issue an ANY_HOST request + log.info("Running container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request", + containerID, samzaResource.getHost()); + resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost); + resourceRequestState.cancelResourceRequest(request); + requestResourceDueToLaunchFailOrExpiredRequest(containerID); + state.failedStandbyAllocations.incrementAndGet(); + return false; + } + } + + // Helper method to check if this SamzaResourceRequest for 
a container can be met on this resource, given standby + // container constraints, and the current set of pending and running containers + private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource, + SamzaApplicationState samzaApplicationState) { + String containerIDToStart = request.getContainerID(); + String host = samzaResource.getHost(); + List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart); + + // Check if any of these conflicting containers are running/launching on host + for (String containerID : containerIDsForStandbyConstraints) { + SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID); + + // return false if a conflicting container is pending for launch on the host + if (resource != null && resource.getHost().equals(host)) { + log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host", + containerIDToStart, samzaResource.getHost(), containerID); + return false; + } + + // return false if a conflicting container is running on the host + resource = samzaApplicationState.runningContainers.get(containerID); + if (resource != null && resource.getHost().equals(host)) { + log.info("Container {} cannot be started on host {} because container {} is already running on this host", + containerIDToStart, samzaResource.getHost(), containerID); + return false; + } + } + + return true; + } + + /** + * Intercept resource requests, which are due to either a launch-failure or resource-request expired or standby + * 1. a standby container, we proceed to make a anyhost request + * 2. 
an activeContainer, we try to fail-it-over to a standby + * @param containerID Identifier of the container that will be run when a resource is allocated + */ + @Override + public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) { + if (StandbyTaskUtil.isStandbyContainer(containerID)) { + log.info("Handling rerequesting for container {} using an any host request"); + super.requestResource(containerID, ResourceRequestState.ANY_HOST); // proceed with a the anyhost request + } else { + requestResource(containerID, ResourceRequestState.ANY_HOST); // invoke local method & select a new standby if possible + } + } + + // Intercept issuing of resource requests from the CPM + // 1. for ActiveContainers and instead choose a StandbyContainer to stop + // 2. for a StandbyContainer (after it has been chosen for failover), to put the active on the standby's host + // and request another resource for the standby + // 3. for a standbyContainer (if not for a failover) + @Override Review comment: It would be preferable not to override `requestResource`. Additionally, `requestResource` should not perform any actions related to failover handling. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services