nsivabalan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r811230085
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -125,48 +127,65 @@ public HoodieMetadataPayload(GenericRecord record,
Comparable<?> orderingVal) {
this(Option.of(record));
}
- public HoodieMetadataPayload(Option<GenericRecord> record) {
- if (record.isPresent()) {
+ public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
+ if (recordOpt.isPresent()) {
+ GenericRecord record = recordOpt.get();
// This can be simplified using SpecificData.deepcopy once this bug is fixed
// https://issues.apache.org/jira/browse/AVRO-1811
- key = record.get().get(KEY_FIELD_NAME).toString();
- type = (int) record.get().get(SCHEMA_FIELD_NAME_TYPE);
- if (record.get().get(SCHEMA_FIELD_NAME_METADATA) != null) {
filesystemMetadata = (Map<String, HoodieMetadataFileInfo>) record.get().get("filesystemMetadata");
+ //
+ // NOTE: {@code HoodieMetadataRecord} has to always carry both "key" nad "type" fields
Review comment:
Minor typo: "nad" -> "and".
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -133,46 +129,49 @@ object HoodieSparkUtils extends SparkAdapterSupport {
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}
- def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema:
- org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
- val dfWriteSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
- var writeSchema : Schema = null;
- var toReconcileSchema : Schema = null;
- if (reconcileToLatestSchema && latestTableSchema.isPresent) {
- // if reconcileToLatestSchema is set to true and latestSchema is present, then try to leverage latestTableSchema.
- // this code path will handle situations where records are serialized in odl schema, but callers wish to convert
- // to Rdd[GenericRecord] using different schema(could be evolved schema or could be latest table schema)
- writeSchema = dfWriteSchema
- toReconcileSchema = latestTableSchema.get()
- } else {
- // there are paths where callers wish to use latestTableSchema to convert to Rdd[GenericRecords] and not use
- // row's schema. So use latestTableSchema if present. if not available, fallback to using row's schema.
- writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get()} else { dfWriteSchema}
- }
- createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
+ /**
+ * @deprecated please use other overload [[createRdd]]
+ */
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+ latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+ val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+ createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
}
- def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
- : RDD[GenericRecord] = {
- // Use the write avro schema to derive the StructType which has the correct nullability information
- val writeDataType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
- val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
- val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
- // if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
- // latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
- val reconciledDataType =
- if (latestTableSchema != null) AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else writeDataType
- // Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
- // old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
- // Hence we always need to deserialize in the same schema as serialized schema.
- df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
- .mapPartitions { records =>
- if (records.isEmpty) Iterator.empty
- else {
- val convertor = AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace)
- records.map { x => convertor(x).asInstanceOf[GenericRecord] }
- }
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+ val writerSchema = df.schema
+ val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
+ val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+ // We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
+ // making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
+ // (and back)
+ val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+ val (nullable, _) = AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
+
+ // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
+ // serializer is not able to digest it
+ val readerAvroSchemaStr = readerAvroSchema.toString
+ val writerAvroSchemaStr = writerAvroSchema.toString
+ // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
+ df.queryExecution.toRdd.mapPartitions { rows =>
+ if (rows.isEmpty) {
+ Iterator.empty
+ } else {
+ val transform: GenericRecord => GenericRecord =
+ if (sameSchema) identity
+ else {
+ val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
+ rewriteRecord(_, readerAvroSchema)
Review comment:
Do we know of any perf implications due to this? Prior to this patch, we were
using AvroConversionHelper.createConverterToAvro to convert from one schema to
another, but here we are rewriting the GenericRecord again. I don't have much
insight into the implications, but wanted to call it out.
CC @xushiyan if you have any thoughts here.
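For context on the cost of that rewrite, here is a rough sketch (not Hudi's actual
rewriteRecord implementation, just the general shape of rewriting a GenericRecord
into the reader schema) of the per-record, per-field copy it implies:
```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

import scala.collection.JavaConverters._

// Illustrative only: build a new record against the reader schema and copy over
// every field that also exists in the source record's (writer) schema; fields
// present only in the reader schema are left unset.
def rewriteIntoReaderSchema(source: GenericRecord, readerSchema: Schema): GenericRecord = {
  val target = new GenericData.Record(readerSchema)
  readerSchema.getFields.asScala.foreach { field =>
    if (source.getSchema.getField(field.name()) != null) {
      target.put(field.name(), source.get(field.name()))
    }
  }
  target
}
```
Either way the work happens per record inside mapPartitions; the main difference is
whether the field mapping is pre-computed once by the converter or resolved via
schema lookups while rewriting.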
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
##########
@@ -71,23 +74,25 @@ trait ProvidesHoodieConfig extends Logging {
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
+ .filter { case(_, v) => v != null }
Review comment:
Sorry, what is this filter for?
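Mechanically it just drops entries whose value is null, as in this minimal
illustration (made-up keys); the question is what produces a null value here:
```scala
// Hypothetical config map: one entry resolves to null (e.g. an optional table
// property that isn't set) and would otherwise leak into the write config.
val props: Map[String, String] = Map(
  "hoodie.upsert.shuffle.parallelism" -> "200",
  "some.optional.property" -> null
)
val cleaned = props.filter { case (_, v) => v != null }
// cleaned == Map("hoodie.upsert.shuffle.parallelism" -> "200")
```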
##########
File path:
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/PatchedAvroDeserializer.scala
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic._
+import org.apache.avro.util.Utf8
+import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.math.BigDecimal
+import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A deserializer to deserialize data in avro format to data in catalyst format.
+ *
+ * NOTE: This is a version of {@code AvroDeserializer} impl from Spark 2.4.4 w/ the fix for SPARK-30267
Review comment:
May I know how you deduced the list of patches that needed to be applied?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -75,84 +77,89 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
- private val preCombineField = {
- val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
- if (preCombineFieldFromTableConfig != null) {
- Some(preCombineFieldFromTableConfig)
- } else {
- // get preCombineFiled from the options if this is a old table which have not store
- // the field to hoodie.properties
- optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
- }
- }
+ private val preCombineFieldOpt = getPrecombineFieldProperty
- override def needConversion: Boolean = false
+ // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
+ // scan collected by this relation
+ private lazy val incrementalSpanRecordsFilters: Seq[Filter] = {
+ val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+ val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+ val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+ Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
+ }
- override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
Review comment:
Why was the override of unhandledFilters removed? I do see we set pushedFilters to
the same set of filters, but I'm wondering whether a different set of filters could
mean something else or get applied differently.
Excerpt from DataSourceStrategy:
```
val (unhandledPredicates, pushedFilters, handledFilters) =
  selectFilters(relation.relation, candidatePredicates)
```
Do you happen to know what the difference is between a filter ending up in
unhandledPredicates vs pushedFilters here?
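My understanding of the contract, sketched against Spark's BaseRelation API
(hypothetical relation, not Hudi code): whatever unhandledFilters returns is
re-evaluated by Spark on top of the scan, while pushedFilters is simply every
predicate that could be translated into a source Filter, handled or not.
```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, Filter, IsNotNull}
import org.apache.spark.sql.types.StructType

// Hypothetical relation: filters returned from unhandledFilters are re-applied by
// Spark after the scan; filters NOT returned are treated as fully handled by the
// source and end up in handledFilters.
class ExampleRelation(override val sqlContext: SQLContext,
                      override val schema: StructType) extends BaseRelation {

  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    // Pretend the source can fully evaluate only IsNotNull; Spark re-checks the rest.
    filters.filterNot(_.isInstanceOf[IsNotNull])
}
```
If that reading is right, dropping the override falls back to the default
(everything unhandled), which should be safe correctness-wise since Spark
re-applies the filters on top of whatever the scan returns.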
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,45 +92,56 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
+ // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+ // have to add it explicitly so that
+ // - Merging could be performed correctly
+ // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+ // Spark still fetches all the rows to execute the query correctly
+ //
+ // It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+ // filtered out upstream
+ val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
val (requiredAvroSchema, requiredStructSchema) =
- HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+ HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val fileIndex = buildFileIndex(filters)
- val hoodieTableState = HoodieMergeOnReadTableState(
- tableStructSchema,
- requiredStructSchema,
- tableAvroSchema.toString,
- requiredAvroSchema.toString,
- fileIndex,
- preCombineField,
- recordKeyFieldOpt
- )
- val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = sqlContext.sparkSession,
- dataSchema = tableStructSchema,
- partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
+
+ val partitionSchema = StructType(Nil)
+ val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+ val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
+ val fullSchemaParquetReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ tableSchema = tableSchema,
+ requiredSchema = tableSchema,
+ // This file-reader is used to read base file records, subsequently merging them with the records
+ // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
+ // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
+ // we combine them correctly)
filters = Seq.empty,
options = optParams,
- hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = new Configuration(conf)
Review comment:
So prior to this patch, was it just coincidental that our column projection wasn't
kicking in, and hence we got away with it? If not, this would have broken things, right?
(w.r.t. the comment on L118 to L121)
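For reference, the mandatory-column handling described in the NOTE above boils down
to something like the sketch below (assumed helper and literal field name, not the
actual Hudi implementation):
```scala
// Always fetch the record-key meta field so base-file records can be merged with
// delta-log records, even when the caller projects zero columns (e.g. count()).
def appendMandatoryColumns(requested: Array[String], mandatory: Seq[String]): Array[String] =
  requested ++ mandatory.filterNot(c => requested.contains(c))

val fetched = appendMandatoryColumns(Array("fare"), Seq("_hoodie_record_key"))
// fetched: Array("fare", "_hoodie_record_key")
```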
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -41,8 +107,8 @@ object AvroConversionUtils {
else {
val schema = new Schema.Parser().parse(schemaStr)
val dataType = convertAvroSchemaToStructType(schema)
- val convertor = AvroConversionHelper.createConverterToRow(schema, dataType)
- records.map { x => convertor(x).asInstanceOf[Row] }
+ val converter = createConverterToRow(schema, dataType)
Review comment:
sounds good.
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -133,46 +130,49 @@ object HoodieSparkUtils extends SparkAdapterSupport {
new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
}
- def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean, latestTableSchema:
- org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
- val dfWriteSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
- var writeSchema : Schema = null;
- var toReconcileSchema : Schema = null;
- if (reconcileToLatestSchema && latestTableSchema.isPresent) {
- // if reconcileToLatestSchema is set to true and latestSchema is present, then try to leverage latestTableSchema.
- // this code path will handle situations where records are serialized in odl schema, but callers wish to convert
- // to Rdd[GenericRecord] using different schema(could be evolved schema or could be latest table schema)
- writeSchema = dfWriteSchema
- toReconcileSchema = latestTableSchema.get()
- } else {
- // there are paths where callers wish to use latestTableSchema to convert to Rdd[GenericRecords] and not use
- // row's schema. So use latestTableSchema if present. if not available, fallback to using row's schema.
- writeSchema = if (latestTableSchema.isPresent) { latestTableSchema.get()} else { dfWriteSchema}
- }
- createRddInternal(df, writeSchema, toReconcileSchema, structName, recordNamespace)
+ /**
+ * @deprecated please use other overload [[createRdd]]
+ */
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+ latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+ val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+ createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
}
- def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: Schema, structName: String, recordNamespace: String)
- : RDD[GenericRecord] = {
- // Use the write avro schema to derive the StructType which has the correct nullability information
- val writeDataType = AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
- val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
- val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
- // if records were serialized with old schema, but an evolved schema was passed in with latestTableSchema, we need
- // latestTableSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
- val reconciledDataType =
- if (latestTableSchema != null) AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else writeDataType
- // Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
- // old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
- // Hence we always need to deserialize in the same schema as serialized schema.
- df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
- .mapPartitions { records =>
- if (records.isEmpty) Iterator.empty
- else {
- val convertor = AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, recordNamespace)
- records.map { x => convertor(x).asInstanceOf[GenericRecord] }
- }
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+ val writerSchema = df.schema
+ val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
+ val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+ // We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
+ // making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
+ // (and back)
+ val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+ val (nullable, _) = resolveAvroTypeNullability(writerAvroSchema)
+
+ // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
+ // serializer is not able to digest it
+ val readerAvroSchemaStr = readerAvroSchema.toString
+ val writerAvroSchemaStr = writerAvroSchema.toString
+ // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
+ df.queryExecution.toRdd.mapPartitions { rows =>
+ if (rows.isEmpty) {
+ Iterator.empty
+ } else {
+ val transform: GenericRecord => GenericRecord =
+ if (sameSchema) identity
+ else {
+ val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
+ rewriteRecord(_, readerAvroSchema)
+ }
+
+ // Since caller might request to get records in a different ("evolved") schema, we will be rewriting from
+ // existing Writer's schema into Reader's (avro) schema
+ val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
+ val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
Review comment:
got you!
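As a small usage note for the new overload above (a sketch with placeholder record
name and namespace, and an assumed wrapper function):
```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Convert a DataFrame into Avro records in a caller-provided ("reader") schema.
// When no reader schema is given, records stay in the DataFrame's own (writer)
// schema and the rewrite step above is skipped entirely.
def toAvroRdd(df: DataFrame, readerSchemaJson: Option[String]): RDD[GenericRecord] = {
  val readerSchemaOpt = readerSchemaJson.map(json => new Schema.Parser().parse(json))
  HoodieSparkUtils.createRdd(df, "example_record", "example.namespace", readerSchemaOpt)
}
```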
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -56,7 +57,7 @@ case class InsertIntoHoodieTableCommand(
}
}
-object InsertIntoHoodieTableCommand extends Logging {
+object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
Review comment:
@YannByron: can you review the SQL DML class changes in this patch?
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -125,41 +114,39 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
val (requiredAvroSchema, requiredStructSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
- val hoodieTableState = HoodieMergeOnReadTableState(
- tableStructSchema,
- requiredStructSchema,
- tableAvroSchema.toString,
- requiredAvroSchema.toString,
- fileIndex,
- preCombineField,
- Option.empty
- )
- val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = sqlContext.sparkSession,
- dataSchema = tableStructSchema,
- partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
+ val partitionSchema = StructType(Nil)
+ val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+ val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
+ val fullSchemaParquetReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ tableSchema = tableSchema,
+ requiredSchema = tableSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
-
- val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = sqlContext.sparkSession,
- dataSchema = tableStructSchema,
- partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
+ val requiredSchemaParquetReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ tableSchema = tableSchema,
+ requiredSchema = requiredSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
)
+ val hoodieTableState = HoodieMergeOnReadTableState(fileIndex, HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt)
Review comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]