vjagadish1989 commented on a change in pull request #952: Improved standby-aware container allocation for active-containers on job redeploys URL: https://github.com/apache/samza/pull/952#discussion_r267636464
########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java ########## @@ -161,54 +161,72 @@ private void handleStandbyContainerStop(String standbyContainerID, String resour } } - /** Method to handle failover for an active container. - * We try to find a standby for the active container, and issue a stop on it. - * If we do not find a standby container, we simply issue an anyhost request to place it. + /** Method to handle standby-aware allocation for an active container. + * We try to find a standby host for the active container, and issue a stop on any standby-containers running on it, + * request resource to place the active on the standby's host, and one to place the standby elsewhere. * - * @param containerID the samzaContainerID of the active-container + * @param activeContainerID the samzaContainerID of the active-container * @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state) */ - private void initiateActiveContainerFailover(String containerID, String resourceID, + private void initiateStandbyAwareAllocation(String activeContainerID, String resourceID, AbstractContainerAllocator containerAllocator) { - Optional<Entry<String, SamzaResource>> standbyContainer = this.selectStandby(containerID, resourceID); + String standbyHost = this.selectStandbyHost(activeContainerID, resourceID); - // If we find a standbyContainer, we initiate a failover - if (standbyContainer.isPresent()) { + // Check if there is a running standby-container on that host that needs to be stopped + List<String> standbyContainers = this.standbyContainerConstraints.get(activeContainerID); + Map<String, SamzaResource> runningStandbyContainersOnHost = this.samzaApplicationState.runningContainers.entrySet().stream().filter(x -> standbyContainers.contains(x.getKey())) + .filter(x -> x.getValue().getHost().equals(standbyHost)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - 
String standbyContainerId = standbyContainer.get().getKey(); - SamzaResource standbyResource = standbyContainer.get().getValue(); - String standbyResourceID = standbyResource.getResourceID(); - String standbyHost = standbyResource.getHost(); + // if the standbyHost returned is anyhost, we proceed with the request directly + if (standbyHost.equals(ResourceRequestState.ANY_HOST)) { + log.info("No standby container found for active container {}, making a resource-request for placing {} on {}", activeContainerID, activeContainerID, ResourceRequestState.ANY_HOST); + samzaApplicationState.failoversToAnyHost.incrementAndGet(); + containerAllocator.requestResource(activeContainerID, ResourceRequestState.ANY_HOST); + + } else if (runningStandbyContainersOnHost.isEmpty()) { + // if there are no running standby-containers on the standbyHost, we proceed to directly to make a resource request - // update the state - FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(containerID, resourceID); - failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost); + log.info("No running standby container to stop on host {}, making a resource-request for placing {} on {}", standbyHost, activeContainerID, standbyHost); + FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID); - log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, " - + "for active container {}", standbyContainerId, standbyResourceID, containerID); + // record the resource request, before issuing it to avoid race with allocation-thread + SamzaResourceRequest resourceRequestForActive = containerAllocator.getResourceRequest(activeContainerID, standbyHost); + failoverMetadata.recordResourceRequest(resourceRequestForActive); + containerAllocator.issueResourceRequest(resourceRequestForActive); samzaApplicationState.failoversToStandby.incrementAndGet(); - 
this.clusterResourceManager.stopStreamProcessor(standbyResource); } else { - // If we dont find a standbyContainer, we proceed with the ANYHOST request - log.info("No standby container found for active container {}, making a request for {}", containerID, - ResourceRequestState.ANY_HOST); - samzaApplicationState.failoversToAnyHost.incrementAndGet(); - containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + // if there is a running standby-containers on the standbyHost, we issue a stop (the stopComplete callback completes the remainder of the flow) + FailoverMetadata failoverMetadata = this.registerActiveContainerFailure(activeContainerID, resourceID); + + runningStandbyContainersOnHost.forEach((standbyContainerID, standbyResource) -> { + log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, " + + "for active container {}", runningStandbyContainersOnHost.keySet(), runningStandbyContainersOnHost.values(), activeContainerID); + failoverMetadata.updateStandbyContainer(standbyResource.getResourceID(), standbyResource.getHost()); + samzaApplicationState.failoversToStandby.incrementAndGet(); + this.clusterResourceManager.stopStreamProcessor(standbyResource); + }); + + if (runningStandbyContainersOnHost.size() > 1) + log.error("Multiple standby containers found running on one host: {}", runningStandbyContainersOnHost); } } /** - * Method to select a standby container for a given active container that has stopped. - * TODO: enrich this method to select standby's intelligently based on lag, timestamp, load-balencing, etc. + * Method to select a standby host for a given active container. + * 1. We first try to select a host which has a running standby-container, that we haven't already selected for failover. + * 2. If we dont any such host, we iterate over last-known standbyHosts, if we haven't already selected it for failover. + * 3. If still dont find a host, we fall back to AnyHost. 
+ * + * TODO: In Step 2 & 3, enrich the functionality to select standbyHost intelligently based on lag, timestamp, load-balancing, etc. Review comment: Some of these ideas, like load-aware standby allocation, warrant their own design. Instead of documenting them here, I would nuke this TODO and track them in a JIRA for better discoverability. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services