nsivabalan commented on code in PR #13010:
URL: https://github.com/apache/hudi/pull/13010#discussion_r2013058804


##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -288,13 +301,21 @@ private FileSlice 
getFileSliceToRead(StorageConfiguration<?> storageConf,
     return fileSlice;
   }
 
-  private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf,
-                                           String tablePath,
-                                           HoodieTableMetaClient metaClient,
-                                           FileSlice fileSlice,
-                                           Schema avroSchema,
-                                           RecordMergeMode recordMergeMode,
-                                           boolean isSkipMerge) throws 
Exception {
+  private FileSlice 
getFileSliceToReadIncludingInflight(StorageConfiguration<?> storageConf, String 
tablePath,
+                                                        HoodieTableMetaClient 
metaClient, String[] partitionPaths,
+                                                        boolean 
containsBaseFile, int expectedLogFileNum) {
+    HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(storageConf);
+    HoodieTableFileSystemView fsView = 
HoodieTableFileSystemView.fileListingBasedFileSystemView(engineContext, 
metaClient, metaClient.getActiveTimeline(), false);

Review Comment:
   Where are we closing the FSV?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala:
##########
@@ -176,4 +184,25 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     assertEquals(expectedOrderingValue,
       metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD))
   }
+
+  @ParameterizedTest
+  @EnumSource(classOf[RecordMergeMode])
+  @throws[Exception]
+  def testReadFileGroupInflightData(recordMergeMode: RecordMergeMode): Unit = {
+    val writeConfigs = new util.HashMap[String, 
String](getCommonConfigs(recordMergeMode))
+    writeConfigs.put(DataSourceWriteOptions.TABLE_TYPE.key(), 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    try {
+      val dataGen = new HoodieTestDataGenerator(0xDEEF)
+      try {
+        // One commit; reading one file group containing a base file only
+        commitToTable(dataGen.generateInserts("001", 100), INSERT.value, 
writeConfigs)
+        validateOutputFromFileGroupReader(getStorageConf, getBasePath, 
dataGen.getPartitionPaths, true, 0, recordMergeMode)
+
+        commitToTable(dataGen.generateUniqueUpdates("003", 100), UPSERT.value, 
writeConfigs)
+        val metaClient = HoodieTestUtils.createMetaClient(getStorageConf, 
getBasePath)
+        metaClient.getStorage.deleteFile(new 
StoragePath(metaClient.getTimelinePath, new 
DefaultInstantFileNameGenerator().getFileName(metaClient.getActiveTimeline.lastInstant().get())))
+        validateOutputFromFileGroupReaderIncludingInflight(getStorageConf, 
getBasePath, dataGen.getPartitionPaths, true, 1, recordMergeMode, true)

Review Comment:
   I would expect that we have a log file from a concurrent writer (or something similar), and that if we initialize the FG 
reader with "allowInflightInstants = true", the FG 
reader should return the records from the inflight log file as well. 
   
   We don't need to add a functional test for this; 
   just write a test directly against an FG reader (one file group, essentially). 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to