[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278575#comment-16278575
 ] 

ASF GitHub Bot commented on FLINK-8174:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5114#discussion_r154870143
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/Offer.java ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.scheduler;
    +
    +import com.netflix.fenzo.VirtualMachineLease;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +
    +import static org.apache.flink.mesos.Utils.print;
    +
    +/**
    + * An adapter class to transform a Mesos resource offer to a Fenzo {@link 
VirtualMachineLease}.
    + *
    + * <p>The default implementation provided by Fenzo isn't compatible with 
reserved resources.
    + * This implementation properly combines resources, e.g. a combination of 
reserved and unreserved cpus.
    + *
    + */
    +public class Offer implements VirtualMachineLease {
    +
    +   private static final Logger logger = 
LoggerFactory.getLogger(Offer.class);
    +
    +   private final Protos.Offer offer;
    +   private final String hostname;
    +   private final String vmID;
    +   private final long offeredTime;
    +
    +   private final List<Protos.Resource> resources;
    +   private final Map<String, List<Protos.Resource>> resourceMap;
    +   private final Map<String, Protos.Attribute> attributeMap;
    +
    +   public Offer(Protos.Offer offer) {
    +           this.offer = offer;
    +           this.hostname = offer.getHostname();
    +           this.vmID = offer.getSlaveId().getValue();
    +           this.offeredTime = System.currentTimeMillis();
    +
    +           this.resources = new 
ArrayList<>(offer.getResourcesList().size());
    +           this.resourceMap = new HashMap<>();
    +           for (Protos.Resource resource : offer.getResourcesList()) {
    +                   switch (resource.getType()) {
    +                           case SCALAR:
    +                           case RANGES:
    +                                   resources.add(resource);
    +                                   
resourceMap.computeIfAbsent(resource.getName(), k -> new 
ArrayList<>(2)).add(resource);
    +                                   break;
    +                           default:
    +                                   logger.debug("Unknown resource type " + 
resource.getType() + " for resource " + resource.getName() +
    +                                           " in offer, hostname=" + 
hostname + ", offerId=" + offer.getId());
    +                   }
    +           }
    +
    +           if (offer.getAttributesCount() > 0) {
    +                   Map<String, Protos.Attribute> attributeMap = new 
HashMap<>();
    +                   for (Protos.Attribute attribute: 
offer.getAttributesList()) {
    +                           attributeMap.put(attribute.getName(), 
attribute);
    +                   }
    +                   this.attributeMap = 
Collections.unmodifiableMap(attributeMap);
    +           } else {
    +                   this.attributeMap = Collections.emptyMap();
    +           }
    +   }
    +
    +   public List<Protos.Resource> getResources() {
    +           return Collections.unmodifiableList(resources);
    --- End diff --
    
    Can we make `this.resources` an unmodifiable list instead?


> Mesos RM unable to accept offers for unreserved resources
> ---------------------------------------------------------
>
>                 Key: FLINK-8174
>                 URL: https://issues.apache.org/jira/browse/FLINK-8174
>             Project: Flink
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 1.4.0, 1.3.3
>            Reporter: Eron Wright 
>            Assignee: Eron Wright 
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> Flink has suffered a regression due to FLINK-7294.   Any attempt to accept a 
> resource offer that is based on unreserved resources will fail, because Flink 
> (as of FLINK-7294) erroneously insists that the resource come from a prior 
> reservation.
> Looking at the original issue, the problem may have been misdiagnosed.  
> Ideally Flink should work with both reserved and unreserved resources, but 
> the latter is a more common situation that is now broken.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to