sumitagrawl commented on code in PR #8022:
URL: https://github.com/apache/ozone/pull/8022#discussion_r1988489169


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
             .setContainer2BCSIDMap(container2BCSIDMap)
             .build();
     CompletableFuture<Message> raftFuture = new CompletableFuture<>();
-    // ensure the write chunk happens asynchronously in writeChunkExecutor pool
-    // thread.
-    CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            try {
-              checkContainerHealthy(write.getBlockID().getContainerID(), true);
-            } catch (StorageContainerException e) {
-              return ContainerUtils.logAndReturnError(LOG, e, requestProto);
-            }
-            metrics.recordWriteStateMachineQueueingLatencyNs(
-                Time.monotonicNowNanos() - startTime);
-            return dispatchCommand(requestProto, context);
-          } catch (Exception e) {
-            LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+    // ensure the write chunk happens asynchronously in writeChunkExecutor 
pool thread.
+    Future<ContainerCommandResponseProto> future = 
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+      try {
+        try {
+          checkContainerHealthy(write.getBlockID().getContainerID(), true);
+        } catch (StorageContainerException e) {
+          ContainerCommandResponseProto result = 
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+          handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+          return result;
+        }
+        metrics.recordWriteStateMachineQueueingLatencyNs(
+            Time.monotonicNowNanos() - startTime);
+        ContainerCommandResponseProto result = dispatchCommand(requestProto, 
context);
+        handleCommandResult(requestProto, entryIndex, startTime, result, 
write, raftFuture);
+        return result;
+      } catch (Exception e) {
+        LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
-                entryIndex, write.getChunkData().getChunkName(), e);
-            metrics.incNumWriteDataFails();
-            // write chunks go in parallel. It's possible that one write chunk
-            // see the stateMachine is marked unhealthy by other parallel 
thread
-            unhealthyContainers.add(write.getBlockID().getContainerID());
-            stateMachineHealthy.set(false);
-            raftFuture.completeExceptionally(e);
-            throw e;
-          }
-        }, getChunkExecutor(requestProto.getWriteChunk()));
+            entryIndex, write.getChunkData().getChunkName(), e);
+        metrics.incNumWriteDataFails();
+        // write chunks go in parallel. It's possible that one write chunk
+        // see the stateMachine is marked unhealthy by other parallel thread
+        unhealthyContainers.add(write.getBlockID().getContainerID());
+        stateMachineHealthy.set(false);
+        raftFuture.completeExceptionally(e);
+        throw e;
+      } finally {
+        // Remove the future once it finishes execution from the
+        writeChunkFutureMap.remove(entryIndex);
+      }
+    });
 
-    writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+    writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future, 
raftFuture, startTime));
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
               "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
-    // Remove the future once it finishes execution from the
-    // writeChunkFutureMap.
-    writeChunkFuture.thenApply(r -> {
-      if (r.getResult() != ContainerProtos.Result.SUCCESS
-          && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
-          // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
-          // that should not crash the pipeline.
-          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
-        StorageContainerException sce =
-            new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+    return raftFuture;
+  }
+
+  private void handleCommandResult(ContainerCommandRequestProto requestProto, 
long entryIndex, long startTime,
+                                   ContainerCommandResponseProto r, 
WriteChunkRequestProto write,
+                                   CompletableFuture<Message> raftFuture) {
+    if (r.getResult() != ContainerProtos.Result.SUCCESS
+        && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+        && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+        // After concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen and
+        // that should not crash the pipeline.
+        && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+      StorageContainerException sce =
+          new StorageContainerException(r.getMessage(), r.getResult());
+      LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
+          write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+          write.getChunkData().getChunkName() + " Error message: " +
+          r.getMessage() + " Container Result: " + r.getResult());
+      metrics.incNumWriteDataFails();
+      // If the write fails currently we mark the stateMachine as unhealthy.
+      // This leads to pipeline close. Any change in that behavior requires
+      // handling the entry for the write chunk in cache.
+      stateMachineHealthy.set(false);
+      unhealthyContainers.add(write.getBlockID().getContainerID());
+      raftFuture.completeExceptionally(sce);
+    } else {
+      metrics.incNumBytesWrittenCount(
+          requestProto.getWriteChunk().getChunkData().getLen());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getGroupId() +
+            ": writeChunk writeStateMachineData  completed: blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-            write.getChunkData().getChunkName() + " Error message: " +
-            r.getMessage() + " Container Result: " + r.getResult());
-        metrics.incNumWriteDataFails();
-        // If the write fails currently we mark the stateMachine as unhealthy.
-        // This leads to pipeline close. Any change in that behavior requires
-        // handling the entry for the write chunk in cache.
-        stateMachineHealthy.set(false);
-        unhealthyContainers.add(write.getBlockID().getContainerID());
-        raftFuture.completeExceptionally(sce);
-      } else {
-        metrics.incNumBytesWrittenCount(
-            requestProto.getWriteChunk().getChunkData().getLen());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getGroupId() +
-              ": writeChunk writeStateMachineData  completed: blockId" +
-              write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
-              write.getChunkData().getChunkName());
-        }
-        raftFuture.complete(r::toByteString);
-        metrics.recordWriteStateMachineCompletionNs(
-            Time.monotonicNowNanos() - startTime);
+            write.getChunkData().getChunkName());
       }
+      raftFuture.complete(r::toByteString);
+      metrics.recordWriteStateMachineCompletionNs(
+          Time.monotonicNowNanos() - startTime);
+    }
+  }
 
-      writeChunkFutureMap.remove(entryIndex);
-      return r;
-    });
-    return raftFuture;
+  private void validateLongRunningWrite() throws IOException {
+    // get min valid write chunk operation's future context
+    Map.Entry<Long, WriteFutureContext> writeFutureContextEntry = null;
+    while (!writeChunkFutureMap.isEmpty()) {
+      writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+      // there is a possibility of entry being removed before added in map, 
cleanup those
+      if (null == writeFutureContextEntry || 
!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+        break;
+      }
+      writeChunkFutureMap.remove(writeFutureContextEntry.getKey());

Review Comment:
   There is a possibility observed in integration test that writeChunkFutureMap 
have dangling entry. So one of possibility,
   Thread T1: called from ratis
   - future = executorService.submit()
   - map.add(index, future)
   
   Thread T2: scheduled as above
   - operate write
   - remove from map map.remove(index)
   
   So due to ordering of T1 and T2, there seems possibility that entry from map 
for index is removed first, and then entry is added to map.
   
   So to handle this situation, additional remove is added if future is 
completed already.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to