rmatharu commented on a change in pull request #903: SEP-19: Allocator changes for standby-aware container allocation, and active container failover URL: https://github.com/apache/samza/pull/903#discussion_r260911377
########## File path: samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java ########## @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.storage.kv.Entry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Encapsulates logic and state concerning standby-containers. 
+ */ +public class StandbyContainerManager { + + private static final Logger log = LoggerFactory.getLogger(StandbyContainerManager.class); + + private final SamzaApplicationState samzaApplicationState; + + // Map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1} + // This is used for checking no two standbys or active-standby-pair are started on the same host + private final Map<String, List<String>> standbyContainerConstraints; + + // Map of active containers that are in failover, indexed by the active container's resourceID (at the time of failure) + private final Map<String, FailoverMetadata> failovers; + + // Resource-manager, used to stop containers + private ClusterResourceManager clusterResourceManager; + + public StandbyContainerManager(SamzaApplicationState samzaApplicationState, + ClusterResourceManager clusterResourceManager) { + this.failovers = new ConcurrentHashMap<>(); + this.standbyContainerConstraints = new HashMap<>(); + this.samzaApplicationState = samzaApplicationState; + JobModel jobModel = samzaApplicationState.jobModelManager.jobModel(); + + // populate the standbyContainerConstraints map by iterating over all containers + jobModel.getContainers() + .keySet() + .forEach(containerId -> standbyContainerConstraints.put(containerId, + StandbyTaskUtil.getStandbyContainerConstraints(containerId, jobModel))); + this.clusterResourceManager = clusterResourceManager; + + log.info("Populated standbyContainerConstraints map {}", standbyContainerConstraints); + } + + /** + * We handle the stopping of a container depending on the case which is decided using the exit-status: + * Case 1. an Active-Container which has stopped for an "unknown" reason, then we start it on the given preferredHost + * Case 2. Active container has stopped because of node failure, thene we initiate a failover + * Case 3. 
StandbyContainer has stopped after it was chosen for failover, see {@link StandbyContainerManager#handleStandbyContainerStop} + * Case 4. StandbyContainer has stopped but not because of a failover, see {@link StandbyContainerManager#handleStandbyContainerStop} + * + * @param containerID containerID of the stopped container + * @param resourceID last resourceID of the stopped container + * @param preferredHost the host on which the container was running + * @param exitStatus the exit status of the failed container + * @param containerAllocator the container allocator + */ + public void handleContainerStop(String containerID, String resourceID, String preferredHost, int exitStatus, + AbstractContainerAllocator containerAllocator) { + + if (StandbyTaskUtil.isStandbyContainer(containerID)) { + handleStandbyContainerStop(containerID, resourceID, preferredHost, containerAllocator); + } else { + // initiate failover for the active container based on the exitStatus + switch (exitStatus) { + case SamzaResourceStatus.DISK_FAIL: + case SamzaResourceStatus.ABORTED: + case SamzaResourceStatus.PREEMPTED: + initiateActiveContainerFailover(containerID, resourceID, containerAllocator); + break; + // in all other cases, request resource for the failed container + default: + log.info("Requesting resource for active-container {} on host {}", containerID, preferredHost); + containerAllocator.requestResource(containerID, preferredHost); + break; + } + } + } + + /** + * Handle the failed launch of a container, based on + * Case 1. If it is an active container, then initiate a failover for it. + * Case 2. If it is standby container, request a new resource on AnyHost. 
+ * @param containerID the ID of the container that has failed + */ + public void handleContainerLaunchFail(String containerID, String resourceID, + AbstractContainerAllocator containerAllocator) { + + if (StandbyTaskUtil.isStandbyContainer(containerID)) { + log.info("Handling launch fail for standby-container {}, requesting resource on any host {}", containerID); + containerAllocator.requestResource(containerID, ResourceRequestState.ANY_HOST); + } else { + initiateActiveContainerFailover(containerID, resourceID, containerAllocator); + } + } + + /** + * If a standby container has stopped, then there are two possible cases + * Case 1. during a failover, the standby container was stopped for an active's start, then we + * 1. request a resource on the standby's host to place the activeContainer, and + * 2. request anyhost to place this standby + * + * Case 2. independent of a failover, the standby container stopped, in which proceed with its resource-request + * @param standbyContainerID SamzaContainerID of the standby container + * @param preferredHost Preferred host of the standby container + */ + private void handleStandbyContainerStop(String standbyContainerID, String resourceID, String preferredHost, + AbstractContainerAllocator containerAllocator) { + + // if this standbyContainerResource was stopped for a failover, we will find a metadata entry + Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = this.checkIfUsedForFailover(resourceID); + + if (failoverMetadata.isPresent()) { + String activeContainerID = failoverMetadata.get().activeContainerID; + String standbyContainerHostname = failoverMetadata.get().getStandbyContainerHostname(resourceID); + + log.info("Requesting resource for active container {} on host {}, and backup container {} on any host", + activeContainerID, standbyContainerHostname, standbyContainerID); + + containerAllocator.requestResource(activeContainerID, + standbyContainerHostname); // request standbycontainer's host for 
active-container + containerAllocator.requestResource(standbyContainerID, + ResourceRequestState.ANY_HOST); // request anyhost for standby container + return; + } else { + log.info("Issuing request for standby container {} on host {}, since this is not for a failover", + standbyContainerID, preferredHost); + containerAllocator.requestResource(standbyContainerID, preferredHost); + return; + } + } + + /** Method to handle failover for a container. + * If it is an active container, + * We try to find a standby for the active container, and issue a stop on it. + * If we do not find a standby container, we simply issue an anyhost request to place it. + * If it is standby container, we simply forward the request to the containerAllocator + * + * @param containerID the samzaContainerID of the active-container + * @param resourceID the samza-resource-ID of the container when it failed (used to index failover-state) + */ + private void initiateActiveContainerFailover(String containerID, String resourceID, Review comment: Sorry, the doc for this method was incorrect. This method only handles activeContainerIDs now, so the name initiateActiveContainerFailover is accurate. The "standby" case has been inlined. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services