sumitagrawl commented on code in PR #8022:
URL: https://github.com/apache/ozone/pull/8022#discussion_r1988466179
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -564,79 +604,111 @@ private CompletableFuture<Message> writeStateMachineData(
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
- // ensure the write chunk happens asynchronously in writeChunkExecutor pool
- // thread.
- CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
- CompletableFuture.supplyAsync(() -> {
- try {
- try {
- checkContainerHealthy(write.getBlockID().getContainerID(), true);
- } catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, requestProto);
- }
- metrics.recordWriteStateMachineQueueingLatencyNs(
- Time.monotonicNowNanos() - startTime);
- return dispatchCommand(requestProto, context);
- } catch (Exception e) {
- LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
+ // ensure the write chunk happens asynchronously in writeChunkExecutor
pool thread.
+ Future<ContainerCommandResponseProto> future =
getChunkExecutor(requestProto.getWriteChunk()).submit(() -> {
+ try {
+ try {
+ checkContainerHealthy(write.getBlockID().getContainerID(), true);
+ } catch (StorageContainerException e) {
+ ContainerCommandResponseProto result =
ContainerUtils.logAndReturnError(LOG, e, requestProto);
+ handleCommandResult(requestProto, entryIndex, startTime, result,
write, raftFuture);
+ return result;
+ }
+ metrics.recordWriteStateMachineQueueingLatencyNs(
+ Time.monotonicNowNanos() - startTime);
+ ContainerCommandResponseProto result = dispatchCommand(requestProto,
context);
+ handleCommandResult(requestProto, entryIndex, startTime, result,
write, raftFuture);
+ return result;
+ } catch (Exception e) {
+ LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", getGroupId(),
write.getBlockID(),
- entryIndex, write.getChunkData().getChunkName(), e);
- metrics.incNumWriteDataFails();
- // write chunks go in parallel. It's possible that one write chunk
- // see the stateMachine is marked unhealthy by other parallel
thread
- unhealthyContainers.add(write.getBlockID().getContainerID());
- stateMachineHealthy.set(false);
- raftFuture.completeExceptionally(e);
- throw e;
- }
- }, getChunkExecutor(requestProto.getWriteChunk()));
+ entryIndex, write.getChunkData().getChunkName(), e);
+ metrics.incNumWriteDataFails();
+ // Write chunks run in parallel, so one write chunk may see the
+ // stateMachine already marked unhealthy by another parallel thread.
+ unhealthyContainers.add(write.getBlockID().getContainerID());
+ stateMachineHealthy.set(false);
+ raftFuture.completeExceptionally(e);
+ throw e;
+ } finally {
+ // Remove the future from writeChunkFutureMap once it finishes execution.
+ writeChunkFutureMap.remove(entryIndex);
+ }
+ });
- writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+ writeChunkFutureMap.put(entryIndex, new WriteFutureContext(future,
raftFuture, startTime));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
- // Remove the future once it finishes execution from the
- // writeChunkFutureMap.
- writeChunkFuture.thenApply(r -> {
- if (r.getResult() != ContainerProtos.Result.SUCCESS
- && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
- && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
- // After concurrent flushes are allowed on the same key, chunk file
inconsistencies can happen and
- // that should not crash the pipeline.
- && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY)
{
- StorageContainerException sce =
- new StorageContainerException(r.getMessage(), r.getResult());
- LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed:
blockId" +
+ return raftFuture;
+ }
+
+ private void handleCommandResult(ContainerCommandRequestProto requestProto,
long entryIndex, long startTime,
+ ContainerCommandResponseProto r,
WriteChunkRequestProto write,
+ CompletableFuture<Message> raftFuture) {
+ if (r.getResult() != ContainerProtos.Result.SUCCESS
+ && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+ // After concurrent flushes are allowed on the same key, chunk file
inconsistencies can happen and
+ // that should not crash the pipeline.
+ && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+ StorageContainerException sce =
+ new StorageContainerException(r.getMessage(), r.getResult());
+ LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed:
blockId" +
+ write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+ write.getChunkData().getChunkName() + " Error message: " +
+ r.getMessage() + " Container Result: " + r.getResult());
+ metrics.incNumWriteDataFails();
+ // If the write fails currently we mark the stateMachine as unhealthy.
+ // This leads to pipeline close. Any change in that behavior requires
+ // handling the entry for the write chunk in cache.
+ stateMachineHealthy.set(false);
+ unhealthyContainers.add(write.getBlockID().getContainerID());
+ raftFuture.completeExceptionally(sce);
+ } else {
+ metrics.incNumBytesWrittenCount(
+ requestProto.getWriteChunk().getChunkData().getLen());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getGroupId() +
+ ": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
- write.getChunkData().getChunkName() + " Error message: " +
- r.getMessage() + " Container Result: " + r.getResult());
- metrics.incNumWriteDataFails();
- // If the write fails currently we mark the stateMachine as unhealthy.
- // This leads to pipeline close. Any change in that behavior requires
- // handling the entry for the write chunk in cache.
- stateMachineHealthy.set(false);
- unhealthyContainers.add(write.getBlockID().getContainerID());
- raftFuture.completeExceptionally(sce);
- } else {
- metrics.incNumBytesWrittenCount(
- requestProto.getWriteChunk().getChunkData().getLen());
- if (LOG.isDebugEnabled()) {
- LOG.debug(getGroupId() +
- ": writeChunk writeStateMachineData completed: blockId" +
- write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
- write.getChunkData().getChunkName());
- }
- raftFuture.complete(r::toByteString);
- metrics.recordWriteStateMachineCompletionNs(
- Time.monotonicNowNanos() - startTime);
+ write.getChunkData().getChunkName());
}
+ raftFuture.complete(r::toByteString);
+ metrics.recordWriteStateMachineCompletionNs(
+ Time.monotonicNowNanos() - startTime);
+ }
+ }
- writeChunkFutureMap.remove(entryIndex);
- return r;
- });
- return raftFuture;
+ private void validateLongRunningWrite() throws IOException {
+ // get min valid write chunk operation's future context
+ Map.Entry<Long, WriteFutureContext> writeFutureContextEntry = null;
+ while (!writeChunkFutureMap.isEmpty()) {
+ writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+ // An entry may be removed before it is added to the map; clean up
+ // such already-completed entries here.
+ if (null == writeFutureContextEntry ||
!writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+ break;
+ }
+ writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+ }
+ if (null == writeFutureContextEntry) {
+ return;
+ }
+ // validate for timeout in milliseconds
+ long waitTime = Time.monotonicNow() -
writeFutureContextEntry.getValue().getStartTime() / 1000000;
Review Comment:
Changed all units to "ns" now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]