errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r2035612223
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
+ DNContainerOperationClient mockDnClient =
mock(DNContainerOperationClient.class);
+ DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+ when(mockDnClient.getContainerChecksumInfo(anyLong(),
any())).thenReturn(null);
+ keyValueHandler.reconcileContainer(mockDnClient, container,
Sets.newHashSet(peer1, peer2, peer3));
+ // Make sure all the replicas are used for reconciliation.
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
Assertions.assertEquals(1, icrCount.get());
}
+ @ParameterizedTest
+ @MethodSource("corruptionValues")
+ public void testFullContainerReconciliation(int numBlocks, int numChunks)
throws Exception {
+ KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+ ContainerChecksumTreeManager checksumManager =
kvHandler.getChecksumManager();
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
null, null);
+ final long containerID = 100L;
+ // Create 3 containers with 15 blocks each and 3 replicas.
+ List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler,
containerID, 15, 3);
+ assertEquals(3, containers.size());
+
+ // Introduce corruption in each container on different replicas.
+ introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks,
false);
+ introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks,
true);
+
+ // Without reconciliation, checksums should be different because of the
corruption.
+ Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+ for (KeyValueContainer kvContainer : containers) {
+ kvHandler.createContainerMerkleTree(kvContainer);
+ Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+ checksumManager.read(kvContainer.getContainerData());
+ assertTrue(containerChecksumInfo.isPresent());
+
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());
+ }
+ // There should be more than 1 checksum because of the corruption.
+ assertTrue(checksumsBeforeReconciliation.size() > 1);
+
+ List<DatanodeDetails> datanodes =
ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
+ randomDatanodeDetails());
+ Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
+ dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
+ dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
+ dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));
+
+ // Setup mock for each datanode network calls needed for reconciliation.
+ try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
+ Mockito.mockStatic(ContainerProtocolCalls.class)) {
+ // Mock getContainerChecksumInfo
+ containerProtocolMock.when(() ->
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+ .thenAnswer(inv -> {
+ XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+ DatanodeDetails dn =
xceiverClientSpi.getPipeline().getClosestNode();
Review Comment:
We expect the client to only use single node pipelines to reconcile with one
peer at a time.
```suggestion
Pipeline pipeline = xceiverClientSpi.getPipeline();
assertEquals(1, pipeline.size());
DatanodeDetails dn = pipeline.getFirstNode();
```
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
+ DNContainerOperationClient mockDnClient =
mock(DNContainerOperationClient.class);
+ DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+ when(mockDnClient.getContainerChecksumInfo(anyLong(),
any())).thenReturn(null);
+ keyValueHandler.reconcileContainer(mockDnClient, container,
Sets.newHashSet(peer1, peer2, peer3));
+ // Make sure all the replicas are used for reconciliation.
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
Assertions.assertEquals(1, icrCount.get());
}
+ @ParameterizedTest
+ @MethodSource("corruptionValues")
+ public void testFullContainerReconciliation(int numBlocks, int numChunks)
throws Exception {
+ KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+ ContainerChecksumTreeManager checksumManager =
kvHandler.getChecksumManager();
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
null, null);
+ final long containerID = 100L;
+ // Create 3 containers with 15 blocks each and 3 replicas.
+ List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler,
containerID, 15, 3);
+ assertEquals(3, containers.size());
+
+ // Introduce corruption in each container on different replicas.
+ introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks,
false);
+ introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks,
true);
+
+ // Without reconciliation, checksums should be different because of the
corruption.
+ Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+ for (KeyValueContainer kvContainer : containers) {
+ kvHandler.createContainerMerkleTree(kvContainer);
+ Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+ checksumManager.read(kvContainer.getContainerData());
+ assertTrue(containerChecksumInfo.isPresent());
+
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());
Review Comment:
We should also check that a matching data checksum was added to the
ContainerData.
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
+ DNContainerOperationClient mockDnClient =
mock(DNContainerOperationClient.class);
+ DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+ when(mockDnClient.getContainerChecksumInfo(anyLong(),
any())).thenReturn(null);
+ keyValueHandler.reconcileContainer(mockDnClient, container,
Sets.newHashSet(peer1, peer2, peer3));
+ // Make sure all the replicas are used for reconciliation.
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
Assertions.assertEquals(1, icrCount.get());
}
+ @ParameterizedTest
+ @MethodSource("corruptionValues")
+ public void testFullContainerReconciliation(int numBlocks, int numChunks)
throws Exception {
+ KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+ ContainerChecksumTreeManager checksumManager =
kvHandler.getChecksumManager();
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
null, null);
+ final long containerID = 100L;
+ // Create 3 containers with 15 blocks each and 3 replicas.
+ List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler,
containerID, 15, 3);
+ assertEquals(3, containers.size());
+
+ // Introduce corruption in each container on different replicas.
+ introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks,
false);
+ introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks,
true);
+
+ // Without reconciliation, checksums should be different because of the
corruption.
+ Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+ for (KeyValueContainer kvContainer : containers) {
+ kvHandler.createContainerMerkleTree(kvContainer);
+ Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+ checksumManager.read(kvContainer.getContainerData());
+ assertTrue(containerChecksumInfo.isPresent());
+
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());
Review Comment:
Same at the end of the test.
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
+ DNContainerOperationClient mockDnClient =
mock(DNContainerOperationClient.class);
+ DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+ when(mockDnClient.getContainerChecksumInfo(anyLong(),
any())).thenReturn(null);
+ keyValueHandler.reconcileContainer(mockDnClient, container,
Sets.newHashSet(peer1, peer2, peer3));
+ // Make sure all the replicas are used for reconciliation.
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
Assertions.assertEquals(1, icrCount.get());
}
+ @ParameterizedTest
+ @MethodSource("corruptionValues")
+ public void testFullContainerReconciliation(int numBlocks, int numChunks)
throws Exception {
+ KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+ ContainerChecksumTreeManager checksumManager =
kvHandler.getChecksumManager();
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
null, null);
+ final long containerID = 100L;
+ // Create 3 containers with 15 blocks each and 3 replicas.
+ List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler,
containerID, 15, 3);
+ assertEquals(3, containers.size());
+
+ // Introduce corruption in each container on different replicas.
+ introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks,
false);
+ introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks,
true);
+
+ // Without reconciliation, checksums should be different because of the
corruption.
+ Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+ for (KeyValueContainer kvContainer : containers) {
+ kvHandler.createContainerMerkleTree(kvContainer);
+ Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+ checksumManager.read(kvContainer.getContainerData());
+ assertTrue(containerChecksumInfo.isPresent());
+
checksumsBeforeReconciliation.add(containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum());
+ }
+ // There should be more than 1 checksum because of the corruption.
+ assertTrue(checksumsBeforeReconciliation.size() > 1);
+
+ List<DatanodeDetails> datanodes =
ImmutableList.of(randomDatanodeDetails(), randomDatanodeDetails(),
+ randomDatanodeDetails());
+ Map<String, KeyValueContainer> dnToContainerMap = new HashMap<>();
+ dnToContainerMap.put(datanodes.get(0).getUuidString(), containers.get(0));
+ dnToContainerMap.put(datanodes.get(1).getUuidString(), containers.get(1));
+ dnToContainerMap.put(datanodes.get(2).getUuidString(), containers.get(2));
+
+ // Setup mock for each datanode network calls needed for reconciliation.
+ try (MockedStatic<ContainerProtocolCalls> containerProtocolMock =
+ Mockito.mockStatic(ContainerProtocolCalls.class)) {
+ // Mock getContainerChecksumInfo
+ containerProtocolMock.when(() ->
ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+ .thenAnswer(inv -> {
+ XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+ DatanodeDetails dn =
xceiverClientSpi.getPipeline().getClosestNode();
Review Comment:
Same in the other mocks. I would also move the construction of the container
protocol mock to a helper method for readability.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws
Exception {
}
}
- private long writeDataAndGetContainer(boolean close) throws Exception {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
+ @Test
+ public void testContainerChecksumWithBlockMissing() throws Exception {
+ // 1. Write data to a container.
+ // Read the key back check it's hash.
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024
* 1024, volume, bucket);
+ long containerID = containerAndData.getLeft();
+ byte[] data = containerAndData.getRight();
+ // Get the datanodes where the container replicas are stored.
+ List<DatanodeDetails> dataNodeDetails =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainerReplicas(ContainerID.valueOf(containerID))
+ .stream().map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(3, dataNodeDetails.size());
+ HddsDatanodeService hddsDatanodeService =
cluster.getHddsDatanode(dataNodeDetails.get(0));
+ DatanodeStateMachine datanodeStateMachine =
hddsDatanodeService.getDatanodeStateMachine();
+ Container<?> container =
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ KeyValueHandler kvHandler = (KeyValueHandler)
datanodeStateMachine.getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+ BlockManager blockManager = kvHandler.getBlockManager();
+ List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+ String chunksPath = container.getContainerData().getChunksPath();
+ long oldDataChecksum =
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ // 2. Delete some blocks to simulate missing blocks.
+ try (DBHandle db = BlockUtils.getDB(containerData, conf);
+ BatchOperation op =
db.getStore().getBatchHandler().initBatchOperation()) {
+ for (int i = 0; i < blockDataList.size(); i += 2) {
+ BlockData blockData = blockDataList.get(i);
+ // Delete the block metadata from the container db
+ db.getStore().getBlockDataTable().deleteWithBatch(op,
containerData.getBlockKey(blockData.getLocalID()));
+ // Delete the block file.
+ Files.deleteIfExists(Paths.get(chunksPath + "/" +
blockData.getBlockID().getLocalID() + ".block"));
+ }
+ db.getStore().getBatchHandler().commitBatchOperation(op);
+ db.getStore().flushDB();
+ }
+
+ // TODO: Use On-demand container scanner to build the new container merkle
tree. (HDDS-10374)
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(container);
+ ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+ readChecksumFile(container.getContainerData());
+ long dataChecksumAfterBlockDelete =
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+ // Checksum should have changed after block delete.
+ Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+ // 3. Reconcile the container.
+
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ return
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() ==
oldDataChecksum;
Review Comment:
The following section is checking SCM state, but the heartbeat may not have
reached SCM yet. I think this section needs to wait until SCM gets the updated
info.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws
Exception {
}
}
- private long writeDataAndGetContainer(boolean close) throws Exception {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
+ @Test
+ public void testContainerChecksumWithBlockMissing() throws Exception {
+ // 1. Write data to a container.
+ // Read the key back check it's hash.
Review Comment:
```suggestion
// Read the key back and check its hash.
```
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -499,10 +572,125 @@ public void
testReconcileContainer(ContainerLayoutVersion layoutVersion) throws
Assertions.assertEquals(0, icrCount.get());
// This should trigger container report validation in the ICR handler
above.
- keyValueHandler.reconcileContainer(mock(DNContainerOperationClient.class),
container, Collections.emptySet());
+ DNContainerOperationClient mockDnClient =
mock(DNContainerOperationClient.class);
+ DatanodeDetails peer1 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer2 = MockDatanodeDetails.randomDatanodeDetails();
+ DatanodeDetails peer3 = MockDatanodeDetails.randomDatanodeDetails();
+ when(mockDnClient.getContainerChecksumInfo(anyLong(),
any())).thenReturn(null);
+ keyValueHandler.reconcileContainer(mockDnClient, container,
Sets.newHashSet(peer1, peer2, peer3));
+ // Make sure all the replicas are used for reconciliation.
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer1));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer2));
+ Mockito.verify(mockDnClient,
atMostOnce()).getContainerChecksumInfo(anyLong(), eq(peer3));
Assertions.assertEquals(1, icrCount.get());
}
+ @ParameterizedTest
+ @MethodSource("corruptionValues")
+ public void testFullContainerReconciliation(int numBlocks, int numChunks)
throws Exception {
+ KeyValueHandler kvHandler = createKeyValueHandler(tempDir);
+ ContainerChecksumTreeManager checksumManager =
kvHandler.getChecksumManager();
+ DNContainerOperationClient dnClient = new DNContainerOperationClient(conf,
null, null);
+ final long containerID = 100L;
+ // Create 3 containers with 15 blocks each and 3 replicas.
+ List<KeyValueContainer> containers = createContainerWithBlocks(kvHandler,
containerID, 15, 3);
+ assertEquals(3, containers.size());
+
+ // Introduce corruption in each container on different replicas.
+ introduceCorruption(kvHandler, containers.get(1), numBlocks, numChunks,
false);
+ introduceCorruption(kvHandler, containers.get(2), numBlocks, numChunks,
true);
+
+ // Without reconciliation, checksums should be different because of the
corruption.
+ Set<Long> checksumsBeforeReconciliation = new HashSet<>();
+ for (KeyValueContainer kvContainer : containers) {
+ kvHandler.createContainerMerkleTree(kvContainer);
Review Comment:
Building the tree from metadata is supposed to happen automatically in
reconciliation if the tree does not exist, and again at the end with the
updated values. I think we should avoid calling this in cases where we invoke
reconciliation directly.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws
Exception {
}
}
- private long writeDataAndGetContainer(boolean close) throws Exception {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
+ @Test
+ public void testContainerChecksumWithBlockMissing() throws Exception {
+ // 1. Write data to a container.
+ // Read the key back check it's hash.
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024
* 1024, volume, bucket);
+ long containerID = containerAndData.getLeft();
+ byte[] data = containerAndData.getRight();
+ // Get the datanodes where the container replicas are stored.
+ List<DatanodeDetails> dataNodeDetails =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainerReplicas(ContainerID.valueOf(containerID))
+ .stream().map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(3, dataNodeDetails.size());
+ HddsDatanodeService hddsDatanodeService =
cluster.getHddsDatanode(dataNodeDetails.get(0));
+ DatanodeStateMachine datanodeStateMachine =
hddsDatanodeService.getDatanodeStateMachine();
+ Container<?> container =
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ KeyValueHandler kvHandler = (KeyValueHandler)
datanodeStateMachine.getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+ BlockManager blockManager = kvHandler.getBlockManager();
+ List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+ String chunksPath = container.getContainerData().getChunksPath();
+ long oldDataChecksum =
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ // 2. Delete some blocks to simulate missing blocks.
+ try (DBHandle db = BlockUtils.getDB(containerData, conf);
+ BatchOperation op =
db.getStore().getBatchHandler().initBatchOperation()) {
+ for (int i = 0; i < blockDataList.size(); i += 2) {
+ BlockData blockData = blockDataList.get(i);
+ // Delete the block metadata from the container db
+ db.getStore().getBlockDataTable().deleteWithBatch(op,
containerData.getBlockKey(blockData.getLocalID()));
+ // Delete the block file.
+ Files.deleteIfExists(Paths.get(chunksPath + "/" +
blockData.getBlockID().getLocalID() + ".block"));
+ }
+ db.getStore().getBatchHandler().commitBatchOperation(op);
+ db.getStore().flushDB();
+ }
+
+ // TODO: Use On-demand container scanner to build the new container merkle
tree. (HDDS-10374)
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(container);
+ ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+ readChecksumFile(container.getContainerData());
+ long dataChecksumAfterBlockDelete =
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+ // Checksum should have changed after block delete.
+ Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+ // 3. Reconcile the container.
+
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ return
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() ==
oldDataChecksum;
+ } catch (Exception ex) {
+ return false;
+ }
+ }, 500, 20000);
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+
assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+ newContainerChecksumInfo.getContainerMerkleTree());
+ List<HddsProtos.SCMContainerReplicaProto> containerReplicas =
cluster.getStorageContainerManager()
+ .getClientProtocolServer().getContainerReplicas(containerID,
ClientVersion.CURRENT_VERSION);
Review Comment:
We can save the client from line 387 and use it again here to call
`getContainerReplicas` instead of this lower level call.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws
Exception {
}
}
- private long writeDataAndGetContainer(boolean close) throws Exception {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
+ @Test
+ public void testContainerChecksumWithBlockMissing() throws Exception {
+ // 1. Write data to a container.
+ // Read the key back check it's hash.
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024
* 1024, volume, bucket);
+ long containerID = containerAndData.getLeft();
+ byte[] data = containerAndData.getRight();
+ // Get the datanodes where the container replicas are stored.
+ List<DatanodeDetails> dataNodeDetails =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainerReplicas(ContainerID.valueOf(containerID))
+ .stream().map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(3, dataNodeDetails.size());
+ HddsDatanodeService hddsDatanodeService =
cluster.getHddsDatanode(dataNodeDetails.get(0));
+ DatanodeStateMachine datanodeStateMachine =
hddsDatanodeService.getDatanodeStateMachine();
+ Container<?> container =
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ KeyValueHandler kvHandler = (KeyValueHandler)
datanodeStateMachine.getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+ BlockManager blockManager = kvHandler.getBlockManager();
+ List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+ String chunksPath = container.getContainerData().getChunksPath();
+ long oldDataChecksum =
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ // 2. Delete some blocks to simulate missing blocks.
+ try (DBHandle db = BlockUtils.getDB(containerData, conf);
+ BatchOperation op =
db.getStore().getBatchHandler().initBatchOperation()) {
+ for (int i = 0; i < blockDataList.size(); i += 2) {
+ BlockData blockData = blockDataList.get(i);
+ // Delete the block metadata from the container db
+ db.getStore().getBlockDataTable().deleteWithBatch(op,
containerData.getBlockKey(blockData.getLocalID()));
+ // Delete the block file.
+ Files.deleteIfExists(Paths.get(chunksPath + "/" +
blockData.getBlockID().getLocalID() + ".block"));
+ }
+ db.getStore().getBatchHandler().commitBatchOperation(op);
+ db.getStore().flushDB();
+ }
+
+ // TODO: Use On-demand container scanner to build the new container merkle
tree. (HDDS-10374)
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(container);
+ ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+ readChecksumFile(container.getContainerData());
+ long dataChecksumAfterBlockDelete =
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+ // Checksum should have changed after block delete.
+ Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+ // 3. Reconcile the container.
+
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ return
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() ==
oldDataChecksum;
+ } catch (Exception ex) {
+ return false;
+ }
+ }, 500, 20000);
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+
assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+ newContainerChecksumInfo.getContainerMerkleTree());
+ List<HddsProtos.SCMContainerReplicaProto> containerReplicas =
cluster.getStorageContainerManager()
+ .getClientProtocolServer().getContainerReplicas(containerID,
ClientVersion.CURRENT_VERSION);
Review Comment:
Same in the next test
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -73,24 +125,36 @@ public class TestContainerCommandReconciliation {
private static ObjectStore store;
private static OzoneConfiguration conf;
private static DNContainerOperationClient dnClient;
+ private static final String KEY_NAME = "testkey";
@TempDir
private static File testDir;
+ @TempDir
+ private static File workDir;
+ private static MiniKdc miniKdc;
+ private static File ozoneKeytab;
+ private static File spnegoKeytab;
+ private static File testUserKeytab;
+ private static String testUserPrincipal;
+ private static String host;
@BeforeAll
public static void init() throws Exception {
conf = new OzoneConfiguration();
- conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
+ conf.set(OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ conf.setStorageSize(OZONE_SCM_CHUNK_SIZE_KEY, 128 * 1024,
StorageUnit.BYTES);
+ conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 512 * 1024, StorageUnit.BYTES);
// Disable the container scanner so it does not create merkle tree files
that interfere with this test.
conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
- cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(3)
- .build();
- cluster.waitForClusterToBeReady();
- rpcClient = OzoneClientFactory.getRpcClient(conf);
- store = rpcClient.getObjectStore();
- dnClient = new DNContainerOperationClient(conf, null, null);
+
+ ExitUtils.disableSystemExit();
Review Comment:
Why is this here?
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -247,26 +331,217 @@ public void testGetChecksumInfoSuccess() throws
Exception {
}
}
- private long writeDataAndGetContainer(boolean close) throws Exception {
- String volumeName = UUID.randomUUID().toString();
- String bucketName = UUID.randomUUID().toString();
+ @Test
+ public void testContainerChecksumWithBlockMissing() throws Exception {
+ // 1. Write data to a container.
+ // Read the key back check it's hash.
+ String volume = UUID.randomUUID().toString();
+ String bucket = UUID.randomUUID().toString();
+ Pair<Long, byte[]> containerAndData = getDataAndContainer(true, 20 * 1024
* 1024, volume, bucket);
+ long containerID = containerAndData.getLeft();
+ byte[] data = containerAndData.getRight();
+ // Get the datanodes where the container replicas are stored.
+ List<DatanodeDetails> dataNodeDetails =
cluster.getStorageContainerManager().getContainerManager()
+ .getContainerReplicas(ContainerID.valueOf(containerID))
+ .stream().map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+ Assertions.assertEquals(3, dataNodeDetails.size());
+ HddsDatanodeService hddsDatanodeService =
cluster.getHddsDatanode(dataNodeDetails.get(0));
+ DatanodeStateMachine datanodeStateMachine =
hddsDatanodeService.getDatanodeStateMachine();
+ Container<?> container =
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ KeyValueHandler kvHandler = (KeyValueHandler)
datanodeStateMachine.getContainer().getDispatcher()
+ .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+ BlockManager blockManager = kvHandler.getBlockManager();
+ List<BlockData> blockDataList = blockManager.listBlock(container, -1, 100);
+ String chunksPath = container.getContainerData().getChunksPath();
+ long oldDataChecksum =
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ // 2. Delete some blocks to simulate missing blocks.
+ try (DBHandle db = BlockUtils.getDB(containerData, conf);
+ BatchOperation op =
db.getStore().getBatchHandler().initBatchOperation()) {
+ for (int i = 0; i < blockDataList.size(); i += 2) {
+ BlockData blockData = blockDataList.get(i);
+ // Delete the block metadata from the container db
+ db.getStore().getBlockDataTable().deleteWithBatch(op,
containerData.getBlockKey(blockData.getLocalID()));
+ // Delete the block file.
+ Files.deleteIfExists(Paths.get(chunksPath + "/" +
blockData.getBlockID().getLocalID() + ".block"));
+ }
+ db.getStore().getBatchHandler().commitBatchOperation(op);
+ db.getStore().flushDB();
+ }
+
+ // TODO: Use On-demand container scanner to build the new container merkle
tree. (HDDS-10374)
+
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+ kvHandler.createContainerMerkleTree(container);
+ ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+ readChecksumFile(container.getContainerData());
+ long dataChecksumAfterBlockDelete =
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+ // Checksum should have changed after block delete.
+ Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+ // 3. Reconcile the container.
+
cluster.getStorageContainerLocationClient().reconcileContainer(containerID);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo =
readChecksumFile(container.getContainerData());
+ return
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum() ==
oldDataChecksum;
Review Comment:
Same in the next test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]