nsivabalan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r808583562
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -227,17 +223,20 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                                          baseFileIterator: Iterator[InternalRow],
                                          config: Configuration): Iterator[InternalRow] =
     new Iterator[InternalRow] with Closeable {
-      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
-      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
-      private val serializer = HoodieAvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
-      private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
+      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
+      private val requiredFieldPosition =
+        requiredSchema.structTypeSchema
+          .map(f => tableAvroSchema.getField(f.name).pos()).toList
+      private val requiredSerializer = HoodieAvroSerializer(requiredSchema.structTypeSchema, requiredAvroSchema, nullable = false)
+      private val requiredDeserializer = HoodieAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
       private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
       private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
       private val logRecords = logScanner.getRecords
       private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
       private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = if (recordKeyFieldOpt.isEmpty) HOODIE_RECORD_KEY_COL_POS else tableState.tableStructSchema.fieldIndex(recordKeyFieldOpt.get)
+      private val recordKeyPosition = tableSchema.structTypeSchema.fieldIndex(recordKeyField)
Review comment:
recordKeyField will either point to the Hudi meta field or to one of the user's
fields (in case of virtual keys), is that right? And hence it is safe to directly
fetch the fieldIndex?
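For illustration only (a hypothetical sketch, not a suggestion for this PR): if the
configured key field could ever be absent from the read schema, a defensive lookup
could fall back to the meta-column position, e.g.:

    import org.apache.spark.sql.types.StructType

    // Hypothetical helper (sketch): resolve the record-key column position, falling back
    // to the given meta-field position when the configured key field is not part of the
    // StructType (e.g. a virtual key column that was not projected).
    def resolveRecordKeyPosition(schema: StructType, recordKeyField: String, metaFieldPos: Int): Int =
      schema.getFieldIndex(recordKeyField).getOrElse(metaFieldPos)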
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -41,8 +107,8 @@ object AvroConversionUtils {
     else {
       val schema = new Schema.Parser().parse(schemaStr)
       val dataType = convertAvroSchemaToStructType(schema)
-      val convertor = AvroConversionHelper.createConverterToRow(schema, dataType)
-      records.map { x => convertor(x).asInstanceOf[Row] }
+      val converter = createConverterToRow(schema, dataType)
Review comment:
Why use a deprecated API? Can't we use the newer APIs? If it's still in use,
why deprecate it?
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -133,46 +130,49 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema:
-                org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val dfWriteSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    var writeSchema : Schema = null;
-    var toReconcileSchema : Schema = null;
-    if (reconcileToLatestSchema && latestTableSchema.isPresent) {
-      // if reconcileToLatestSchema is set to true and latestSchema is present, then try to leverage latestTableSchema.
-      // this code path will handle situations where records are serialized in odl schema, but callers wish to convert
-      // to Rdd[GenericRecord] using different schema(could be evolved schema or could be latest table schema)
-      writeSchema = dfWriteSchema
-      toReconcileSchema = latestTableSchema.get()
-    } else {
-      // there are paths where callers wish to use latestTableSchema to convert to Rdd[GenericRecords] and not use
-      // row's schema. So use latestTableSchema if present. if not available, fallback to using row's schema.
-      writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get()} else { dfWriteSchema}
-    }
-    createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
+  /**
+   * @deprecated please use other overload [[createRdd]]
+   */
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+                latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+    val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
   }
-  def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
-  : RDD[GenericRecord] = {
-    // Use the write avro schema to derive the StructType which has the correct nullability information
-    val writeDataType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
-    val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
-    val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
-    // if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
-    // latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
-    val reconciledDataType =
-      if (latestTableSchema != null) AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else writeDataType
-    // Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
-    // old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
-    // Hence we always need to deserialize in the same schema as serialized schema.
-    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
-      .mapPartitions { records =>
-        if (records.isEmpty) Iterator.empty
-        else {
-          val convertor = AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace)
-          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
-        }
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+    val writerSchema = df.schema
+    val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
+    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    // We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
+    // making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
+    // (and back)
+    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+    val (nullable, _) = resolveAvroTypeNullability(writerAvroSchema)
+
+    // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
+    //       serializer is not able to digest it
+    val readerAvroSchemaStr = readerAvroSchema.toString
+    val writerAvroSchemaStr = writerAvroSchema.toString
+    // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
+    df.queryExecution.toRdd.mapPartitions { rows =>
+      if (rows.isEmpty) {
+        Iterator.empty
+      } else {
+        val transform: GenericRecord => GenericRecord =
+          if (sameSchema) identity
+          else {
+            val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
+            rewriteRecord(_, readerAvroSchema)
+          }
+
+        // Since caller might request to get records in a different ("evolved") schema, we will be rewriting from
+        // existing Writer's schema into Reader's (avro) schema
+        val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
+        val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
Review comment:
Can't we directly use readerAvroSchemaStr here to convert from InternalRow to
Avro, so that we can avoid L 165? Or am I missing something?
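For context, here is a small standalone sketch (plain Avro APIs only; the names and
wiring are illustrative, not the PR's code) of what the rewriteRecord(_, readerAvroSchema)
step conceptually does after a record has already been converted with the writer schema:

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
    import scala.collection.JavaConverters._

    object RewriteSketch {
      // Copy a record built against the writer schema into an (evolved) reader schema:
      // matching fields are carried over, fields new to the reader schema take their defaults.
      def rewrite(record: GenericRecord, readerSchema: Schema): GenericRecord = {
        val builder = new GenericRecordBuilder(readerSchema)
        readerSchema.getFields.asScala
          .filter(f => record.getSchema.getField(f.name()) != null)
          .foreach(f => builder.set(f, record.get(f.name())))
        builder.build()
      }

      def main(args: Array[String]): Unit = {
        val writerSchema = SchemaBuilder.record("rec").fields().requiredString("id").endRecord()
        val readerSchema = SchemaBuilder.record("rec").fields()
          .requiredString("id").optionalString("note").endRecord()
        val written = new GenericRecordBuilder(writerSchema).set("id", "k1").build()
        println(rewrite(written, readerSchema)) // {"id": "k1", "note": null}
      }
    }

If the InternalRow-to-Avro converter could be built directly against the reader schema,
this extra rewrite (and the second schema string) would indeed be unnecessary; whether
that is safe given that the InternalRow layout follows the writer's StructType is the
open question here.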
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +319,48 @@ private object HoodieMergeOnReadRDD {
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(config), metadataConfig,
+        dataTableBasePath,
+        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+
+      // NOTE: In case of Metadata Table partition path equates to partition name (since there's just one level
+      //       of indirection among MT partitions)
+      val relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(split.tablePath), getPartitionPath(split))
+      metadataTable.getLogRecordScanner(split.logFiles.get.asJava, relativePartitionPath).getLeft
+    } else {
+      HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(split.tablePath)
+        .withLogFilePaths(split.logFiles.get.map(logFile => getFilePath(logFile.getPath)).asJava)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(split.latestCommit)
+        .withReadBlocksLazily(
+          Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+            HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
+            .getOrElse(false))
+        .withReverseReader(false)
+        .withBufferSize(
+          config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+        .withSpillableMapBasePath(
+          config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+            HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .build()
+    }
+  }
+
+  private def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
Review comment:
Can we rename this to getMetadataTablePartitionPath?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -125,41 +114,39 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
     val (requiredAvroSchema, requiredStructSchema) = HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField,
-      Option.empty
-    )
-    val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
+    val fullSchemaParquetReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      tableSchema = tableSchema,
+      requiredSchema = tableSchema,
       filters = pushDownFilter,
       options = optParams,
       hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
-
-    val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
+    val requiredSchemaParquetReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      tableSchema = tableSchema,
+      requiredSchema = requiredSchema,
       filters = pushDownFilter,
      options = optParams,
       hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
+    val hoodieTableState = HoodieMergeOnReadTableState(fileIndex, HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
Review comment:
The 2nd arg might differ for the metadata table, I guess.
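Purely to illustrate that concern (a sketch, not the PR's code; the metadata table's
key field name below is an assumption):

    import org.apache.hudi.common.model.HoodieRecord
    import org.apache.hudi.metadata.HoodieTableMetadata

    // Sketch only: choose the record-key field depending on whether the relation is
    // reading the metadata table. "key" is assumed to be the MT record-key field name.
    def resolveRecordKeyField(basePath: String): String =
      if (HoodieTableMetadata.isMetadataTable(basePath)) "key"
      else HoodieRecord.RECORD_KEY_METADATA_FIELD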