sumitagrawl commented on code in PR #8022:
URL: https://github.com/apache/ozone/pull/8022#discussion_r1990995206
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -588,55 +633,91 @@ private CompletableFuture<Message> writeStateMachineData(
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
+ } finally {
+ // Remove the future once it finishes execution from the
+ // writeChunkFutureMap.
+ writeChunkFutureMap.remove(entryIndex);
}
- }, getChunkExecutor(requestProto.getWriteChunk()));
+ });
- writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+ writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture,
startTime));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
- // Remove the future once it finishes execution from the
- // writeChunkFutureMap.
- writeChunkFuture.thenApply(r -> {
- if (r.getResult() != ContainerProtos.Result.SUCCESS
- && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
- && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
- // After concurrent flushes are allowed on the same key, chunk file
inconsistencies can happen and
- // that should not crash the pipeline.
- && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY)
{
- StorageContainerException sce =
- new StorageContainerException(r.getMessage(), r.getResult());
- LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed:
blockId" +
+ return raftFuture;
+ }
+
+ private void handleCommandResult(ContainerCommandRequestProto requestProto,
long entryIndex, long startTime,
+ ContainerCommandResponseProto r,
WriteChunkRequestProto write,
+ CompletableFuture<Message> raftFuture) {
+ if (r.getResult() != ContainerProtos.Result.SUCCESS
+ && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+ // After concurrent flushes are allowed on the same key, chunk file
inconsistencies can happen and
+ // that should not crash the pipeline.
+ && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+ StorageContainerException sce =
+ new StorageContainerException(r.getMessage(), r.getResult());
+ LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed:
blockId" +
+ write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+ write.getChunkData().getChunkName() + " Error message: " +
+ r.getMessage() + " Container Result: " + r.getResult());
+ metrics.incNumWriteDataFails();
+ // If the write fails currently we mark the stateMachine as unhealthy.
+ // This leads to pipeline close. Any change in that behavior requires
+ // handling the entry for the write chunk in cache.
+ stateMachineHealthy.set(false);
+ unhealthyContainers.add(write.getBlockID().getContainerID());
+ raftFuture.completeExceptionally(sce);
+ } else {
+ metrics.incNumBytesWrittenCount(
+ requestProto.getWriteChunk().getChunkData().getLen());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getGroupId() +
+ ": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
- write.getChunkData().getChunkName() + " Error message: " +
- r.getMessage() + " Container Result: " + r.getResult());
- metrics.incNumWriteDataFails();
- // If the write fails currently we mark the stateMachine as unhealthy.
- // This leads to pipeline close. Any change in that behavior requires
- // handling the entry for the write chunk in cache.
- stateMachineHealthy.set(false);
- unhealthyContainers.add(write.getBlockID().getContainerID());
- raftFuture.completeExceptionally(sce);
- } else {
- metrics.incNumBytesWrittenCount(
- requestProto.getWriteChunk().getChunkData().getLen());
- if (LOG.isDebugEnabled()) {
- LOG.debug(getGroupId() +
- ": writeChunk writeStateMachineData completed: blockId" +
- write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
- write.getChunkData().getChunkName());
- }
- raftFuture.complete(r::toByteString);
- metrics.recordWriteStateMachineCompletionNs(
- Time.monotonicNowNanos() - startTime);
+ write.getChunkData().getChunkName());
}
+ raftFuture.complete(r::toByteString);
+ metrics.recordWriteStateMachineCompletionNs(
+ Time.monotonicNowNanos() - startTime);
+ }
+ }
- writeChunkFutureMap.remove(entryIndex);
- return r;
- });
- return raftFuture;
+ private void validateLongRunningWrite() throws StorageContainerException {
+ // get min valid write chunk operation's future context
+ Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
+ for (boolean found = false; !found;) {
+ writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+ if (null == writeFutureContextEntry) {
+ return;
+ }
+ if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+ // there is a possibility that writeChunkFutureMap may have dangling
entry, as remove is done before add future
+ writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+ } else {
+ found = true;
+ }
+ }
+ if (null == writeFutureContextEntry) {
+ return;
+ }
+ // validate for timeout in milli second
+ long waitTime = Time.monotonicNowNanos() -
writeFutureContextEntry.getValue().getStartTime();
+ if (waitTime > writeChunkWaitMaxNs) {
+ LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index
{} groupId {}", waitTime,
+ writeChunkWaitMaxNs, writeFutureContextEntry.getKey(), getGroupId());
+ stateMachineHealthy.set(false);
+ writeChunkFutureMap.forEach((key, value) -> {
+ LOG.error("Cancelling write chunk due to timeout {}ns crossing {}ns
for index {}, groupId {}", waitTime,
Review Comment:
Removed. In any case, since this surfaces as an interrupt exception, the
caller will already log the index and group failure, so that case is covered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]