[ https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278573#comment-16278573 ]
ASF GitHub Bot commented on FLINK-8174: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5114#discussion_r154873317 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosResourceAllocation.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +import static org.apache.flink.mesos.Utils.print; + +/** + * An allocation of resources on a particular host from one or more Mesos offers, to be portioned out to tasks. + * + * <p>This class assumes that the resources were offered <b>without</b> the {@code RESERVATION_REFINEMENT} capability, + * as detailed in the "Resource Format" section of the Mesos protocol definition. + * + * <p>This class is not thread-safe. 
+ */ +public class MesosResourceAllocation { + + protected static final Logger LOG = LoggerFactory.getLogger(MesosResourceAllocation.class); + + static final double EPSILON = 1e-5; + static final String UNRESERVED_ROLE = "*"; + + private final List<Protos.Resource> resources; + + /** + * Creates an allocation of resources for tasks to take. + * + * @param resources the resources to add to the allocation. + */ + public MesosResourceAllocation(Collection<Protos.Resource> resources) { + this.resources = new ArrayList<>(resources); + + // sort the resources to prefer reserved resources + this.resources.sort(Comparator.comparing(r -> UNRESERVED_ROLE.equals(r.getRole()))); + } + + /** + * Gets the remaining resources. + */ + public List<Protos.Resource> getRemaining() { + return Collections.unmodifiableList(resources); + } + + /** + * Takes some amount of scalar resources (e.g. cpus, mem). + * + * @param amount the (approximate) amount to take from the available quantity. + * @param roles the roles to accept + */ + public List<Protos.Resource> takeScalar(String resourceName, double amount, Set<String> roles) { + if (LOG.isDebugEnabled()) { + LOG.debug("Allocating {} {}", amount, resourceName); + } + + List<Protos.Resource> result = new ArrayList<>(1); + for (ListIterator<Protos.Resource> i = resources.listIterator(); i.hasNext();) { + if (amount <= EPSILON) { + break; + } + + // take from next available scalar resource that is unreserved or reserved for an applicable role + Protos.Resource available = i.next(); + if (!resourceName.equals(available.getName()) || !available.hasScalar()) { + continue; + } + if (!UNRESERVED_ROLE.equals(available.getRole()) && !roles.contains(available.getRole())) { + continue; + } + + double amountToTake = Math.min(available.getScalar().getValue(), amount); + Protos.Resource taken = available.toBuilder().setScalar(Protos.Value.Scalar.newBuilder().setValue(amountToTake)).build(); + amount -= amountToTake; + result.add(taken); + if 
(LOG.isDebugEnabled()) { + LOG.debug("Taking {} from {}", amountToTake, print(available)); + } + + // keep remaining amount (if any) + double remaining = available.getScalar().getValue() - taken.getScalar().getValue(); + if (remaining > EPSILON) { + i.set(available.toBuilder().setScalar(Protos.Value.Scalar.newBuilder().setValue(remaining)).build()); + } + else { + i.remove(); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Allocated: {}, unsatisfied: {}", print(result), amount); + } + return result; + } + + /** + * Takes some amount of range resources (e.g. ports). + * + * @param amount the number of values to take from the available range(s). + * @param roles the roles to accept + */ + public List<Protos.Resource> takeRanges(String resourceName, int amount, Set<String> roles) { + if (LOG.isDebugEnabled()) { + LOG.debug("Allocating {} {}", amount, resourceName); + } + + List<Protos.Resource> result = new ArrayList<>(1); + for (ListIterator<Protos.Resource> i = resources.listIterator(); i.hasNext();) { + if (amount <= 0) { + break; + } + + // take from next available range resource that is unreserved or reserved for an applicable role + Protos.Resource available = i.next(); + if (!resourceName.equals(available.getName()) || !available.hasRanges()) { + continue; + } + if (!UNRESERVED_ROLE.equals(available.getRole()) && !roles.contains(available.getRole())) { + continue; + } + + List<Protos.Value.Range> takenRanges = new ArrayList<>(); + List<Protos.Value.Range> remainingRanges = new ArrayList<>(available.getRanges().getRangeList()); + for (ListIterator<Protos.Value.Range> j = remainingRanges.listIterator(); j.hasNext();) { + if (amount <= 0) { + break; + } + + // take from next available range (note: ranges are inclusive) + Protos.Value.Range availableRange = j.next(); + long amountToTake = Math.min(availableRange.getEnd() - availableRange.getBegin() + 1, amount); + Protos.Value.Range takenRange = availableRange.toBuilder().setEnd(availableRange.getBegin() + 
amountToTake - 1).build(); + amount -= amountToTake; + takenRanges.add(takenRange); + + // keep remaining range (if any) + long remaining = availableRange.getEnd() - takenRange.getEnd(); + if (remaining > 0) { + j.set(availableRange.toBuilder().setBegin(availableRange.getEnd() - remaining + 1).build()); --- End diff -- I think this expression could be simplified to `takenRange.getEnd() + 1`, since `remaining` is defined as `availableRange.getEnd() - takenRange.getEnd()`. > Mesos RM unable to accept offers for unreserved resources > --------------------------------------------------------- > > Key: FLINK-8174 > URL: https://issues.apache.org/jira/browse/FLINK-8174 > Project: Flink > Issue Type: Bug > Components: Mesos > Affects Versions: 1.4.0, 1.3.3 > Reporter: Eron Wright > Assignee: Eron Wright > Priority: Blocker > Fix For: 1.4.0 > > > Flink has suffered a regression due to FLINK-7294. Any attempt to accept a > resource offer that is based on unreserved resources will fail, because Flink > (as of FLINK-7294) erroneously insists that the resource come from a prior > reservation. > Looking at the original issue, the problem may have been misdiagnosed. > Ideally Flink should work with both reserved and unreserved resources, but > the latter is the more common situation that is now broken. -- This message was sent by Atlassian JIRA (v6.4.14#64029)