szetszwo commented on code in PR #8022:
URL: https://github.com/apache/ozone/pull/8022#discussion_r1987767209


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, 
context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
-                entryIndex, write.getChunkData().getChunkName(), e);
-            metrics.incNumWriteDataFails();
-            // write chunks go in parallel. It's possible that one write chunk
-            // see the stateMachine is marked unhealthy by other parallel 
thread
-            unhealthyContainers.add(write.getBlockID().getContainerID());
-            stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
-          }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // write chunks go in parallel. It's possible that one write chunk
+        // see the stateMachine is marked unhealthy by other parallel thread
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;

Review Comment:
   Please do not change indentation.  It makes the review harder.  If the 
existing code formatting is bad, we may fix it first and then fix the bug 
afterward.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -542,6 +573,15 @@ private ContainerCommandResponseProto dispatchCommand(
   private CompletableFuture<Message> writeStateMachineData(
       ContainerCommandRequestProto requestProto, long entryIndex, long term,
       long startTime) {
+    if (writeChunkFutureMap.containsKey(entryIndex)) {
+      // generally state machine will wait forever, for precaution, a check is 
added if retry happens.
+      return writeChunkFutureMap.get(entryIndex).getRaftFuture();
+    }

Review Comment:
   Use get directly to make it atomic.
   ```java
       final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
       if (previous != null) {
         return previous.getRaftFuture();
       }
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {

Review Comment:
   Why replacing `CompletableFuture.supplyAsync(..)`?



##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java:
##########
@@ -632,7 +632,9 @@ public final class ScmConfigKeys {
 
   public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY =
       "net.topology.node.switch.mapping.impl";
-
+  public static final String 
HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL
+      = "hdds.container.ratis.statemachine.write.wait.interval";
+  public static final long 
HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_DEFAULT = 10 * 60 * 1000L;

Review Comment:
   Is it ms?  Please include the unit in the names.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -187,13 +191,38 @@ long getStartTime() {
     }
   }
 
+  static class WriteFutureContext {
+    private final Future<ContainerCommandResponseProto> writeChunkFuture;
+    private final CompletableFuture<Message> raftFuture;
+    private final long startTime;

Review Comment:
   Please include unit in the name.  We are using ms and ns in different places.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, 
context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
-                entryIndex, write.getChunkData().getChunkName(), e);
-            metrics.incNumWriteDataFails();
-            // write chunks go in parallel. It's possible that one write chunk
-            // see the stateMachine is marked unhealthy by other parallel 
thread
-            unhealthyContainers.add(write.getBlockID().getContainerID());
-            stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
-          }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // write chunks go in parallel. It's possible that one write chunk
+        // see the stateMachine is marked unhealthy by other parallel thread
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;
+      } finally {
+        // Remove the future once it finishes execution from the
+        writeChunkFutureMap.remove(entryIndex);
+      }
+    });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, 
raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws IOException {

Review Comment:
   Throw and catch `StorageContainerException` instead.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, 
context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
-                entryIndex, write.getChunkData().getChunkName(), e);
-            metrics.incNumWriteDataFails();
-            // write chunks go in parallel. It's possible that one write chunk
-            // see the stateMachine is marked unhealthy by other parallel 
thread
-            unhealthyContainers.add(write.getBlockID().getContainerID());
-            stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
-          }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // write chunks go in parallel. It's possible that one write chunk
+        // see the stateMachine is marked unhealthy by other parallel thread
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;
+      } finally {
+        // Remove the future once it finishes execution from the
+        writeChunkFutureMap.remove(entryIndex);
+      }
+    });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, 
raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws IOException {
+    // get min valid write chunk operation's future context
+    Map.Entry<Long, WriteFutureContext> writeFutureContextEntry = null;
+    while (!writeChunkFutureMap.isEmpty()) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      // there is a possibility of entry being removed before added in map, 
cleanup those
+      if (null == writeFutureContextEntry || 
!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        break;
+      }
+      writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+    }
+    if (null == writeFutureContextEntry) {
+      return;
+    }
+    // validate for timeout in milli second
+    long waitTime = Time.monotonicNow() - 
writeFutureContextEntry.getValue().getStartTime() / 1000000;
+    if (waitTime > writeChunkWaitMaxMs) {
+      LOG.error("Write chunk has taken {}ms crossing threshold {}ms for 
groupId {}", waitTime, writeChunkWaitMaxMs,
+          getGroupId());
+      stateMachineHealthy.set(false);
+      writeChunkFutureMap.forEach((key, value) -> {
+        LOG.error("Cancelling write chunk for transaction {}, groupId {}", 
key, getGroupId());
+        value.getWriteChunkFuture().cancel(true);
+      });
+      throw new StorageContainerException("Write chunk has taken " + waitTime 
+ " crossing threshold "

Review Comment:
   Include log index and time unit in the message.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, 
context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
-                entryIndex, write.getChunkData().getChunkName(), e);
-            metrics.incNumWriteDataFails();
-            // write chunks go in parallel. It's possible that one write chunk
-            // see the stateMachine is marked unhealthy by other parallel 
thread
-            unhealthyContainers.add(write.getBlockID().getContainerID());
-            stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
-          }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // write chunks go in parallel. It's possible that one write chunk
+        // see the stateMachine is marked unhealthy by other parallel thread
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;
+      } finally {
+        // Remove the future once it finishes execution from the
+        writeChunkFutureMap.remove(entryIndex);
+      }
+    });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, 
raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws IOException {
+    // get min valid write chunk operation's future context
+    Map.Entry<Long, WriteFutureContext> writeFutureContextEntry = null;
+    while (!writeChunkFutureMap.isEmpty()) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      // there is a possibility of entry being removed before added in map, 
cleanup those
+      if (null == writeFutureContextEntry || 
!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        break;
+      }
+      writeChunkFutureMap.remove(writeFutureContextEntry.getKey());

Review Comment:
   Do not call `remove` here and the it be removed in the usual way.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -187,13 +191,38 @@ long getStartTime() {
     }
   }
 
+  static class WriteFutureContext {
+    private final Future<ContainerCommandResponseProto> writeChunkFuture;
+    private final CompletableFuture<Message> raftFuture;
+    private final long startTime;
+
+    WriteFutureContext(Future<ContainerCommandResponseProto> writeChunkFuture,
+                       CompletableFuture<Message> raftFuture, long startTime) {
+      this.writeChunkFuture = writeChunkFuture;
+      this.raftFuture = raftFuture;
+      this.startTime = startTime;
+    }
+
+    public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
+      return writeChunkFuture;
+    }
+
+    public CompletableFuture<Message> getRaftFuture() {
+      return raftFuture;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+  }
+
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
+  private final NavigableMap<Long, WriteFutureContext> writeChunkFutureMap;
+  private final long writeChunkWaitMaxMs;

Review Comment:
   Let's use ns.  Then, we can save a division per entry.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -187,13 +191,38 @@ long getStartTime() {
     }
   }
 
+  static class WriteFutureContext {
+    private final Future<ContainerCommandResponseProto> writeChunkFuture;
+    private final CompletableFuture<Message> raftFuture;
+    private final long startTime;
+
+    WriteFutureContext(Future<ContainerCommandResponseProto> writeChunkFuture,
+                       CompletableFuture<Message> raftFuture, long startTime) {
+      this.writeChunkFuture = writeChunkFuture;
+      this.raftFuture = raftFuture;
+      this.startTime = startTime;
+    }
+
+    public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
+      return writeChunkFuture;
+    }
+
+    public CompletableFuture<Message> getRaftFuture() {
+      return raftFuture;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+  }
+
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final XceiverServerRatis ratisServer;
-  private final ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
+  private final NavigableMap<Long, WriteFutureContext> writeChunkFutureMap;

Review Comment:
   It is a good idea to use `NavigableMap` 👍



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, 
context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
-                entryIndex, write.getChunkData().getChunkName(), e);
-            metrics.incNumWriteDataFails();
-            // write chunks go in parallel. It's possible that one write chunk
-            // see the stateMachine is marked unhealthy by other parallel 
thread
-            unhealthyContainers.add(write.getBlockID().getContainerID());
-            stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
-          }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // write chunks go in parallel. It's possible that one write chunk
+        // see the stateMachine is marked unhealthy by other parallel thread
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;
+      } finally {
+        // Remove the future once it finishes execution from the
+        writeChunkFutureMap.remove(entryIndex);
+      }
+    });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, 
raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws IOException {
+    // get min valid write chunk operation's future context
+    Map.Entry<Long, WriteFutureContext> writeFutureContextEntry = null;
+    while (!writeChunkFutureMap.isEmpty()) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      // there is a possibility of entry being removed before added in map, 
cleanup those
+      if (null == writeFutureContextEntry || 
!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        break;
+      }
+      writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+    }
+    if (null == writeFutureContextEntry) {
+      return;
+    }
+    // validate for timeout in milli second
+    long waitTime = Time.monotonicNow() - 
writeFutureContextEntry.getValue().getStartTime() / 1000000;

Review Comment:
   Bug: it must be minus and then divide, not the other way around . 
   ```java
   final long diff = Time.monotonicNow() - 
writeFutureContextEntry.getValue().getStartTime();
   final long waitTime = diff / 1000000;
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -821,7 +893,7 @@ private ByteString readStateMachineData(
   public CompletableFuture<Void> flush(long index) {
     return CompletableFuture.allOf(
         writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= 
index)
-            .map(Map.Entry::getValue).toArray(CompletableFuture[]::new));
+            .map(e -> 
e.getValue().getRaftFuture()).toArray(CompletableFuture[]::new));

Review Comment:
   Since it is a `NavigableMap`, we should use `headMap`.
   ```java
     public CompletableFuture<Void> flush(long index) {
       final SortedMap<Long, WriteFutureContext> head = 
writeChunkFutureMap.headMap(index + 1);
       if (head.isEmpty()) {
         return CompletableFuture.completedFuture(null);
       }
       return CompletableFuture.allOf(head.values().stream()
           .map(WriteFutureContext::getRaftFuture)
           .toArray(CompletableFuture[]::new));
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to