swamirishi commented on code in PR #8022:
URL: https://github.com/apache/ozone/pull/8022#discussion_r1985243484


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -186,13 +186,39 @@ long getStartTime() {
     }
   }
 
+  static class WriteFutureContext {
+    private final CompletableFuture<ContainerCommandResponseProto> writeChunkFuture;
+    private final CompletableFuture<Message> raftFuture;
+    private final long startTime;
+
+    WriteFutureContext(CompletableFuture<ContainerCommandResponseProto> writeChunkFuture,
+                       CompletableFuture<Message> raftFuture, long startTime) {
+      this.writeChunkFuture = writeChunkFuture;
+      this.raftFuture = raftFuture;
+      this.startTime = startTime;
+    }
+
+    public CompletableFuture<ContainerCommandResponseProto> getWriteChunkFuture() {
+      return writeChunkFuture;
+    }
+
+    public CompletableFuture<Message> getRaftFuture() {
+      return raftFuture;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+  }
+
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
+  private final ConcurrentHashMap<Long, WriteFutureContext> writeChunkFutureMap;

Review Comment:
   We can make this a ConcurrentSkipListMap instead and remove 
writeFutureMinIndex. The keys would then be kept in sorted order.
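   A minimal sketch of that suggestion, not the PR's code: the class 
OldestPendingWrite and its methods are hypothetical, and a plain Long start 
time stands in for WriteFutureContext. It only shows that a 
ConcurrentSkipListMap can return its lowest key directly, so no separate 
minimum-index field or scan loop is needed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class OldestPendingWrite {
  // Hypothetical stand-in for writeChunkFutureMap: log index -> start time.
  private final ConcurrentNavigableMap<Long, Long> pending =
      new ConcurrentSkipListMap<>();

  public void track(long logIndex, long startTime) {
    pending.put(logIndex, startTime);
  }

  public void complete(long logIndex) {
    pending.remove(logIndex);
  }

  /** Returns the lowest pending log index, or -1 if nothing is pending. */
  public long oldestIndex() {
    // firstEntry() returns null on an empty map instead of throwing.
    Map.Entry<Long, Long> first = pending.firstEntry();
    return first == null ? -1L : first.getKey();
  }

  public static void main(String[] args) {
    OldestPendingWrite writes = new OldestPendingWrite();
    writes.track(12L, 100L);
    writes.track(7L, 90L);
    writes.track(9L, 95L);
    System.out.println(writes.oldestIndex()); // prints 7
    writes.complete(7L);
    System.out.println(writes.oldestIndex()); // prints 9
  }
}
```

   The lookup cost no longer depends on the gap between the old minimum index 
and currIndex, which the loop in validateLongRunningWrite has to walk.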



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -618,6 +655,37 @@ private CompletableFuture<Message> writeStateMachineData(
     return raftFuture;
   }
 
+  private void validateLongRunningWrite(long currIndex) throws IOException {
+    // get min valid write chunk operation's future context
+    WriteFutureContext writeFutureContext = null;
+    for (long i = writeFutureMinIndex; i < currIndex; ++i) {
+      if (writeChunkFutureMap.containsKey(i)) {
+        writeFutureContext = writeChunkFutureMap.get(i);
+        writeFutureMinIndex = i;
+        break;
+      }
+    }
+    if (null == writeFutureContext) {
+      return;
+    }
+    // validate for timeout in milli second
+    long waitTime = Time.monotonicNow() - writeFutureContext.getStartTime() / 1000000;
+    IOException ex = new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold "
+        + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+    if (waitTime > writeChunkWaitMaxMs) {
+      LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs,
+          getGroupId());
+      stateMachineHealthy.set(false);
+      writeChunkFutureMap.forEach((key, value) -> {
+        LOG.error("Cancelling write chunk for transaction {}, groupId {}", key, getGroupId());
+        value.getWriteChunkFuture().cancel(true);
+        value.getRaftFuture().completeExceptionally(ex);

Review Comment:
   Just cancelling is enough. The interrupted exception will be caught, and 
that flow will take care of the rest.
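   As a rough illustration of the propagation this comment relies on, the 
sketch below uses only java.util.concurrent.CompletableFuture; the class and 
method names are hypothetical. It shows that a future derived from a cancelled 
future completes exceptionally without an explicit completeExceptionally call. 
Whether that applies here depends on how raftFuture is derived from 
writeChunkFuture, which this hunk does not show. Note also that, per the 
CompletableFuture Javadoc, the mayInterruptIfRunning argument to cancel has no 
effect in that class, so any interruption has to come from elsewhere in the 
flow.

```java
import java.util.concurrent.CompletableFuture;

public class CancelPropagation {
  /**
   * Cancels a source future and reports whether a future chained from it
   * ended up completed exceptionally.
   */
  public static boolean dependentFailsAfterCancel() {
    // Stand-in for the write chunk future; never completed normally here.
    CompletableFuture<String> writeChunk = new CompletableFuture<>();
    // Stand-in for a future chained from the write chunk future.
    CompletableFuture<String> dependent =
        writeChunk.thenApply(result -> "applied " + result);

    writeChunk.cancel(true);

    // The dependent stage is completed exceptionally, with the source's
    // CancellationException as the underlying cause.
    return writeChunk.isCancelled() && dependent.isCompletedExceptionally();
  }

  public static void main(String[] args) {
    System.out.println(dependentFailsAfterCancel()); // prints true
  }
}
```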



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -186,13 +186,39 @@ long getStartTime() {
     }
   }
 
+  static class WriteFutureContext {
+    private final CompletableFuture<ContainerCommandResponseProto> writeChunkFuture;
+    private final CompletableFuture<Message> raftFuture;
+    private final long startTime;
+
+    WriteFutureContext(CompletableFuture<ContainerCommandResponseProto> writeChunkFuture,
+                       CompletableFuture<Message> raftFuture, long startTime) {
+      this.writeChunkFuture = writeChunkFuture;
+      this.raftFuture = raftFuture;
+      this.startTime = startTime;
+    }
+
+    public CompletableFuture<ContainerCommandResponseProto> getWriteChunkFuture() {
+      return writeChunkFuture;
+    }
+
+    public CompletableFuture<Message> getRaftFuture() {
+      return raftFuture;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+  }
+
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
+  private final ConcurrentHashMap<Long, WriteFutureContext> writeChunkFutureMap;

Review Comment:
   We can declare this variable as a SortedMap (the base interface); it need 
not be tied to a specific implementation.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -618,6 +655,37 @@ private CompletableFuture<Message> writeStateMachineData(
     return raftFuture;
   }
 
+  private void validateLongRunningWrite(long currIndex) throws IOException {
+    // get min valid write chunk operation's future context
+    WriteFutureContext writeFutureContext = null;
+    for (long i = writeFutureMinIndex; i < currIndex; ++i) {
+      if (writeChunkFutureMap.containsKey(i)) {
+        writeFutureContext = writeChunkFutureMap.get(i);
+        writeFutureMinIndex = i;
+        break;
+      }
+    }
+    if (null == writeFutureContext) {
+      return;
+    }
+    // validate for timeout in milli second
+    long waitTime = Time.monotonicNow() - writeFutureContext.getStartTime() / 1000000;
+    IOException ex = new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold "
+        + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+    if (waitTime > writeChunkWaitMaxMs) {
+      LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs,
+          getGroupId());
+      stateMachineHealthy.set(false);
+      writeChunkFutureMap.forEach((key, value) -> {
+        LOG.error("Cancelling write chunk for transaction {}, groupId {}", key, getGroupId());
+        value.getWriteChunkFuture().cancel(true);
+        value.getRaftFuture().completeExceptionally(ex);
+      });
+      writeFutureContext.getRaftFuture().completeExceptionally(ex);

Review Comment:
   We don't have to do this. The exception-handling flow will automatically 
complete the future exceptionally.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -618,6 +655,37 @@ private CompletableFuture<Message> writeStateMachineData(
     return raftFuture;
   }
 
+  private void validateLongRunningWrite(long currIndex) throws IOException {
+    // get min valid write chunk operation's future context
+    WriteFutureContext writeFutureContext = null;
+    for (long i = writeFutureMinIndex; i < currIndex; ++i) {
+      if (writeChunkFutureMap.containsKey(i)) {
+        writeFutureContext = writeChunkFutureMap.get(i);
+        writeFutureMinIndex = i;
+        break;
+      }
+    }
+    if (null == writeFutureContext) {
+      return;
+    }
+    // validate for timeout in milli second
+    long waitTime = Time.monotonicNow() - writeFutureContext.getStartTime() / 1000000;
+    IOException ex = new StorageContainerException("Write chunk has taken " + waitTime + " crossing threshold "
+        + writeChunkWaitMaxMs + " for groupId " + getGroupId(), ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+    if (waitTime > writeChunkWaitMaxMs) {
+      LOG.error("Write chunk has taken {}ms crossing threshold {}ms for groupId {}", waitTime, writeChunkWaitMaxMs,
+          getGroupId());
+      stateMachineHealthy.set(false);
+      writeChunkFutureMap.forEach((key, value) -> {
+        LOG.error("Cancelling write chunk for transaction {}, groupId {}", key, getGroupId());
+        value.getWriteChunkFuture().cancel(true);
+        value.getRaftFuture().completeExceptionally(ex);
+      });
+      writeFutureContext.getRaftFuture().completeExceptionally(ex);

Review Comment:
   There is no need to throw an exception either. writeStateMachineData will 
fail after https://github.com/apache/ozone/pull/7862



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -618,6 +655,37 @@ private CompletableFuture<Message> writeStateMachineData(
     return raftFuture;
   }
 
+  private void validateLongRunningWrite(long currIndex) throws IOException {
+    // get min valid write chunk operation's future context
+    WriteFutureContext writeFutureContext = null;
+    for (long i = writeFutureMinIndex; i < currIndex; ++i) {

Review Comment:
   We can just call firstKey() once this is a sorted map.
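   One detail worth keeping in mind with this approach: SortedMap.firstKey() 
throws NoSuchElementException on an empty map, and with concurrent writers an 
isEmpty() check followed by firstKey() can still race. The sketch below is one 
possible guard, written against the SortedMap interface; the class name, 
method name, and the -1 sentinel are assumptions for illustration only.

```java
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class FirstKeyGuard {
  /** Returns the smallest key in the map, or -1 when the map is empty. */
  public static long minIndexOrDefault(SortedMap<Long, ?> map) {
    try {
      return map.firstKey();
    } catch (NoSuchElementException e) {
      // The map was empty, or became empty between a caller's check and
      // this call.
      return -1L;
    }
  }

  public static void main(String[] args) {
    SortedMap<Long, String> pending = new ConcurrentSkipListMap<>();
    System.out.println(minIndexOrDefault(pending)); // prints -1
    pending.put(42L, "write chunk");
    pending.put(17L, "write chunk");
    System.out.println(minIndexOrDefault(pending)); // prints 17
  }
}
```

   If the field is declared as a NavigableMap instead, firstEntry() returns 
null on an empty map and avoids the exception path altogether.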



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

