jonvex commented on code in PR #13527:
URL: https://github.com/apache/hudi/pull/13527#discussion_r2207939789
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -319,121 +301,164 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
 }
 }
+class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                       override val metaClient: HoodieTableMetaClient,
+                                                       override val options: Map[String, String],
+                                                       override val schemaSpec: Option[StructType],
+                                                       isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  private val fileIndex: HoodieFileIndex = new HoodieFileIndex(
+    sparkSession,
+    metaClient,
+    Some(tableStructSchema),
+    optParams,
+    fileStatusCache,
+    includeLogFiles = true,
+    shouldEmbedFileSlices = true)
+
+  override def buildFileIndex(): FileIndex = fileIndex
+
+  override protected def isMOR: Boolean = true
+
+  override protected def isIncremental: Boolean = false
+
+  override protected def getRequiredFilters: Seq[Filter] = Seq.empty
+
+  override def buildPartitionSchema(): StructType = fileIndex.partitionSchema
+
+  override def buildDataSchema(): StructType = fileIndex.dataSchema
+}
+
+abstract class BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                                       override val metaClient: HoodieTableMetaClient,
+                                                                       override val options: Map[String, String],
+                                                                       override val schemaSpec: Option[StructType],
+                                                                       isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+
+  override protected def getMandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead
+
+  override protected def isMOR: Boolean = true
+
+  override protected def isIncremental: Boolean = true
+}
+
 class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                           override val metaClient: HoodieTableMetaClient,
                                                           override val options: Map[String, String],
                                                           override val schemaSpec: Option[StructType],
                                                           isBootstrap: Boolean)
-  extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
-  override val mandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead
+  private val incrementalFileIndex = new HoodieIncrementalFileIndex(
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
-  override val fileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true, true)
+  override def buildFileIndex(): HoodieFileIndex = incrementalFileIndex
-  override def buildFileFormat(): FileFormat = {
-    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
-      new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
-        true, true, fileIndex.getRequiredFilters)
-    } else {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        basePath.toString, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, queryTimestamp.get, mandatoryFields,
-        true, isBootstrap, true, fileIndex.isInstanceOf[HoodieCDCFileIndex],
-        validCommits, shouldUseRecordPosition, fileIndex.getRequiredFilters)
-    }
-  }
+  override protected def getRequiredFilters: Seq[Filter] = incrementalFileIndex.getRequiredFilters
+
+  override def buildPartitionSchema(): StructType = incrementalFileIndex.partitionSchema
+
+  override def buildDataSchema(): StructType = incrementalFileIndex.dataSchema
+}
+
+class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                  override val metaClient: HoodieTableMetaClient,
+                                                  override val options: Map[String, String],
+                                                  override val schemaSpec: Option[StructType],
+                                                  isBootstrap: Boolean)
+  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  private val hoodieCDCFileIndex = new HoodieCDCFileIndex(
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
+
+  override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
+
+  override def buildDataSchema(): StructType = hoodieCDCFileIndex.cdcRelation.schema
+
+  override def buildPartitionSchema(): StructType = StructType(Nil)
+
+  override protected def getRequiredFilters: Seq[Filter] = Seq.empty
 }
 class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                        override val metaClient: HoodieTableMetaClient,
                                                        override val options: Map[String, String],
                                                        override val schemaSpec: Option[StructType],
                                                        isBootstrap: Boolean)
-  extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
-
-  override val mandatoryFields: Seq[String] = partitionColumnsToRead
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
-  override val fileIndex = new HoodieFileIndex(
+  val fileIndex: HoodieFileIndex = new HoodieFileIndex(
     sparkSession,
     metaClient,
     Some(tableStructSchema),
     optParams,
-    FileStatusCache.getOrCreate(sparkSession),
+    fileStatusCache,
     shouldEmbedFileSlices = true)
-  override def buildFileFormat(): FileFormat = {
-    if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
-      new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
-        sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)),
-        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, false, false, Seq.empty)
-    } else {
-      new HoodieFileGroupReaderBasedParquetFileFormat(
-        basePath.toString, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt),
-        metaClient.getTableConfig.getTableName, queryTimestamp.get, mandatoryFields,
-        false, isBootstrap, false, fileIndex.isInstanceOf[HoodieCDCFileIndex], validCommits,
-        shouldUseRecordPosition, Seq.empty)
-    }
-  }
+  override def buildFileIndex(): HoodieFileIndex = fileIndex
+
+  override protected def isMOR: Boolean = false
+
+  override protected def isIncremental: Boolean = false
+
+  override protected def getRequiredFilters: Seq[Filter] = Seq.empty
+
+  override def buildPartitionSchema(): StructType = fileIndex.partitionSchema
+
+  override def buildDataSchema(): StructType = fileIndex.dataSchema
+}
+
+abstract class BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                                       override val metaClient: HoodieTableMetaClient,
+                                                                       override val options: Map[String, String],
+                                                                       override val schemaSpec: Option[StructType],
+                                                                       isBootstrap: Boolean)
+  extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+
+  override protected def getMandatoryFields(): Seq[String] = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
+    preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) ++ partitionColumnsToRead
+
+  override protected def isMOR: Boolean = false
+
+  override protected def isIncremental: Boolean = true
+
 }
 class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                           override val metaClient: HoodieTableMetaClient,
                                                           override val options: Map[String, String],
                                                           override val schemaSpec: Option[StructType],
                                                           isBootstrap: Boolean)
-  extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
-  override val mandatoryFields: Seq[String] = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
-    preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) ++ partitionColumnsToRead
-
-  override val fileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false, true)
+  private val incrementalFileIndex = new HoodieIncrementalFileIndex(
Review Comment:
It doesn't matter because we always use it right away.
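
To illustrate the point: every call site constructs the factory and consumes the index immediately, so an eager `private val` and a deferred (`lazy val`) initialization are observationally identical here. A minimal sketch of that usage pattern, assuming a hypothetical `loadIndex` wrapper (the real call sites live elsewhere in hudi-spark):

```scala
import org.apache.hudi.HoodieCopyOnWriteIncrementalHadoopFsRelationFactory
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

// Sketch only: `loadIndex` is illustrative, not an actual Hudi API. It shows
// why eager initialization of the index inside the factory is harmless: the
// factory is built and its file index is consumed in one step.
def loadIndex(sqlContext: SQLContext,
              metaClient: HoodieTableMetaClient,
              options: Map[String, String],
              schemaSpec: Option[StructType]) = {
  val factory = new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
    sqlContext, metaClient, options, schemaSpec, isBootstrap = false)
  factory.buildFileIndex() // used right away; nothing is gained by deferring
}
```

Since `buildFileIndex()` is invoked as soon as the factory exists, deferring the `new HoodieIncrementalFileIndex(...)` construction would buy nothing.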
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]