[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278573#comment-16278573
 ] 

ASF GitHub Bot commented on FLINK-8174:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5114#discussion_r154873317
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosResourceAllocation.java
 ---
    @@ -0,0 +1,197 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.util;
    +
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.List;
    +import java.util.ListIterator;
    +import java.util.Set;
    +
    +import static org.apache.flink.mesos.Utils.print;
    +
    +/**
    + * An allocation of resources on a particular host from one or more Mesos 
offers, to be portioned out to tasks.
    + *
    + * <p>This class assumes that the resources were offered <b>without</b> 
the {@code RESERVATION_REFINEMENT} capability,
    + * as detailed in the "Resource Format" section of the Mesos protocol 
definition.
    + *
    + * <p>This class is not thread-safe.
    + */
    +public class MesosResourceAllocation {
    +
    +   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosResourceAllocation.class);
    +
    +   static final double EPSILON = 1e-5;
    +   static final String UNRESERVED_ROLE = "*";
    +
    +   private final List<Protos.Resource> resources;
    +
    +   /**
    +    * Creates an allocation of resources for tasks to take.
    +    *
    +    * @param resources the resources to add to the allocation.
    +    */
    +   public MesosResourceAllocation(Collection<Protos.Resource> resources) {
    +           this.resources = new ArrayList<>(resources);
    +
    +           // sort the resources to prefer reserved resources
    +           this.resources.sort(Comparator.comparing(r -> 
UNRESERVED_ROLE.equals(r.getRole())));
    +   }
    +
    +   /**
    +    * Gets the remaining resources.
    +    */
    +   public List<Protos.Resource> getRemaining() {
    +           return Collections.unmodifiableList(resources);
    +   }
    +
    +   /**
    +    * Takes some amount of scalar resources (e.g. cpus, mem).
    +    *
    +    * @param amount the (approximate) amount to take from the available 
quantity.
    +    * @param roles the roles to accept
    +    */
    +   public List<Protos.Resource> takeScalar(String resourceName, double 
amount, Set<String> roles) {
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Allocating {} {}", amount, resourceName);
    +           }
    +
    +           List<Protos.Resource> result = new ArrayList<>(1);
    +           for (ListIterator<Protos.Resource> i = 
resources.listIterator(); i.hasNext();) {
    +                   if (amount <= EPSILON) {
    +                           break;
    +                   }
    +
    +                   // take from next available scalar resource that is 
unreserved or reserved for an applicable role
    +                   Protos.Resource available = i.next();
    +                   if (!resourceName.equals(available.getName()) || 
!available.hasScalar()) {
    +                           continue;
    +                   }
    +                   if (!UNRESERVED_ROLE.equals(available.getRole()) && 
!roles.contains(available.getRole())) {
    +                           continue;
    +                   }
    +
    +                   double amountToTake = 
Math.min(available.getScalar().getValue(), amount);
    +                   Protos.Resource taken = 
available.toBuilder().setScalar(Protos.Value.Scalar.newBuilder().setValue(amountToTake)).build();
    +                   amount -= amountToTake;
    +                   result.add(taken);
    +                   if (LOG.isDebugEnabled()) {
    +                           LOG.debug("Taking {} from {}", amountToTake, 
print(available));
    +                   }
    +
    +                   // keep remaining amount (if any)
    +                   double remaining = available.getScalar().getValue() - 
taken.getScalar().getValue();
    +                   if (remaining > EPSILON) {
    +                           
i.set(available.toBuilder().setScalar(Protos.Value.Scalar.newBuilder().setValue(remaining)).build());
    +                   }
    +                   else {
    +                           i.remove();
    +                   }
    +           }
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Allocated: {}, unsatisfied: {}", 
print(result), amount);
    +           }
    +           return result;
    +   }
    +
    +   /**
    +    * Takes some amount of range resources (e.g. ports).
    +    *
    +    * @param amount the number of values to take from the available 
range(s).
    +    * @param roles the roles to accept
    +    */
    +   public List<Protos.Resource> takeRanges(String resourceName, int 
amount, Set<String> roles) {
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Allocating {} {}", amount, resourceName);
    +           }
    +
    +           List<Protos.Resource> result = new ArrayList<>(1);
    +           for (ListIterator<Protos.Resource> i = 
resources.listIterator(); i.hasNext();) {
    +                   if (amount <= 0) {
    +                           break;
    +                   }
    +
    +                   // take from next available range resource that is 
unreserved or reserved for an applicable role
    +                   Protos.Resource available = i.next();
    +                   if (!resourceName.equals(available.getName()) || 
!available.hasRanges()) {
    +                           continue;
    +                   }
    +                   if (!UNRESERVED_ROLE.equals(available.getRole()) && 
!roles.contains(available.getRole())) {
    +                           continue;
    +                   }
    +
    +                   List<Protos.Value.Range> takenRanges = new 
ArrayList<>();
    +                   List<Protos.Value.Range> remainingRanges = new 
ArrayList<>(available.getRanges().getRangeList());
    +                   for (ListIterator<Protos.Value.Range> j = 
remainingRanges.listIterator(); j.hasNext();) {
    +                           if (amount <= 0) {
    +                                   break;
    +                           }
    +
    +                           // take from next available range (note: ranges 
are inclusive)
    +                           Protos.Value.Range availableRange = j.next();
    +                           long amountToTake = 
Math.min(availableRange.getEnd() - availableRange.getBegin() + 1, amount);
    +                           Protos.Value.Range takenRange = 
availableRange.toBuilder().setEnd(availableRange.getBegin() + amountToTake - 
1).build();
    +                           amount -= amountToTake;
    +                           takenRanges.add(takenRange);
    +
    +                           // keep remaining range (if any)
    +                           long remaining = availableRange.getEnd() - 
takenRange.getEnd();
    +                           if (remaining > 0) {
    +                                   
j.set(availableRange.toBuilder().setBegin(availableRange.getEnd() - remaining + 
1).build());
    --- End diff --
    
    I think this expression could be simplified to `takenRange.getEnd() + 1`.


> Mesos RM unable to accept offers for unreserved resources
> ---------------------------------------------------------
>
>                 Key: FLINK-8174
>                 URL: https://issues.apache.org/jira/browse/FLINK-8174
>             Project: Flink
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 1.4.0, 1.3.3
>            Reporter: Eron Wright 
>            Assignee: Eron Wright 
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Flink has suffered a regression due to FLINK-7294.   Any attempt to accept a 
> resource offer that is based on unreserved resources will fail, because Flink 
> (as of FLINK-7294) erroneously insists that the resource come from a prior 
> reservation.
> Looking at the original issue, the problem may have been misdiagnosed.  
> Ideally Flink should work with both reserved and unreserved resources, but 
> the latter is a more common situation that is now broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to