nsivabalan commented on code in PR #13010: URL: https://github.com/apache/hudi/pull/13010#discussion_r2013058804
########## hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java: ########## @@ -288,13 +301,21 @@ private FileSlice getFileSliceToRead(StorageConfiguration<?> storageConf, return fileSlice; } - private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf, - String tablePath, - HoodieTableMetaClient metaClient, - FileSlice fileSlice, - Schema avroSchema, - RecordMergeMode recordMergeMode, - boolean isSkipMerge) throws Exception { + private FileSlice getFileSliceToReadIncludingInflight(StorageConfiguration<?> storageConf, String tablePath, + HoodieTableMetaClient metaClient, String[] partitionPaths, + boolean containsBaseFile, int expectedLogFileNum) { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(storageConf); + HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView(engineContext, metaClient, metaClient.getActiveTimeline(), false); Review Comment: Where are we closing the FSV? 
########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala: ########## @@ -176,4 +184,25 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int assertEquals(expectedOrderingValue, metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD)) } + + @ParameterizedTest + @EnumSource(classOf[RecordMergeMode]) + @throws[Exception] + def testReadFileGroupInflightData(recordMergeMode: RecordMergeMode): Unit = { + val writeConfigs = new util.HashMap[String, String](getCommonConfigs(recordMergeMode)) + writeConfigs.put(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + try { + val dataGen = new HoodieTestDataGenerator(0xDEEF) + try { + // One commit; reading one file group containing a base file only + commitToTable(dataGen.generateInserts("001", 100), INSERT.value, writeConfigs) + validateOutputFromFileGroupReader(getStorageConf, getBasePath, dataGen.getPartitionPaths, true, 0, recordMergeMode) + + commitToTable(dataGen.generateUniqueUpdates("003", 100), UPSERT.value, writeConfigs) + val metaClient = HoodieTestUtils.createMetaClient(getStorageConf, getBasePath) + metaClient.getStorage.deleteFile(new StoragePath(metaClient.getTimelinePath, new DefaultInstantFileNameGenerator().getFileName(metaClient.getActiveTimeline.lastInstant().get()))) + validateOutputFromFileGroupReaderIncludingInflight(getStorageConf, getBasePath, dataGen.getPartitionPaths, true, 1, recordMergeMode, true) Review Comment: I would expect that if we have a log file from a concurrent writer (or similar) and we initialize the FG reader with "allowInflightInstants = true", the FG reader should return the records from the inflight log file as well. We don't need to add a functional test for this; just write a test directly against a FG reader (one file group, essentially). -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org.