danny0405 commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r840329491



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1097,6 +1097,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+    writer.appendBlock(dataBlock);
+
+    copyOfRecords1.addAll(copyOfRecords2);
+    List<String> originalKeys =
+        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+            .collect(Collectors.toList());
+
+    // Delete 10 keys
+    // Default orderingVal is 0, which means natural order, the DELETE records
+    // should overwrite the data records.
+    List<DeleteRecord> deleteRecords1 = copyOfRecords1.subList(0, 10).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock1);
+
+    // Delete another 10 keys with -1 as orderingVal.
+    // The deletion should not work
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteRecord[]::new), header);
+    writer.appendBlock(deleteBlock2);
+
+    // Delete another 10 keys with +1 as orderingVal.
+    // The deletion should work because the keys has greater ordering value.
+    List<DeleteRecord> deletedRecords3 = copyOfRecords1.subList(20, 30).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock3);
+

Review comment:
       It is not that easy to construct updates with the same keys for the sample data; `testPreCombineFiledForReadMOR` already covers this.
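
       For reference, here is a standalone sketch of the ordering rule the new test exercises (illustration only; `Entry`, `merge`, and `OrderingValueDeleteSketch` are hypothetical names, not Hudi APIs): a DELETE takes effect only when its ordering value is greater than or equal to the existing record's, so the default orderingVal of 0 deletes by natural order, -1 is ignored, and +1 wins.

       ```java
       // Standalone illustration only; Entry/merge are hypothetical, not Hudi APIs.
       public class OrderingValueDeleteSketch {

         // Minimal record holder: key, ordering value, and a delete marker.
         record Entry(String key, long orderingVal, boolean isDelete) {}

         // A later DELETE replaces the existing record only when its ordering value
         // is >= the existing one; ties fall back to natural (arrival) order.
         static Entry merge(Entry existing, Entry laterDelete) {
           return laterDelete.orderingVal() >= existing.orderingVal() ? laterDelete : existing;
         }

         public static void main(String[] args) {
           Entry data = new Entry("key-1", 0, false);

           // Default orderingVal = 0: the DELETE overwrites the data record (instant 102 above).
           System.out.println(merge(data, new Entry("key-1", 0, true)).isDelete());   // true
           // orderingVal = -1: the DELETE is older, so it is ignored (instant 103 above).
           System.out.println(merge(data, new Entry("key-1", -1, true)).isDelete());  // false
           // orderingVal = +1: the DELETE is newer, so it wins (instant 104 above).
           System.out.println(merge(data, new Entry("key-1", 1, true)).isDelete());   // true
         }
       }
       ```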




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
