[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278657#comment-16278657 ]
ASF GitHub Bot commented on FLINK-7878: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4911#discussion_r154965613 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java --- @@ -183,17 +240,124 @@ public int hashCode() { result = 31 * result + directMemoryInMB; result = 31 * result + nativeMemoryInMB; result = 31 * result + stateSizeInMB; + result = 31 * result + extendedResources.hashCode(); return result; } @Override public String toString() { + String extend = ""; + for (Resource resource : extendedResources.values()) { + extend += ", " + resource.name + "=" + resource.value; + } return "ResourceSpec{" + "cpuCores=" + cpuCores + ", heapMemoryInMB=" + heapMemoryInMB + ", directMemoryInMB=" + directMemoryInMB + ", nativeMemoryInMB=" + nativeMemoryInMB + - ", stateSizeInMB=" + stateSizeInMB + + ", stateSizeInMB=" + stateSizeInMB + extend + '}'; } + + public static abstract class Resource implements Serializable { + final private String name; + + final private Double value; + + final private ResourceAggregateType type; + + public Resource(String name, double value, ResourceAggregateType type) { + this.name = checkNotNull(name); + this.value = Double.valueOf(value); + this.type = checkNotNull(type); + } + + Resource merge(Resource other) { + Preconditions.checkArgument(getClass() == other.getClass(), "Merge with different resource type"); + Preconditions.checkArgument(this.name.equals(other.name), "Merge with different resource name"); + Preconditions.checkArgument(this.type.equals(other.type), "Merge with different aggregate type"); + + Double value = null; + switch (type) { + case AGGREGATE_TYPE_MAX : + value = other.value.compareTo(this.value) > 0 ? 
other.value : this.value; + break; + + case AGGREGATE_TYPE_SUM: + default: + value = this.value + other.value; + } + + Resource resource = create(value, type); + return resource; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o != null && getClass() == o.getClass()) { + Resource other = (Resource) o; + + return name.equals(other.name) && type.equals(other.type) && value.equals(other.value); + } else { + return false; + } + } + + @Override + public int hashCode() { + int result = name != null ? name.hashCode() : 0; + result = 31 * result + type.ordinal(); + result = 31 * result + value.hashCode(); + return result; + } + + /** + * create a resource of the same resource type + * + * @param value the value of the resource + * @param type the aggregate type of the resource + * @return a new instance of the sub resource + */ + protected abstract Resource create(double value, ResourceAggregateType type); + } + + /** + * The GPU resource. + */ + public static class GPUResource extends Resource { + + public GPUResource(double value) { + this(value, ResourceAggregateType.AGGREGATE_TYPE_SUM); + } + + public GPUResource(double value, ResourceAggregateType type) { + super("GPU", value, type); + } + + @Override + public Resource create(double value, ResourceAggregateType type) { + return new GPUResource(value, type); + } + } + + /** + * The FPGA resource. + */ + public static class FPGAResource extends Resource { --- End diff -- I think this resource is too specific for Flink right now. Therefore I would remove it and only keep the `GPUResource`. 
> Extend the resource type user can define in ResourceSpec > -------------------------------------------------------- > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API > Reporter: shuai.xu > Assignee: shuai.xu > Labels: flip-6 > > Currently, Flink only lets users define how much CPU and memory an operator uses, > but the resources available in a cluster are varied. For example, an application for > image processing may need GPUs, while others may need FPGAs. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)