errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2035612223


##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void 
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
+    DNContainerOperationClient mockDnClient = 
mock(DNContainerOperationClient.class);
+    DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+    when(mockDnClient.getContainerChecksumInfo(anyLong(), 
any())).thenReturn(null);
+    keyValueHandler.reconcileContainer(mockDnClient, container, 
Sets.newHashSet(peer1, peer2, peer3));
+    // Make sure all the replicas are used for reconciliation.
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @ParameterizedTest
+  @MethodSource("corruptionValues")
+  public void testFullContainerReconciliation(int numBlocks, int numChunks) 
throws Exception {
+    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+    ContainerChecksumTreeManager checksumManager = 
kvHandler.getChecksumManager();
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
null, null);
+    final long containerID = 100L;
+    // Create 3 containers with 15 blocks each and 3 replicas.
+    List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, 
containerID, 15, 3);
+    assertEquals(3, containers.size());
+
+    // Introduce corruption in each container on different replicas.
+    introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, 
false);
+    introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, 
true);
+
+    // Without reconciliation, checksums should be different because of the 
corruption.
+    Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+    for (KeyValueContainer kvContainer : containers) {
+      kvHandler.createContainerMerkleTree(kvContainer);
+      Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+          checksumManager.read(kvContainer.getContainerData());
+      assertTrue(containerChecksumInfo.isPresent());
+      
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());
+    }
+    // There should be more than 1 checksum because of the corruption.
+    assertTrue(checksumsBeforeReconciliation.size() > 1);
+
+    List<DatanodeDetails> datanodes = 
ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
+        randomDatanodeDetails());
+    Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
+    dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
+    dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
+    dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));
+
+    // Setup mock for each datanode network calls needed for reconciliation.
+    try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
+             Mockito.mockStatic(ContainerProtocolCalls.class)) {
+      // Mock getContainerChecksumInfo
+      containerProtocolMock.when(() -> 
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+          .thenAnswer(inv -> {
+            XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+            DatanodeDetails dn = 
xceiverClientSpi.getPipeline().getClosestNode();

Review Comment:
   We expect the client to only use single-node pipelines to reconcile with one 
peer at a time.
   ```suggestion
               Pipeline pipeline = xceiverClientSpi.getPipeline();
               assertEquals(1, pipeline.size());
               DatanodeDetails dn = pipeline.getFirstNode();
   ```



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void 
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
+    DNContainerOperationClient mockDnClient = 
mock(DNContainerOperationClient.class);
+    DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+    when(mockDnClient.getContainerChecksumInfo(anyLong(), 
any())).thenReturn(null);
+    keyValueHandler.reconcileContainer(mockDnClient, container, 
Sets.newHashSet(peer1, peer2, peer3));
+    // Make sure all the replicas are used for reconciliation.
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @ParameterizedTest
+  @MethodSource("corruptionValues")
+  public void testFullContainerReconciliation(int numBlocks, int numChunks) 
throws Exception {
+    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+    ContainerChecksumTreeManager checksumManager = 
kvHandler.getChecksumManager();
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
null, null);
+    final long containerID = 100L;
+    // Create 3 containers with 15 blocks each and 3 replicas.
+    List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, 
containerID, 15, 3);
+    assertEquals(3, containers.size());
+
+    // Introduce corruption in each container on different replicas.
+    introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, 
false);
+    introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, 
true);
+
+    // Without reconciliation, checksums should be different because of the 
corruption.
+    Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+    for (KeyValueContainer kvContainer : containers) {
+      kvHandler.createContainerMerkleTree(kvContainer);
+      Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+          checksumManager.read(kvContainer.getContainerData());
+      assertTrue(containerChecksumInfo.isPresent());
+      
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());

Review Comment:
   We should also check that a matching data checksum was added to the 
ContainerData.



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void 
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
+    DNContainerOperationClient mockDnClient = 
mock(DNContainerOperationClient.class);
+    DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+    when(mockDnClient.getContainerChecksumInfo(anyLong(), 
any())).thenReturn(null);
+    keyValueHandler.reconcileContainer(mockDnClient, container, 
Sets.newHashSet(peer1, peer2, peer3));
+    // Make sure all the replicas are used for reconciliation.
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @ParameterizedTest
+  @MethodSource("corruptionValues")
+  public void testFullContainerReconciliation(int numBlocks, int numChunks) 
throws Exception {
+    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+    ContainerChecksumTreeManager checksumManager = 
kvHandler.getChecksumManager();
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
null, null);
+    final long containerID = 100L;
+    // Create 3 containers with 15 blocks each and 3 replicas.
+    List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, 
containerID, 15, 3);
+    assertEquals(3, containers.size());
+
+    // Introduce corruption in each container on different replicas.
+    introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, 
false);
+    introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, 
true);
+
+    // Without reconciliation, checksums should be different because of the 
corruption.
+    Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+    for (KeyValueContainer kvContainer : containers) {
+      kvHandler.createContainerMerkleTree(kvContainer);
+      Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+          checksumManager.read(kvContainer.getContainerData());
+      assertTrue(containerChecksumInfo.isPresent());
+      
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());

