This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 99dd1cb6e6 [HUDI-3835] Add UT for delete in java client (#5270)
99dd1cb6e6 is described below

commit 99dd1cb6e63600681aa11b3a03bc16d1401d8055
Author: 董可伦 <dongkelu...@inspur.com>
AuthorDate: Sat Apr 16 03:03:48 2022 +0800

    [HUDI-3835] Add UT for delete in java client (#5270)
---
 .../commit/TestJavaCopyOnWriteActionExecutor.java | 86 +++++++++++++++++++++-
 1 file changed, 85 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index 1bf1b4cccb..518414d614 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -318,7 +318,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
   }

   @Test
-  public void testInsertRecords() throws Exception {
+  public void testInsertRecords() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfig();
     String instantTime = makeNewCommitTime();
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -465,6 +465,90 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestBase
     verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
   }

+  @Test
+  public void testDeleteRecords() throws Exception {
+    // Prepare the write config and write client
+    HoodieWriteConfig config = makeHoodieClientConfig();
+    int startInstant = 1;
+    String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
+    HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
+    writeClient.startCommitWithTime(firstCommitTime);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+
+    String partitionPath = "2022/04/09";
+
+    // Get some records belonging to the same partition (2022/04/09)
+    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":1}";
+    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2022-04-09T03:20:41.415Z\",\"number\":2}";
+    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+        + "\"time\":\"2022-04-09T03:16:41.415Z\",\"number\":3}";
+
+    List<HoodieRecord> records = new ArrayList<>();
+    RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1);
+    records.add(new HoodieAvroRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+    RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2);
+    records.add(new HoodieAvroRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+    RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3);
+    records.add(new HoodieAvroRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+    // Insert the new records
+    writeClient.insert(records, firstCommitTime);
+
+    FileStatus[] allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    // Read out the bloom filter and make sure it can answer whether a record exists
+    Path filePath = allFiles[0].getPath();
+    BloomFilter filter = fileUtils.readBloomFilterFromMetadata(hadoopConf, filePath);
+    for (HoodieRecord record : records) {
+      assertTrue(filter.mightContain(record.getRecordKey()));
+    }
+
+    // Read the base file and check the record content
+    List<GenericRecord> fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+    int index = 0;
+    for (GenericRecord record : fileRecords) {
+      assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
+      index++;
+    }
+
+    String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
+    writeClient.startCommitWithTime(newCommitTime);
+
+    // Test deleting two of the three records
+    List<HoodieKey> keysForDelete = new ArrayList<>(Arrays.asList(records.get(0).getKey(), records.get(2).getKey()));
+    writeClient.delete(keysForDelete, newCommitTime);
+
+    allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    filePath = allFiles[0].getPath();
+    // Read the base file and check the record content
+    fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+    // Check that the two records were deleted successfully
+    assertEquals(1, fileRecords.size());
+    assertEquals(records.get(1).getRecordKey(), fileRecords.get(0).get("_row_key").toString());
+
+    newCommitTime = makeNewCommitTime(startInstant++, "%09d");
+    writeClient.startCommitWithTime(newCommitTime);
+
+    // Test deleting the last remaining record
+    keysForDelete = new ArrayList<>(Arrays.asList(records.get(1).getKey()));
+    writeClient.delete(keysForDelete, newCommitTime);
+
+    allFiles = getIncrementalFiles(partitionPath, "0", -1);
+    assertEquals(1, allFiles.length);
+
+    filePath = allFiles[0].getPath();
+    // Read the base file and check the record content
+    fileRecords = fileUtils.readAvroRecords(hadoopConf, filePath);
+    // Check whether all records have been deleted
+    assertEquals(0, fileRecords.size());
+  }
+
   public static Map<String, Long> generateExpectedPartitionNumRecords(List<HoodieRecord> records) {
     return records.stream().map(record -> Pair.of(record.getPartitionPath(), 1))
         .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
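
For context, the write-client flow this test exercises boils down to: start a commit, insert a batch of records, then start another commit and delete a subset of those records by HoodieKey. The sketch below is illustrative only and is not part of the commit: the class name DeleteFlowSketch and the helper insertThenDelete are made up, and it assumes a HoodieJavaWriteClient that has already been built against an initialized table (in the test this comes from getHoodieWriteClient(config)).

import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

public class DeleteFlowSketch {

  // Insert a batch of records in one commit, then delete a subset of them by key
  // in a follow-up commit. The write client is assumed to be configured for an
  // existing table; names here are hypothetical, not from the commit above.
  public static <T extends HoodieRecordPayload> void insertThenDelete(
      HoodieJavaWriteClient<T> writeClient,
      List<HoodieRecord<T>> toInsert,
      List<HoodieRecord<T>> toDelete) {

    // First commit: write the records. startCommit() returns the new instant time;
    // the test instead uses startCommitWithTime with a precomputed instant.
    String insertTime = writeClient.startCommit();
    writeClient.insert(toInsert, insertTime);

    // Deletes are issued as HoodieKeys (record key + partition path), not full records.
    List<HoodieKey> keysToDelete = toDelete.stream()
        .map(HoodieRecord::getKey)
        .collect(Collectors.toList());

    // Second commit: delete by key.
    String deleteTime = writeClient.startCommit();
    writeClient.delete(keysToDelete, deleteTime);
  }
}

The test then verifies the effect of each delete by re-reading the base file with BaseFileUtils and asserting on the surviving record keys, which the sketch above omits.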