danny0405 commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r840329491
##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1097,6 +1097,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+    writer.appendBlock(dataBlock);
+
+    copyOfRecords1.addAll(copyOfRecords2);
+    List<String> originalKeys =
+        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+            .collect(Collectors.toList());
+
+    // Delete 10 keys.
+    // The default orderingVal is 0, which means natural order: the DELETE records
+    // should overwrite the data records.
+    List<DeleteRecord> deleteRecords1 = copyOfRecords1.subList(0, 10).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock1);
+
+    // Delete another 10 keys with -1 as orderingVal.
+    // The deletion should not take effect because -1 is smaller than the
+    // ordering value of the data records.
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteRecord[]::new), header);
+    writer.appendBlock(deleteBlock2);
+
+    // Delete another 10 keys with +1 as orderingVal.
+    // The deletion should take effect because these keys have a greater ordering value.
+    List<DeleteRecord> deletedRecords3 = copyOfRecords1.subList(20, 30).stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
+    writer.appendBlock(deleteBlock3);
+

Review comment:
       It is not that easy to construct updates with the same keys for the sampled data; `testPreCombineFiledForReadMOR` already covers this.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
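[Editor's note] The three delete blocks in the hunk above (instants 102, 103, 104) exercise a single precedence rule: a DELETE only takes effect if its ordering value is not smaller than that of the record it targets. The sketch below illustrates that rule in isolation. It is a minimal, self-contained stand-in, not Hudi's merge code: the `deleteWins` helper and the backing map are invented here for illustration, and the `>=` comparison is an assumption inferred from the test's expectations (equal ordering at instant 102 deletes, -1 at instant 103 does not, +1 at instant 104 does).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the ordering rule behind "disorder" (out-of-order) deletes.
// None of this is Hudi API; it mimics what the log record scanner's precombine
// step is expected to decide for each DeleteRecord in the test above.
public class DisorderDeleteSketch {

  // Ordering value currently held per record key; 0 is the assumed default
  // ("natural order") for records written without an explicit orderingVal.
  static final Map<String, Long> ORDERING_BY_KEY = new HashMap<>();

  // Assumption: a DELETE wins when its ordering value is greater than or
  // equal to the targeted record's value; a stale (smaller) value is ignored.
  static boolean deleteWins(String recordKey, long deleteOrderingVal) {
    long current = ORDERING_BY_KEY.getOrDefault(recordKey, 0L);
    return deleteOrderingVal >= current;
  }

  public static void main(String[] args) {
    ORDERING_BY_KEY.put("key-0", 0L); // data record written with default ordering

    System.out.println(deleteWins("key-0", 0));  // true:  equal ordering, delete applies (instant 102 case)
    System.out.println(deleteWins("key-0", -1)); // false: stale delete, ignored          (instant 103 case)
    System.out.println(deleteWins("key-0", 1));  // true:  newer delete, applies          (instant 104 case)
  }
}
```

This is also why the test asserts on three disjoint key ranges (0-10, 10-20, 20-30): each range isolates one branch of the comparison.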