Review Comment:
   Same at the end of the test.



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void 
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
+    DNContainerOperationClient mockDnClient = 
mock(DNContainerOperationClient.class);
+    DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+    when(mockDnClient.getContainerChecksumInfo(anyLong(), 
any())).thenReturn(null);
+    keyValueHandler.reconcileContainer(mockDnClient, container, 
Sets.newHashSet(peer1, peer2, peer3));
+    // Make sure all the replicas are used for reconciliation.
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @ParameterizedTest
+  @MethodSource("corruptionValues")
+  public void testFullContainerReconciliation(int numBlocks, int numChunks) 
throws Exception {
+    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+    ContainerChecksumTreeManager checksumManager = 
kvHandler.getChecksumManager();
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
null, null);
+    final long containerID = 100L;
+    // Create 3 containers with 15 blocks each and 3 replicas.
+    List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, 
containerID, 15, 3);
+    assertEquals(3, containers.size());
+
+    // Introduce corruption in each container on different replicas.
+    introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, 
false);
+    introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, 
true);
+
+    // Without reconciliation, checksums should be different because of the 
corruption.
+    Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+    for (KeyValueContainer kvContainer : containers) {
+      kvHandler.createContainerMerkleTree(kvContainer);
+      Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+          checksumManager.read(kvContainer.getContainerData());
+      assertTrue(containerChecksumInfo.isPresent());
+      
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());
+    }
+    // There should be more than 1 checksum because of the corruption.
+    assertTrue(checksumsBeforeReconciliation.size() > 1);
+
+    List<DatanodeDetails> datanodes = 
ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
+        randomDatanodeDetails());
+    Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
+    dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
+    dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
+    dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));
+
+    // Setup mock for each datanode network calls needed for reconciliation.
+    try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
+             Mockito.mockStatic(ContainerProtocolCalls.class)) {
+      // Mock getContainerChecksumInfo
+      containerProtocolMock.when(() -> 
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+          .thenAnswer(inv -> {
+            XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+            DatanodeDetails dn = 
xceiverClientSpi.getPipeline().getClosestNode();

Review Comment:
   Same in the other mocks. I would also move the construction of the container 
protocol mock to a helper method for readability.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back check it's hash.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 
* 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = 
cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = 
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = 
db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDataList.size(); i += 2) {
+        BlockData blockData = blockDataList.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, 
containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + 
blockData.getBlockID().getLocalID() + ".block"));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    // TODO: Use On-demand container scanner to build the new container merkle 
tree. (HDDS-10374)
+    
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = 
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // 3. Reconcile the container.
+    
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+        return 
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() == 
oldDataChecksum;

Review Comment:
   The following section is checking SCM state, but the heartbeat may not have 
reached SCM yet. I think this section needs to wait until SCM gets the updated 
info.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back check it's hash.

Review Comment:
   ```suggestion
       // Read the key back and check its hash.
   ```



##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void 
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
 
     Assertions.assertEquals(0, icrCount.get());
     // This should trigger container report validation in the ICR handler 
above.
-    keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class), 
container, Collections.emptySet());
+    DNContainerOperationClient mockDnClient = 
mock(DNContainerOperationClient.class);
+    DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+    DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+    when(mockDnClient.getContainerChecksumInfo(anyLong(), 
any())).thenReturn(null);
+    keyValueHandler.reconcileContainer(mockDnClient, container, 
Sets.newHashSet(peer1, peer2, peer3));
+    // Make sure all the replicas are used for reconciliation.
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+    Mockito.verify(mockDnClient, 
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @ParameterizedTest
+  @MethodSource("corruptionValues")
+  public void testFullContainerReconciliation(int numBlocks, int numChunks) 
throws Exception {
+    KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+    ContainerChecksumTreeManager checksumManager = 
kvHandler.getChecksumManager();
+    DNContainerOperationClient dnClient = new DNContainerOperationClient(conf, 
null, null);
+    final long containerID = 100L;
+    // Create 3 containers with 15 blocks each and 3 replicas.
+    List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler, 
containerID, 15, 3);
+    assertEquals(3, containers.size());
+
+    // Introduce corruption in each container on different replicas.
+    introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks, 
false);
+    introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks, 
true);
+
+    // Without reconciliation, checksums should be different because of the 
corruption.
+    Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+    for (KeyValueContainer kvContainer : containers) {
+      kvHandler.createContainerMerkleTree(kvContainer);

Review Comment:
   Building the tree from metadata is supposed to happen automatically in 
reconciliation if the tree does not exist, and again at the end with the 
updated values. I think we should avoid calling this in cases where we invoke 
reconciliation directly.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back check it's hash.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 
* 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = 
cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = 
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = 
db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDataList.size(); i += 2) {
+        BlockData blockData = blockDataList.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, 
containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + 
blockData.getBlockID().getLocalID() + ".block"));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    // TODO: Use On-demand container scanner to build the new container merkle 
tree. (HDDS-10374)
+    
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = 
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // 3. Reconcile the container.
+    
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+        return 
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() == 
oldDataChecksum;
+      } catch (Exception ex) {
+        return false;
+      }
+    }, 500, 20000);
+    ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    
assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+        newContainerChecksumInfo.getContainerMerkleTree());
+    List<HddsProtos.SCMContainerReplicaProto> containerReplicas = 
cluster.getStorageContainerManager()
+        .getClientProtocolServer().getContainerReplicas(containerID, 
ClientVersion.CURRENT_VERSION);

