nsivabalan commented on code in PR #13529:
URL: https://github.com/apache/hudi/pull/13529#discussion_r2220507093


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -307,6 +307,12 @@ object DefaultSource {
       val useNewParquetFileFormat = 
parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
         
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean 
&&
         !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
+      lazy val tableVersion = if 
(SparkConfigUtils.containsConfigProperty(parameters, 
INCREMENTAL_READ_TABLE_VERSION)) {
+        Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+      } else {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      lazy val hudiOneTable = tableVersion >= 
HoodieTableVersion.EIGHT.versionCode()

Review Comment:
   Can you add Javadocs to call out what exactly `hudiOneTable` is? Let's not 
confuse developers with OneTable or XTable :) 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -334,29 +340,13 @@ object DefaultSource {
               resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
             }
           case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-            if (SparkConfigUtils.containsConfigProperty(parameters, 
INCREMENTAL_READ_TABLE_VERSION)) {
-              val writeTableVersion = 
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
-              if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) 
{
-                if (useNewParquetFileFormat) {
-                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                    sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
-                } else {
-                  new IncrementalRelationV2(sqlContext, parameters, 
userSchema, metaClient, RangeType.CLOSED_CLOSED)
-                }
-              } else {
-                new IncrementalRelationV1(sqlContext, parameters, userSchema, 
metaClient)
-              }
-            } else {
-              if (metaClient.getTableConfig.getTableVersion.versionCode() >= 
HoodieTableVersion.EIGHT.versionCode()) {
-                if (useNewParquetFileFormat) {
-                  new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                    sqlContext, metaClient, parameters, userSchema, 
isBootstrappedTable).build()
-                } else {
-                  new IncrementalRelationV2(sqlContext, parameters, 
userSchema, metaClient, RangeType.CLOSED_CLOSED)
-                }
-              } else {
-                new IncrementalRelationV1(sqlContext, parameters, userSchema, 
metaClient)
-              }
+            (hudiOneTable, useNewParquetFileFormat) match {
+              case (true, true) => new 
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(

Review Comment:
   Do we really need two factories? 
   Why can't we construct the right `IncrementalRelationV1` or 
`IncrementalRelationV2` right here and just use a single 
`HoodieCopyOnWriteIncrementalHadoopFsRelationFactory`? 
   
   For just this one abstraction — which can be inferred and constructed here — 
why do we need to add more factory classes?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -424,15 +442,16 @@ abstract class 
BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override
 
 }
 
-class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
-                                                          override val 
metaClient: HoodieTableMetaClient,
-                                                          override val 
options: Map[String, String],
-                                                          override val 
schemaSpec: Option[StructType],
-                                                          isBootstrap: Boolean)
-  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
+abstract class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override 
val sqlContext: SQLContext,
+                                                                       
override val metaClient: HoodieTableMetaClient,
+                                                                       
override val options: Map[String, String],
+                                                                       
override val schemaSpec: Option[StructType],
+                                                                       
isBootstrap: Boolean,
+                                                                       
mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
+  extends HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
 
   private val incrementalFileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, 
true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, 
true, mergeOnReadIncrementalRelation)

Review Comment:
   Do you know why we are using `mergeOnReadIncrementalRelation` within 
`HoodieIncrementalFileIndex` even for a COW table? 
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -424,15 +442,16 @@ abstract class 
BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override
 
 }
 
-class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val 
sqlContext: SQLContext,
-                                                          override val 
metaClient: HoodieTableMetaClient,
-                                                          override val 
options: Map[String, String],
-                                                          override val 
schemaSpec: Option[StructType],
-                                                          isBootstrap: Boolean)
-  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
+abstract class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override 
val sqlContext: SQLContext,
+                                                                       
override val metaClient: HoodieTableMetaClient,
+                                                                       
override val options: Map[String, String],
+                                                                       
override val schemaSpec: Option[StructType],
+                                                                       
isBootstrap: Boolean,
+                                                                       
mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
+  extends HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, 
metaClient, options, schemaSpec, isBootstrap) {
 
   private val incrementalFileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, 
true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, 
true, mergeOnReadIncrementalRelation)

Review Comment:
   Shouldn't this be one of `IncrementalRelationV1` or `IncrementalRelationV2`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to