rmatharu commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover URL: https://github.com/apache/samza/pull/903#discussion_r260926132
########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java ########## @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.storage.kv.Entry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Encapsulates logic and state concerning standby-containers. 
+ */ +public class StandbyContainerManager { + + private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class); + + private final SamzaApplicationState samzaApplicationState; + + // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1} + // This is used for checking no two standbys or active-standby-pair are started on the same host + private final Map<String, List<String>> standbyContainerConstraints; + + // Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure) + private final Map<String, FailoverMetadata> failovers; + + // Resource-manager, used to stop containers + private ClusterResourceManager clusterResourceManager; + + public StandbyContainerManager(SamzaApplicationState samzaApplicationState, + ClusterResourceManager clusterResourceManager) { + this.failovers = new ConcurrentHashMap<>(); + this.standbyContainerConstraints = new HashMap<>(); + this.samzaApplicationState = samzaApplicationState; + JobModel jobModel = samzaApplicationState.jobModelManager.jobModel(); + + // populate the standbyContainerConstraints map by iterating over all containers + jobModel.getContainers() + .keySet() + .forEach(containerId -> standbyContainerConstraints.put(containerId, + StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel))); + this.clusterResourceManager = clusterResourceManager; + + log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints); + } + + /** + * We handle the stopping of a container depending on the case which is decided using the exit-status: + * Case 1. an Active-Container which has stopped for an "unknown" reason, then we start it on the given preferredHost + * Case 2. Active container has stopped because of node failure, thene we initiate a failover + * Case 3. 
StandbyContainer has stopped after it was chosen for failover, see {@link StandbyContainerManager#handleStandbyContainerStop} + * Case 4. StandbyContainer has stopped but not because of a failover, see {@link StandbyContainerManager#handleStandbyContainerStop} + * + * @param containerID containerID of the stopped container + * @param resourceID last resourceID of the stopped container + * @param preferredHost the host on which the container was running + * @param exitStatus the exit status of the failed container + * @param containerAllocator the container allocator + */ + public void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus, + AbstractContainerAllocator containerAllocator) { + + if (StandbyTaskUtil.isStandbyContainer(containerID)) { + handleStandbyContainerStop(containerID, resourceID, preferredHost, containerAllocator); + } else { + // initiate failover for the active container based on the exitStatus + switch (exitStatus) { + case SamzaResourceStatus.DISK_FAIL: + case SamzaResourceStatus.ABORTED: + case SamzaResourceStatus.PREEMPTED: + initiateActiveContainerFailover(containerID, resourceID, containerAllocator); + break; + // in all other cases, request resource for the failed container + default: + log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost); + containerAllocator.requestResource(containerID, preferredHost); + break; + } + } + } + + /** + * Handle the failed launch of a container, based on + * Case 1. If it is an active container, then initiate a failover for it. + * Case 2. If it is standby container, request a new resource on AnyHost. 
+ * @param containerID the ID of the container that has failed + */ + public void handleContainerLaunchFail(String containerID, String resourceID, + AbstractContainerAllocator containerAllocator) { + + if (StandbyTaskUtil.isStandbyContainer(containerID)) { + log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID); + containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + } else { + initiateActiveContainerFailover(containerID, resourceID, containerAllocator); + } + } + + /** + * If a standby container has stopped, then there are two possible cases + * Case 1. during a failover, the standby container was stopped for an active's start, then we + * 1. request a resource on the standby's host to place the activeContainer, and + * 2. request anyhost to place this standby + * + * Case 2. independent of a failover, the standby container stopped, in which proceed with its resource-request + * @param standbyContainerID SamzaContainerID of the standby container + * @param preferredHost Preferred host of the standby container + */ + private void handleStandbyContainerStop(String standbyContainerID, String resourceID, String preferredHost, + AbstractContainerAllocator containerAllocator) { + + // if this standbyContainerResource was stopped for a failover, we will find a metadata entry + Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID); + + if (failoverMetadata.isPresent()) { + String activeContainerID = failoverMetadata.get().activeContainerID; + String standbyContainerHostname = failoverMetadata.get().getStandbyContainerHostname(resourceID); + + log.info("Requesting resource for active container {} on host {}, and backup container {} on any host", + activeContainerID, standbyContainerHostname, standbyContainerID); + + containerAllocator.requestResource(activeContainerID, + standbyContainerHostname); // request standbycontainer's host for 
active-container + containerAllocator.requestResource(standbyContainerID, + ResourceRequestState.ANY_HOST); // request anyhost for standby container + } else { + log.info("Issuing request for standby container {} on host {}, since this is not for a failover", + standbyContainerID, preferredHost); + containerAllocator.requestResource(standbyContainerID, preferredHost); + } + } + + /** Method to handle failover for an active container. + * We try to find a standby for the active container, and issue a stop on it. + * If we do not find a standby container, we simply issue an anyhost request to place it. + * + * @param containerID the samzaContainerID of the active-container + * @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state) + */ + private void initiateActiveContainerFailover(String containerID, String resourceID, + AbstractContainerAllocator containerAllocator) { + Optional<Entry<String, SamzaResource>> standbyContainer = this.selectStandby(containerID, resourceID); + + // If we find a standbyContainer, we initiate a failover + if (standbyContainer.isPresent()) { + + // ResourceID of the active container at the time of its last failure + String standbyContainerId = standbyContainer.get().getKey(); + SamzaResource standbyResource = standbyContainer.get().getValue(); + String standbyResourceID = standbyResource.getResourceID(); + String standbyHost = standbyResource.getHost(); + + // update the failover state + this.registerFailover(containerID, resourceID, standbyResourceID, standbyHost); + log.info("Initiating failover and stopping standby container, found standbyContainer {} = resource {}, " + + "for active container {}", standbyContainerId, standbyResourceID, containerID); + samzaApplicationState.failoversToStandby.incrementAndGet(); + + this.clusterResourceManager.stopStreamProcessor(standbyResource); + } else { + + // If we dont find a standbyContainer, we proceed with the ANYHOST request + log.info("No 
standby container found for active container {}, making a request for {}", containerID, + ResourceRequestState.ANY_HOST); + samzaApplicationState.failoversToAnyHost.incrementAndGet(); + containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + } + } + + /** + * Method to select a standby container for a given active container that has stopped. + * TODO: enrich this method to select standby's intelligently based on lag, timestamp, load-balencing, etc. + * @param activeContainerID Samza containerID of the active container + * @param activeContainerResourceID ResourceID of the active container at the time of its last failure + * @return + */ + private Optional<Entry<String, SamzaResource>> selectStandby(String activeContainerID, + String activeContainerResourceID) { + + log.info("Standby containers {} for active container {}", this.standbyContainerConstraints.get(activeContainerID), activeContainerID); + + // obtain any existing failover metadata + Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = + activeContainerResourceID == null ? 
Optional.empty() : this.getFailoverMetadata(activeContainerResourceID); + + // Iterate over the list of running standby containers, to find a standby resource that we have not already + // used for a failover for this active resoruce + for (String standbyContainerID : this.standbyContainerConstraints.get(activeContainerID)) { + + if (samzaApplicationState.runningContainers.containsKey(standbyContainerID)) { + SamzaResource standbyContainerResource = samzaApplicationState.runningContainers.get(standbyContainerID); + + // use this standby if there was no previous failover for which this standbyResource was used + if (!(failoverMetadata.isPresent() && failoverMetadata.get().isStandbyResourceUsed(standbyContainerResource.getResourceID()))) { + + log.info("Returning standby container {} in running state for active container {}", standbyContainerID, + activeContainerID); + return Optional.of(new Entry<>(standbyContainerID, standbyContainerResource)); + } + } + } + + log.info("Did not find any running standby container for active container {}", activeContainerID); + return Optional.empty(); + } + + /** + * Register a new failover that has been initiated for the active container resource (identified by its resource ID). 
+ */ + private void registerFailover(String activeContainerID, String activeContainerResourceID, + String selectedStandbyContainerResourceID, String standbyContainerHost) { + + // this active container's resource ID is already registered, in which case update the metadata + if (failovers.containsKey(activeContainerResourceID)) { + FailoverMetadata failoverMetadata = failovers.get(activeContainerResourceID); + failoverMetadata.updateStandbyContainer(selectedStandbyContainerResourceID, standbyContainerHost); + } else { + FailoverMetadata failoverMetadata = + new FailoverMetadata(activeContainerID, activeContainerResourceID, selectedStandbyContainerResourceID, + standbyContainerHost); + this.failovers.put(activeContainerResourceID, failoverMetadata); + } + } + + /** + * Check if this standbyContainerResource is present in the failoverState for an active container. + * This is used to determine if we requested a stop a container. + */ + private Optional<FailoverMetadata> checkIfUsedForFailover(String standbyContainerResourceId) { + + if (standbyContainerResourceId == null) { + return Optional.empty(); + } + + for (FailoverMetadata failoverMetadata : failovers.values()) { + if (failoverMetadata.isStandbyResourceUsed(standbyContainerResourceId)) { + log.info("Standby container with resource id {} was selected for failover of active container {}", + standbyContainerResourceId, failoverMetadata.activeContainerID); + return Optional.of(failoverMetadata); + } + } + return Optional.empty(); + } + + /** + * Check if matching this SamzaResourceRequest to the given resource, meets all standby-container container constraints. + * + * @param request The resource request to match. + * @param samzaResource The samzaResource to potentially match the resource to. 
+ * @return + */ + private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource) { + String containerIDToStart = request.getContainerID(); + String host = samzaResource.getHost(); + List<String> containerIDsForStandbyConstraints = this.standbyContainerConstraints.get(containerIDToStart); + + // Check if any of these conflicting containers are running/launching on host + for (String containerID : containerIDsForStandbyConstraints) { + SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID); + + // return false if a conflicting container is pending for launch on the host + if (resource != null && resource.getHost().equals(host)) { + log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host", + containerIDToStart, samzaResource.getHost(), containerID); + return false; + } + + // return false if a conflicting container is running on the host + resource = samzaApplicationState.runningContainers.get(containerID); + if (resource != null && resource.getHost().equals(host)) { + log.info("Container {} cannot be started on host {} because container {} is already running on this host", + containerIDToStart, samzaResource.getHost(), containerID); + return false; + } + } + + return true; + } + + /** + * Attempt to the run a container on the given candidate resource, if doing so meets the standby container constraints. 
+ * @param request The Samza container request + * @param preferredHost the preferred host associated with the container + * @param samzaResource the resource candidate + */ + public void checkStandbyConstraintsAndRunStreamProcessor(SamzaResourceRequest request, String preferredHost, + SamzaResource samzaResource, AbstractContainerAllocator containerAllocator, + ResourceRequestState resourceRequestState) { + String containerID = request.getContainerID(); + + if (checkStandbyConstraints(request, samzaResource)) { + // This resource can be used to launch this container + log.info("Running container {} on {} meets standby constraints, preferredHost = {}", containerID, samzaResource.getHost(), preferredHost); + containerAllocator.runStreamProcessor(request, preferredHost); + samzaApplicationState.successfulStandbyAllocations.incrementAndGet(); + + } else if (StandbyTaskUtil.isStandbyContainer(containerID)) { + // This resource cannot be used to launch this standby container, so we make a new anyhost request + log.info("Running standby container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request", + containerID, samzaResource.getHost()); + resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost); + resourceRequestState.cancelResourceRequest(request); + containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + samzaApplicationState.failedStandbyAllocations.incrementAndGet(); + } else { + // This resource cannot be used to launch this active container container, so we initiate a failover + log.warn("Running active container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource", + containerID, samzaResource.getHost()); + resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost); + resourceRequestState.cancelResourceRequest(request); + + // if this active-container has never failed, 
then simple request anyhost + if (!samzaApplicationState.failedContainersStatus.containsKey(containerID)) { + log.info("Requesting ANY_HOST for active container {}", containerID); + containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + } else { + log.info("Initiating failover for active container {}", containerID); + // we use the activeContainer's last known resourceID to initiate the failover + String lastKnownResourceID = samzaApplicationState.failedContainersStatus.get(containerID).getResourceID(); + initiateActiveContainerFailover(containerID, lastKnownResourceID, containerAllocator); + } + + samzaApplicationState.failedStandbyAllocations.incrementAndGet(); + } + } + + public void handleExpiredResourceRequest(String containerID, SamzaResourceRequest request, + Optional<SamzaResource> alternativeResource, AbstractContainerAllocator containerAllocator, + ResourceRequestState resourceRequestState) { + + if (StandbyTaskUtil.isStandbyContainer(containerID) && alternativeResource.isPresent()) { + // A standby container can be started on the anyhost-alternative-resource rightaway provided it passes all the + // standby constraints + + log.info("Handling expired request, standby container {} can be started on alternative resource {}", containerID, alternativeResource.get()); + + checkStandbyConstraintsAndRunStreamProcessor(request, ResourceRequestState.ANY_HOST, alternativeResource.get(), + containerAllocator, resourceRequestState); + + } else if (StandbyTaskUtil.isStandbyContainer(containerID) && !alternativeResource.isPresent()) { + // If there is no alternative-resource for the standby container we make a new anyhost request Review comment: Let me try separating it out by container type (standby vs. active): handleExpiredRequestForActive and handleExpiredRequestForStandby. That would help, I think. This overall method is basically saying that a resource request has expired and, optionally, an alternate anyHost resource is available.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services