Github user tillrohrmann commented on a diff in the pull request:
    --- Diff: 
    @@ -0,0 +1,197 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.mesos.util;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Set;
    +import static org.apache.flink.mesos.Utils.print;
    + * An allocation of resources on a particular host from one or more Mesos 
offers, to be portioned out to tasks.
    + *
    + * <p>This class assumes that the resources were offered <b>without</b> 
the {@code RESERVATION_REFINEMENT} capability,
    + * as detailed in the "Resource Format" section of the Mesos protocol 
    + *
    + * <p>This class is not thread-safe.
    + */
    +public class MesosResourceAllocation {
    +   protected static final Logger LOG = 
    +   static final double EPSILON = 1e-5;
    +   static final String UNRESERVED_ROLE = "*";
    +   private final List<Protos.Resource> resources;
    +   /**
    +    * Creates an allocation of resources for tasks to take.
    +    *
    +    * @param resources the resources to add to the allocation.
    +    */
    +   public MesosResourceAllocation(Collection<Protos.Resource> resources) {
    +           this.resources = new ArrayList<>(resources);
    +           // sort the resources to prefer reserved resources
    +           this.resources.sort(Comparator.comparing(r -> 
    +   }
    +   /**
    +    * Gets the remaining resources.
    +    */
    +   public List<Protos.Resource> getRemaining() {
    +           return Collections.unmodifiableList(resources);
    +   }
    +   /**
    +    * Takes some amount of scalar resources (e.g. cpus, mem).
    +    *
    +    * @param amount the (approximate) amount to take from the available 
    +    * @param roles the roles to accept
    +    */
    +   public List<Protos.Resource> takeScalar(String resourceName, double 
amount, Set<String> roles) {
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Allocating {} {}", amount, resourceName);
    +           }
    +           List<Protos.Resource> result = new ArrayList<>(1);
    +           for (ListIterator<Protos.Resource> i = 
resources.listIterator(); i.hasNext();) {
    +                   if (amount <= EPSILON) {
    +                           break;
    +                   }
    +                   // take from next available scalar resource that is 
unreserved or reserved for an applicable role
    +                   Protos.Resource available =;
    +                   if (!resourceName.equals(available.getName()) || 
!available.hasScalar()) {
    +                           continue;
    +                   }
    +                   if (!UNRESERVED_ROLE.equals(available.getRole()) && 
!roles.contains(available.getRole())) {
    +                           continue;
    +                   }
    +                   double amountToTake = 
Math.min(available.getScalar().getValue(), amount);
    +                   Protos.Resource taken = 
    +                   amount -= amountToTake;
    +                   result.add(taken);
    +                   if (LOG.isDebugEnabled()) {
    +                           LOG.debug("Taking {} from {}", amountToTake, 
    +                   }
    +                   // keep remaining amount (if any)
    +                   double remaining = available.getScalar().getValue() - 
    +                   if (remaining > EPSILON) {
    +                   }
    +                   else {
    +                           i.remove();
    +                   }
    +           }
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Allocated: {}, unsatisfied: {}", 
print(result), amount);
    +           }
    +           return result;
    +   }
    +   /**
    +    * Takes some amount of range resources (e.g. ports).
    +    *
    +    * @param amount the number of values to take from the available 
    +    * @param roles the roles to accept
    +    */
    +   public List<Protos.Resource> takeRanges(String resourceName, int 
amount, Set<String> roles) {
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Allocating {} {}", amount, resourceName);
    +           }
    +           List<Protos.Resource> result = new ArrayList<>(1);
    +           for (ListIterator<Protos.Resource> i = 
resources.listIterator(); i.hasNext();) {
    +                   if (amount <= 0) {
    +                           break;
    +                   }
    +                   // take from next available range resource that is 
unreserved or reserved for an applicable role
    +                   Protos.Resource available =;
    +                   if (!resourceName.equals(available.getName()) || 
!available.hasRanges()) {
    +                           continue;
    +                   }
    +                   if (!UNRESERVED_ROLE.equals(available.getRole()) && 
!roles.contains(available.getRole())) {
    +                           continue;
    +                   }
    +                   List<Protos.Value.Range> takenRanges = new 
    +                   List<Protos.Value.Range> remainingRanges = new 
    +                   for (ListIterator<Protos.Value.Range> j = 
remainingRanges.listIterator(); j.hasNext();) {
    +                           if (amount <= 0) {
    +                                   break;
    +                           }
    +                           // take from next available range (note: ranges 
are inclusive)
    +                           Protos.Value.Range availableRange =;
    +                           long amountToTake = 
Math.min(availableRange.getEnd() - availableRange.getBegin() + 1, amount);
    +                           Protos.Value.Range takenRange = 
availableRange.toBuilder().setEnd(availableRange.getBegin() + amountToTake - 
    +                           amount -= amountToTake;
    +                           takenRanges.add(takenRange);
    +                           // keep remaining range (if any)
    +                           long remaining = availableRange.getEnd() - 
    +                           if (remaining > 0) {
j.set(availableRange.toBuilder().setBegin(availableRange.getEnd() - remaining + 
    --- End diff --
    I think this expression could be simplified to `takenRange.getEnd() + 1`.


Reply via email to