[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278648#comment-16278648 ]

ASF GitHub Bot commented on FLINK-7878:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4911#discussion_r154499917
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---
    @@ -183,17 +240,124 @@ public int hashCode() {
                result = 31 * result + directMemoryInMB;
                result = 31 * result + nativeMemoryInMB;
                result = 31 * result + stateSizeInMB;
    +           result = 31 * result + extendedResources.hashCode();
                return result;
        }
     
        @Override
        public String toString() {
    +           String extend = "";
    +           for (Resource resource : extendedResources.values()) {
    +                   extend += ", " + resource.name + "=" + resource.value;
    +           }
                return "ResourceSpec{" +
                                "cpuCores=" + cpuCores +
                                ", heapMemoryInMB=" + heapMemoryInMB +
                                ", directMemoryInMB=" + directMemoryInMB +
                                ", nativeMemoryInMB=" + nativeMemoryInMB +
    -                           ", stateSizeInMB=" + stateSizeInMB +
    +                           ", stateSizeInMB=" + stateSizeInMB + extend +
                                '}';
        }
    +
    +   public static abstract class Resource implements Serializable {
    +           final private String name;
    +
    +           final private Double value;
    +
    +           final private ResourceAggregateType type;
    +
    +           public Resource(String name, double value, ResourceAggregateType type) {
    +                   this.name = checkNotNull(name);
    +                   this.value = Double.valueOf(value);
    +                   this.type = checkNotNull(type);
    +           }
    +
    +           Resource merge(Resource other) {
    +                   Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type");
    +                   Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name");
    +                   Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type");
    +
    +                   Double value = null;
    +                   switch (type) {
    +                           case AGGREGATE_TYPE_MAX :
    +                                   value = other.value.compareTo(this.value) > 0 ? other.value : this.value;
    --- End diff --
    
    `Math.max` should do the trick here.
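
To illustrate the suggestion, here is a minimal sketch that puts the `compareTo` ternary from the diff next to a `Math.max` call. It assumes `value` is a non-null boxed `Double`, as in the diff. The class and method names (`ResourceMergeSketch`, `maxWithCompareTo`, `maxWithMathMax`) are hypothetical and not part of the pull request.

```java
// Hypothetical stand-alone class, only for comparing the two approaches.
class ResourceMergeSketch {

    // Approach in the diff: choose the larger boxed value via Double.compareTo.
    static Double maxWithCompareTo(Double a, Double b) {
        return b.compareTo(a) > 0 ? b : a;
    }

    // Suggested approach: Math.max on the primitive doubles.
    static double maxWithMathMax(double a, double b) {
        return Math.max(a, b);
    }

    public static void main(String[] args) {
        // Both calls are expected to select the larger of the two values.
        System.out.println(maxWithCompareTo(2.0, 4.0));
        System.out.println(maxWithMathMax(2.0, 4.0));
    }
}
```

In the `merge` method this would presumably reduce the `AGGREGATE_TYPE_MAX` branch to a single `Math.max(this.value, other.value)` call, relying on auto-unboxing of the two `Double` fields.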


> Extend the resource type user can define in ResourceSpec
> --------------------------------------------------------
>
>                 Key: FLINK-7878
>                 URL: https://issues.apache.org/jira/browse/FLINK-7878
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API, DataStream API
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>              Labels: flip-6
>
> Currently, Flink only lets users specify how much CPU and memory an operator 
> uses, but the resources available in a cluster are varied. For example, an 
> image-processing application may need GPUs, and others may need FPGAs. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
