vjagadish1989 commented on a change in pull request #903: SEP-19: Allocator 
changes for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r256081802
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 ##########
 @@ -119,4 +153,219 @@ private void updateExpiryMetrics(SamzaResourceRequest 
request) {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
   }
-}
+
+  // Method to run a container on the given resource if it meets all standby 
constraints. If not, we re-request resource
+  // for the container (similar to the case when we re-request for a 
launch-fail or request expiry).
+  private boolean 
checkStandbyTaskConstraintsAndRunStreamProcessor(SamzaResourceRequest request, 
String preferredHost,
+      SamzaResource samzaResource, SamzaApplicationState state) {
+
+    // If standby tasks are not enabled run streamprocessor and return true
+    if (!new JobConfig(config).getStandbyTasksEnabled()) {
+      runStreamProcessor(request, preferredHost);
+      return true;
+    }
+
+    String containerID = request.getContainerID();
+
+    if (checkStandbyConstraints(request, samzaResource, state)) {
+      // This resource can be used to launch this container
+      log.info("Running container {} on preferred host {} meets standby 
constraints, launching on {}", containerID,
+          preferredHost, samzaResource.getHost());
+      runStreamProcessor(request, preferredHost);
+      state.successfulStandbyAllocations.incrementAndGet();
+      return true;
+    } else {
+      // This resource cannot be used to launch this container, so we treat it 
like a launch fail, and issue an ANY_HOST request
+      log.info("Running container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
+          containerID, samzaResource.getHost());
+      resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
+      resourceRequestState.cancelResourceRequest(request);
+      requestResourceDueToLaunchFailOrExpiredRequest(containerID);
+      state.failedStandbyAllocations.incrementAndGet();
+      return false;
+    }
+  }
+
+  // Helper method to check if this SamzaResourceRequest for a container can 
be met on this resource, given standby
+  // container constraints, and the current set of pending and running 
containers
+  private boolean checkStandbyConstraints(SamzaResourceRequest request, 
SamzaResource samzaResource,
+      SamzaApplicationState samzaApplicationState) {
+    String containerIDToStart = request.getContainerID();
+    String host = samzaResource.getHost();
+    List<String> containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIDToStart);
+
+    // Check if any of these conflicting containers are running/launching on 
host
+    for (String containerID : containerIDsForStandbyConstraints) {
+      SamzaResource resource = 
samzaApplicationState.pendingContainers.get(containerID);
+
+      // return false if a conflicting container is pending for launch on the 
host
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+
+      // return false if a conflicting container is running on the host
+      resource = samzaApplicationState.runningContainers.get(containerID);
+      if (resource != null && resource.getHost().equals(host)) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
+            containerIDToStart, samzaResource.getHost(), containerID);
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Intercept resource requests, which are due to either a launch-failure or 
resource-request expired or standby
+   * 1. a standby container, we proceed to make a anyhost request
+   * 2. an activeContainer, we try to fail-it-over to a standby
+   * @param containerID Identifier of the container that will be run when a 
resource is allocated
+   */
+  @Override
+  public void requestResourceDueToLaunchFailOrExpiredRequest(String 
containerID) {
 
 Review comment:
   We shouldn't have to override this method (or expose it in the parent 
class). Instead, it's cleaner to expose the most general API and handle the 
special cases at the places where it is used.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to