KarmaGYZ commented on a change in pull request #11873:
URL: https://github.com/apache/flink/pull/11873#discussion_r471210685



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAttemptID.java
##########
@@ -18,33 +18,77 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.flink.util.AbstractID;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Unique identifier for the attempt to execute a tasks. Multiple attempts 
happen
  * in cases of failures and recovery.
  */
-public class ExecutionAttemptID extends AbstractID {
+public class ExecutionAttemptID implements java.io.Serializable {
 
        private static final long serialVersionUID = -1169683445778281344L;
 
+       private final ExecutionVertexID executionVertexID;
+       private final int attemptNumber;
+
+       /**
+        * Get a random execution attempt id.
+        */
        public ExecutionAttemptID() {
+               this(new ExecutionVertexID(), 0);
        }
 
-       public ExecutionAttemptID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
+       public ExecutionAttemptID(ExecutionVertexID executionVertexID, int 
attemptNumber) {
+               Preconditions.checkState(attemptNumber >= 0);
+               this.executionVertexID = 
Preconditions.checkNotNull(executionVertexID);
+               this.attemptNumber = attemptNumber;
        }
 
        public void writeTo(ByteBuf buf) {
-               buf.writeLong(this.lowerPart);
-               buf.writeLong(this.upperPart);
+               executionVertexID.writeTo(buf);
+               buf.writeInt(this.attemptNumber);
        }
 
        public static ExecutionAttemptID fromByteBuf(ByteBuf buf) {
-               long lower = buf.readLong();
-               long upper = buf.readLong();
-               return new ExecutionAttemptID(lower, upper);
+               final ExecutionVertexID executionVertexID = 
ExecutionVertexID.fromByteBuf(buf);
+               final int attemptNumber = buf.readInt();
+               return new ExecutionAttemptID(executionVertexID, attemptNumber);
+       }
+
+       @VisibleForTesting
+       public int getAttemptNumber() {
+               return attemptNumber;
+       }
+
+       @VisibleForTesting
+       public ExecutionVertexID getExecutionVertexID() {
+               return executionVertexID;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj != null && obj.getClass() == getClass()) {
+                       ExecutionAttemptID that = (ExecutionAttemptID) obj;
+                       return 
that.executionVertexID.equals(this.executionVertexID)
+                               && that.attemptNumber == this.attemptNumber;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return this.executionVertexID.hashCode() ^ this.attemptNumber;

Review comment:
       Good point!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to