vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256083123
##########
File path:
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
##########
@@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest
request) {
state.expiredPreferredHostRequests.incrementAndGet();
}
}
-}
+
+  /**
+   * Launches the container for {@code request} on {@code samzaResource} if doing so satisfies the
+   * standby constraints. Otherwise the resource is released, the request is cancelled, and a new
+   * resource is requested for the container, in the same way as for a launch failure or an expired
+   * request.
+   *
+   * @param request the resource request of the container to launch
+   * @param preferredHost the host the request was issued for
+   * @param samzaResource the resource that was allocated for the request
+   * @param state application state whose standby-allocation counters are updated
+   * @return {@code true} if the container was launched; {@code false} if the resource was rejected
+   */
+  private boolean checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request,
+      String preferredHost, SamzaResource samzaResource, SamzaApplicationState state) {
+    boolean standbyTasksEnabled = new JobConfig(config).getStandbyTasksEnabled();
+    if (!standbyTasksEnabled) {
+      // No standby tasks means no placement constraints: launch straight away.
+      runStreamProcessor(request, preferredHost);
+      return true;
+    }
+
+    String containerID = request.getContainerID();
+    boolean placementAllowed = checkStandbyConstraints(request, samzaResource, state);
+
+    if (!placementAllowed) {
+      // Reject this resource as if the launch had failed, then re-request one for the container.
+      log.info("Running container {} on host {} does not meet standby constraints, cancelling resource request, releasing resource, and making a new ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+      state.failedStandbyAllocations.incrementAndGet();
+      return false;
+    }
+
+    log.info("Running container {} on preferred host {} meets standby constraints, launching on {}",
+        containerID, preferredHost, samzaResource.getHost());
+    runStreamProcessor(request, preferredHost);
+    state.successfulStandbyAllocations.incrementAndGet();
+    return true;
+  }
+
+  /**
+   * Checks whether the container requested by {@code request} may be placed on the host of
+   * {@code samzaResource}. Placement is refused if any container listed in
+   * {@code standbyContainerConstraints} for it is already pending launch or running on that host.
+   *
+   * @param request the resource request of the container being placed
+   * @param samzaResource the candidate resource
+   * @param samzaApplicationState source of the pending and running container maps
+   * @return {@code true} if no conflicting container is pending or running on the resource's host
+   */
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, SamzaResource samzaResource,
+      SamzaApplicationState samzaApplicationState) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List<String> containerIDsForStandbyConstraints =
+        this.standbyContainerConstraints.get(containerIDToStart);
+
+    // A container with no recorded constraints cannot conflict with anything. Without this guard
+    // the loop below would throw a NullPointerException.
+    if (containerIDsForStandbyConstraints == null) {
+      return true;
+    }
+
+    for (String containerID : containerIDsForStandbyConstraints) {
+      // Refuse if a conflicting container is pending launch on this host.
+      SamzaResource resource = samzaApplicationState.pendingContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already scheduled on this host",
+            containerIDToStart, host, containerID);
+        return false;
+      }
+
+      // Refuse if a conflicting container is already running on this host.
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container {} is already running on this host",
+            containerIDToStart, host, containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Re-requests a resource for a container whose previous launch failed or whose resource request
+   * expired.
+   * <ul>
+   *   <li>For a standby container, an ANY_HOST request is issued directly via the superclass.</li>
+   *   <li>For an active container, the ANY_HOST request is routed through this class's
+   *   {@link #requestResource(String, String)} override. When standby tasks are enabled, that
+   *   override initiates an active-container failover, selecting a new standby if possible.</li>
+   * </ul>
+   *
+   * @param containerID identifier of the container that will be run when a resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String containerID) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling rerequesting for container {} using an any host request", containerID);
+      // Standby containers skip the failover logic and go straight to an ANY_HOST request.
+      super.requestResource(containerID, ResourceRequestState.ANY_HOST);
+    } else {
+      // Use the local override so that a new standby can be selected for failover, if possible.
+      requestResource(containerID, ResourceRequestState.ANY_HOST);
+    }
+  }
+
+ // Intercept issuing of resource requests from the CPM
+ // 1. for ActiveContainers and instead choose a StandbyContainer to stop
+ // 2. for a StandbyContainer (after it has been chosen for failover), to put
the active on the standby's host
+ // and request another resource for the standby
+ // 3. for a standbyContainer (if not for a failover)
+ @Override
+ public void requestResource(String containerID, String preferredHost) {
+
+ // If StandbyTasks are not enabled, we simply forward the resource requests
+ if (!new JobConfig(config).getStandbyTasksEnabled()) {
+ super.requestResource(containerID, preferredHost);
+ return;
+ }
+
+ // If its an anyhost request for an active container, then we select a
standby container to stop and place this activeContainer on that standby's host
+ // we may have already chosen a standby (which didnt work for a failover)
+ if (!StandbyTaskUtil.isStandbyContainer(containerID) &&
preferredHost.equals(ResourceRequestState.ANY_HOST)) {
+ initiateActiveContainerFailover(containerID);
+ } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
Review comment:
Move the failover-handling logic upstream, perhaps into
`ContainerProcessManager`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services