yihua commented on code in PR #7528:
URL: https://github.com/apache/hudi/pull/7528#discussion_r1082866912
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
* NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is
only possible, if
* B is a subset of A
*/
- def generateUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection = {
- val attrs = from.toAttributes
- val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
- val targetExprs = to.fields.map(f => attrsMap(f.name))
+ def generateUnsafeProjection(sourceStructType: StructType, targetStructType:
StructType): UnsafeProjection = {
+ val resolver = SQLConf.get.resolver
+ val attrs = sourceStructType.toAttributes
+ val targetExprs = targetStructType.fields.map { targetField =>
+ val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+ .getOrElse(throw new AnalysisException(s"Wasn't able to match target
field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+ genProjectingExpression(attrRef, targetField.dataType)
+ }
GenerateUnsafeProjection.generate(targetExprs, attrs)
}
+ private def genProjectingExpression(sourceExpr: Expression,
+ targetDataType: DataType): Expression = {
+ checkState(sourceExpr.resolved)
+
+ // TODO support array, map
+ (sourceExpr.dataType, targetDataType) match {
+ case (sdt, tdt) if sdt == tdt =>
+ sourceExpr
+
+ case (sourceType: StructType, targetType: StructType) =>
+ val fieldValueExprs = targetType.fields.map { tf =>
+ val ord = sourceType.fieldIndex(tf.name)
+ val fieldValExpr =
genProjectingExpression(GetStructField(sourceExpr, ord, Some(tf.name)),
tf.dataType)
+ Alias(fieldValExpr, tf.name)()
+ }
+
+ CreateStruct(fieldValueExprs)
+
+ case _ => throw new
UnsupportedOperationException(s"(${sourceExpr.dataType}, $targetDataType)")
+ }
+ }
+
+ // TODO scala-docs
Review Comment:
@alexeykudinkin could you address this?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala:
##########
@@ -427,6 +428,10 @@ class TestMORDataSource extends HoodieClientTestBase with
SparkDatasetMixin {
@ParameterizedTest
@EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO",
"SPARK"))
def testPrunedFiltered(recordType: HoodieRecordType) {
+
+ spark.sessionState.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false)
Review Comment:
Is this still needed?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala:
##########
@@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
}
protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
- val partitions = listLatestBaseFiles(globPaths, partitionFilters,
dataFilters)
- val fileSplits = partitions.values.toSeq
- .flatMap { files =>
- files.flatMap { file =>
- // TODO fix, currently assuming parquet as underlying format
- HoodieDataSourceHelper.splitFiles(
- sparkSession = sparkSession,
- file = file,
- partitionValues = getPartitionColumnsAsInternalRow(file)
- )
- }
- }
+ val fileSlices = listLatestFileSlices(globPaths, partitionFilters,
dataFilters)
Review Comment:
   Got it. Then it should be fine. Could you confirm that we have test coverage
   for the MOR read-optimized query on a table with log files (and which set of
   tests covers this)?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -138,10 +137,16 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
override protected def getPartitions: Array[Partition] =
fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2,
file._1)).toArray
- private def getConfig: Configuration = {
- val conf = confBroadcast.value.value
- CONFIG_INSTANTIATION_LOCK.synchronized {
- new Configuration(conf)
- }
+ private def getHadoopConf: Configuration = {
+ val conf = hadoopConfBroadcast.value.value
+ new Configuration(conf)
Review Comment:
   Given that this might introduce side effects, and that there is little time
   before the release code freeze to verify the removal of the lock, could you
   keep this part the same as before? It is not essential to the PR.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala:
##########
@@ -78,14 +80,52 @@ object HoodieCatalystExpressionUtils {
* NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is
only possible, if
* B is a subset of A
*/
- def generateUnsafeProjection(from: StructType, to: StructType):
UnsafeProjection = {
- val attrs = from.toAttributes
- val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
- val targetExprs = to.fields.map(f => attrsMap(f.name))
+ def generateUnsafeProjection(sourceStructType: StructType, targetStructType:
StructType): UnsafeProjection = {
+ val resolver = SQLConf.get.resolver
+ val attrs = sourceStructType.toAttributes
+ val targetExprs = targetStructType.fields.map { targetField =>
+ val attrRef = attrs.find(attr => resolver(attr.name, targetField.name))
+ .getOrElse(throw new AnalysisException(s"Wasn't able to match target
field `${targetField.name}` to any of the source attributes ($attrs)"))
+
+ genProjectingExpression(attrRef, targetField.dataType)
+ }
GenerateUnsafeProjection.generate(targetExprs, attrs)
}
+ private def genProjectingExpression(sourceExpr: Expression,
+ targetDataType: DataType): Expression = {
+ checkState(sourceExpr.resolved)
+
+ // TODO support array, map
+ (sourceExpr.dataType, targetDataType) match {
+ case (sdt, tdt) if sdt == tdt =>
+ sourceExpr
+
+ case (sourceType: StructType, targetType: StructType) =>
+ val fieldValueExprs = targetType.fields.map { tf =>
Review Comment:
Got it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]