szetszwo commented on code in PR #8022:
URL: https://github.com/apache/ozone/pull/8022#discussion_r1989741176


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -588,55 +633,91 @@ private CompletableFuture<Message> writeStateMachineData(
             stateMachineHealthy.set(false);
             raftFuture.completeExceptionally(e);
             throw e;
+          } finally {
+            // Remove the future once it finishes execution from the
+            writeChunkFutureMap.remove(entryIndex);
           }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+        });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, 
startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws StorageContainerException {
+    // get min valid write chunk operation's future context
+    Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
+    for (boolean found = false; !found;) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      if (null == writeFutureContextEntry) {
+        return;
+      }
+      if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        // there is a possibility that writeChunkFutureMap may have dangling 
entry, as remove is done before add future
+        writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+      } else {
+        found = true;
+      }
+    }
+    if (null == writeFutureContextEntry) {
+      return;
+    }

Review Comment:
   `writeFutureContextEntry` cannot be null here.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -588,55 +633,91 @@ private CompletableFuture<Message> writeStateMachineData(
             stateMachineHealthy.set(false);
             raftFuture.completeExceptionally(e);
             throw e;
+          } finally {
+            // Remove the future once it finishes execution from the
+            writeChunkFutureMap.remove(entryIndex);
           }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+        });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, 
startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws StorageContainerException {
+    // get min valid write chunk operation's future context
+    Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
+    for (boolean found = false; !found;) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      if (null == writeFutureContextEntry) {
+        return;
+      }
+      if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        // there is a possibility that writeChunkFutureMap may have dangling 
entry, as remove is done before add future
+        writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+      } else {
+        found = true;
+      }
+    }
+    if (null == writeFutureContextEntry) {
+      return;
+    }
+    // validate for timeout in milli second
+    long waitTime = Time.monotonicNowNanos() - 
writeFutureContextEntry.getValue().getStartTime();
+    if (waitTime > writeChunkWaitMaxNs) {
+      LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index 
{} groupId {}", waitTime,
+          writeChunkWaitMaxNs, writeFutureContextEntry.getKey(), getGroupId());
+      stateMachineHealthy.set(false);
+      writeChunkFutureMap.forEach((key, value) -> {
+        LOG.error("Cancelling write chunk due to timeout {}ns crossing {}ns 
for index {}, groupId {}", waitTime,

Review Comment:
   Don't print `forEach` error here.   The same message can be printed a 
thousand times if there are a thousand entries.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,22 +606,25 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = getChunkExecutor(
+        requestProto.getWriteChunk()).submit(() -> {
           try {
             try {
               checkContainerHealthy(write.getBlockID().getContainerID(), true);
             } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
+              ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+              handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+              return result;
             }
             metrics.recordWriteStateMachineQueueingLatencyNs(
                 Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
+            ContainerCommandResponseProto result = 
dispatchCommand(requestProto, context);
+            handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+            return result;
           } catch (Exception e) {
             LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
-                "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
+                    "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),

Review Comment:
   This indentation change should be reverted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to