[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278657#comment-16278657
 ] 

ASF GitHub Bot commented on FLINK-7878:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154965613
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 ---
    @@ -183,17 +240,124 @@ public int hashCode() {
                result = 31 * result + directMemoryInMB;
                result = 31 * result + nativeMemoryInMB;
                result = 31 * result + stateSizeInMB;
    +           result = 31 * result + extendedResources.hashCode();
                return result;
        }
     
        @Override
        public String toString() {
    +           String extend = "";
    +           for (Resource resource : extendedResources.values()) {
    +                   extend += ", " + resource.name + "=" + resource.value;
    +           }
                return "ResourceSpec{" +
                                "cpuCores=" + cpuCores +
                                ", heapMemoryInMB=" + heapMemoryInMB +
                                ", directMemoryInMB=" + directMemoryInMB +
                                ", nativeMemoryInMB=" + nativeMemoryInMB +
    -                           ", stateSizeInMB=" + stateSizeInMB +
    +                           ", stateSizeInMB=" + stateSizeInMB + extend +
                                '}';
        }
    +
    +   public static abstract class Resource implements Serializable {
    +           final private String name;
    +
    +           final private Double value;
    +
    +           final private ResourceAggregateType type;
    +
    +           public Resource(String name, double value, 
ResourceAggregateType type) {
    +                   this.name = checkNotNull(name);
    +                   this.value = Double.valueOf(value);
    +                   this.type = checkNotNull(type);
    +           }
    +
    +           Resource merge(Resource other) {
    +                   Preconditions.checkArgument(getClass() == 
other.getClass(), "Merge with different resource type");
    +                   
Preconditions.checkArgument(this.name.equals(other.name), "Merge with different 
resource name");
    +                   
Preconditions.checkArgument(this.type.equals(other.type), "Merge with different 
aggregate type");
    +
    +                   Double value = null;
    +                   switch (type) {
    +                           case AGGREGATE_TYPE_MAX :
    +                                   value = 
other.value.compareTo(this.value) > 0 ? other.value : this.value;
    +                                   break;
    +
    +                           case AGGREGATE_TYPE_SUM:
    +                           default:
    +                                   value = this.value + other.value;
    +                   }
    +
    +                   Resource resource = create(value, type);
    +                   return resource;
    +           }
    +
    +           @Override
    +           public boolean equals(Object o) {
    +                   if (this == o) {
    +                           return true;
    +                   } else if (o != null && getClass() == o.getClass()) {
    +                           Resource other = (Resource) o;
    +
    +                           return name.equals(other.name) && 
type.equals(other.type) && value.equals(other.value);
    +                   } else {
    +                           return false;
    +                   }
    +           }
    +
    +           @Override
    +           public int hashCode() {
    +                   int result = name != null ? name.hashCode() : 0;
    +                   result = 31 * result + type.ordinal();
    +                   result = 31 * result + value.hashCode();
    +                   return result;
    +           }
    +
    +           /**
    +            * create a resource of the same resource type
    +            *
    +            * @param value the value of the resource
    +            * @param type the aggregate type of the resource
    +            * @return a new instance of the sub resource
    +            */
    +           protected abstract Resource create(double value, 
ResourceAggregateType type);
    +   }
    +
    +   /**
    +    * The GPU resource.
    +    */
    +   public static class GPUResource extends Resource {
    +
    +           public GPUResource(double value) {
    +                   this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM);
    +           }
    +
    +           public GPUResource(double value, ResourceAggregateType type) {
    +                   super("GPU", value, type);
    +           }
    +
    +           @Override
    +           public Resource create(double value, ResourceAggregateType 
type) {
    +                   return new GPUResource(value, type);
    +           }
    +   }
    +
    +   /**
    +    * The FPGA resource.
    +    */
    +   public static class FPGAResource extends Resource {
    --- End diff --
    
    I think this resource is too specific for Flink right now. Therefore I 
would remove it and only keep the `GPUResource`.


> Extend the resource type user can define in ResourceSpec
> --------------------------------------------------------
>
>                 Key: FLINK-7878
>                 URL: https://issues.apache.org/jira/browse/FLINK-7878
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API, DataStream API
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>              Labels: flip-6
>
> Now, flink only support user define how much CPU and MEM used in an operator, 
> but now the resource in a cluster is various. For example, an application for 
> image processing may need GPU, some others may need FPGA. 
> Only CPU and MEM is not enough, and the resource type is becoming more and 
> more, so we need to make the ResourSpec extendible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to