Review Comment:
   We can save the client from line 387 and use it again here to call 
`getContainerReplicas` instead of this lower level call.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back check it's hash.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 
* 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = 
cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = 
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = 
db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDataList.size(); i += 2) {
+        BlockData blockData = blockDataList.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, 
containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + 
blockData.getBlockID().getLocalID() + ".block"));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    // TODO: Use On-demand container scanner to build the new container merkle 
tree. (HDDS-10374)
+    
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = 
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // 3. Reconcile the container.
+    
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+        return 
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() == 
oldDataChecksum;
+      } catch (Exception ex) {
+        return false;
+      }
+    }, 500, 20000);
+    ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    
assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+        newContainerChecksumInfo.getContainerMerkleTree());
+    List<HddsProtos.SCMContainerReplicaProto> containerReplicas = 
cluster.getStorageContainerManager()
+        .getClientProtocolServer().getContainerReplicas(containerID, 
ClientVersion.CURRENT_VERSION);

Review Comment:
   Same in the next test.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -73,24 +125,36 @@ public class TestContainerCommandReconciliation {
   private static ObjectStore store;
   private static OzoneConfiguration conf;
   private static DNContainerOperationClient dnClient;
+  private static final String KEY_NAME = "testkey";
 
   @TempDir
   private static File testDir;
+  @TempDir
+  private static File workDir;
+  private static MiniKdc miniKdc;
+  private static File ozoneKeytab;
+  private static File spnegoKeytab;
+  private static File testUserKeytab;
+  private static String testUserPrincipal;
+  private static String host;
 
   @BeforeAll
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+    conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024, 
StorageUnit.BYTES);
+    conf.setStorageSize(OZONE_SCM_BLOCK_SIZE,  512 * 1024, StorageUnit.BYTES);
     // Disable the container scanner so it does not create merkle tree files 
that interfere with this test.
     conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(3)
-        .build();
-    cluster.waitForClusterToBeReady();
-    rpcClient = OzoneClientFactory.getRpcClient(conf);
-    store = rpcClient.getObjectStore();
-    dnClient = new DNContainerOperationClient(conf, null, null);
+
+    ExitUtils.disableSystemExit();

Review Comment:
   Why is this here?



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
-    String volumeName = UUID.randomUUID().toString();
-    String bucketName = UUID.randomUUID().toString();
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    // Read the key back check it's hash.
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+    Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024 
* 1024, volume, bucket);
+    long containerID = containerAndData.getLeft();
+    byte[] data = containerAndData.getRight();
+    // Get the datanodes where the container replicas are stored.
+    List<DatanodeDetails> dataNodeDetails = 
cluster.getStorageContainerManager().getContainerManager()
+        .getContainerReplicas(ContainerID.valueOf(containerID))
+        .stream().map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+    Assertions.assertEquals(3, dataNodeDetails.size());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanode(dataNodeDetails.get(0));
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = 
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = 
db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDataList.size(); i += 2) {
+        BlockData blockData = blockDataList.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, 
containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + 
blockData.getBlockID().getLocalID() + ".block"));
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    // TODO: Use On-demand container scanner to build the new container merkle 
tree. (HDDS-10374)
+    
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = 
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // 3. Reconcile the container.
+    
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+        return 
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() == 
oldDataChecksum;

Review Comment:
   Same in the next test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to