This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2818846f62f72185fdb6c2fcae5104ccec132294
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Wed Apr 20 15:20:40 2022 -0700

    Fixed instantiation of the components t/h reflection
---
 .../apache/hudi/common/util/ReflectionUtils.java   |  4 +--
 .../parquet/Spark312HoodieParquetFileFormat.scala  |  8 +++---
 .../parquet/Spark32HoodieParquetFileFormat.scala   | 33 ++++++++++------------
 3 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index ec361d9f9a..13228c440c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -123,7 +123,7 @@ public class ReflectionUtils {
    * @param ctorArgs specific constructor arguments
    * @return new instance of the class
    */
-  public static <T> T newInstanceUnchecked(Class<T> klass, Object ...ctorArgs) {
+  public static <T> T newInstanceUnchecked(Class<T> klass, Object... ctorArgs) {
     Class<?>[] ctorArgTypes = Arrays.stream(ctorArgs).map(Object::getClass).toArray(Class<?>[]::new);
     return newInstanceUnchecked(klass, ctorArgTypes, ctorArgs);
   }
@@ -136,7 +136,7 @@ public class ReflectionUtils {
    * @param ctorArgs specific constructor arguments
    * @return new instance of the class
    */
-  public static <T> T newInstanceUnchecked(Class<T> klass, Class<?>[] ctorArgTypes, Object ...ctorArgs) {
+  public static <T> T newInstanceUnchecked(Class<T> klass, Class<?>[] ctorArgTypes, Object... ctorArgs) {
     try {
       return unsafeCast(klass.getConstructor(ctorArgTypes).newInstance(ctorArgs));
     } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
index 6061edd522..4c9902a3c4 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark312HoodieParquetFileFormat.scala
@@ -331,8 +331,6 @@ class Spark312HoodieParquetFileFormat(private val shouldAppendPartitionValues: B

 object Spark312HoodieParquetFileFormat {

-  val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
-
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
@@ -344,8 +342,10 @@ object Spark312HoodieParquetFileFormat {
   }

   private def createParquetFilters(args: Any*): ParquetFilters = {
-    val instance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    instance.asInstanceOf[ParquetFilters]
+    // ParquetFilters bears a single ctor (in Spark 3.1)
+    val ctor = classOf[ParquetFilters].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
   }

   private def rebuildFilterFromParquet(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
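As background on the lookup strategy involved here: the varargs newInstanceUnchecked overload above derives the constructor parameter types from the runtime classes of its arguments and then performs an exact-match getConstructor lookup. A minimal Scala sketch of that strategy (illustrative only, not part of this patch; the helper name newInstanceByArgClasses is hypothetical):

object ReflectionLookupSketch {
  // Sketch of the "resolve the ctor from the arguments' runtime classes" strategy
  // taken by the varargs newInstanceUnchecked overload (hypothetical helper).
  def newInstanceByArgClasses[T](klass: Class[T], args: AnyRef*): T =
    // getConstructor demands an exact match between the supplied classes and the
    // declared parameter types, so this only works when each argument's runtime
    // class is precisely the type the constructor declares.
    klass.getConstructor(args.map(_.getClass): _*).newInstance(args: _*)

  def main(argv: Array[String]): Unit = {
    // Works here: java.lang.StringBuilder declares a public ctor taking exactly a String.
    val sb = newInstanceByArgClasses(classOf[java.lang.StringBuilder], "hudi")
    println(sb.append(" rc3").toString)
  }
}

When an argument's runtime class is only a subtype of the declared parameter type, as with Scala enum values (runtime class scala.Enumeration$Val versus a declared Enumeration#Value), that exact-match lookup throws NoSuchMethodException; this is the situation the NOTEs in the Spark 3.2 file below call out.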
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
index 99cb83cf51..351203ca58 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32HoodieParquetFileFormat.scala
@@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
 import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
@@ -43,11 +44,9 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.parquet.Spark32HoodieParquetFileFormat._
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
-import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration

 import java.net.URI

@@ -396,27 +395,25 @@ class Spark32HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo

 object Spark32HoodieParquetFileFormat {

-  private val PARQUET_FILTERS_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetFilters"
-  private val PARQUET_VECTORIZED_READER_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader"
-  private val PARQUET_READ_SUPPORT_CLASS_NAME = "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport"
-
   private def createParquetFilters(args: Any*): ParquetFilters = {
-    val parquetFiltersInstance = ReflectionUtils.loadClass(PARQUET_FILTERS_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    parquetFiltersInstance.asInstanceOf[ParquetFilters]
-  }
-
-  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader = {
-    val vectorizedRecordReader =
-      ReflectionUtils.loadClass(PARQUET_VECTORIZED_READER_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    vectorizedRecordReader.asInstanceOf[VectorizedParquetRecordReader]
+    // NOTE: ParquetFilters ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on relative order of ctors
+    val ctor = classOf[ParquetFilters].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetFilters]
   }

   private def createParquetReadSupport(args: Any*): ParquetReadSupport = {
-    val parquetReadSupport =
-      ReflectionUtils.loadClass(PARQUET_READ_SUPPORT_CLASS_NAME, args.map(_.asInstanceOf[AnyRef]): _*)
-    parquetReadSupport.asInstanceOf[ParquetReadSupport]
+    // NOTE: ParquetReadSupport ctor args contain Scala enum, therefore we can't look it
+    //       up by arg types, and have to instead rely on relative order of ctors
+    val ctor = classOf[ParquetReadSupport].getConstructors.head
+    ctor.newInstance(args.map(_.asInstanceOf[AnyRef]): _*)
+      .asInstanceOf[ParquetReadSupport]
   }

+  private def createVectorizedParquetRecordReader(args: Any*): VectorizedParquetRecordReader =
+    ReflectionUtils.newInstanceUnchecked(classOf[VectorizedParquetRecordReader], args.map(_.asInstanceOf[AnyRef]): _*)
+
   def pruneInternalSchema(internalSchemaStr: String, requiredSchema: StructType): String = {
     val querySchemaOption = SerDeHelper.fromJson(internalSchemaStr)
     if (querySchemaOption.isPresent && requiredSchema.nonEmpty) {
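Because ParquetFilters and ParquetReadSupport declare a Scala enum constructor parameter, the lookup-by-argument-classes route is unavailable for them, so the code above grabs a declared constructor directly and invokes it positionally. A hedged Scala sketch of that fallback (illustrative only, not part of this patch; it assumes the target class exposes a single public constructor, which is what the NOTEs above rely on):

object CtorOrderSketch {
  // Sketch of the fallback: skip type-based lookup entirely and invoke the
  // (assumed single) public constructor with the arguments in declaration order.
  def newInstanceByCtorOrder[T](klass: Class[T], args: AnyRef*): T = {
    // getConstructors returns the public ctors; "head" is only unambiguous when
    // the class declares exactly one, hence the reliance on ctor order noted above.
    val ctor = klass.getConstructors.head
    klass.cast(ctor.newInstance(args: _*))
  }

  def main(argv: Array[String]): Unit = {
    // java.io.StringReader declares exactly one public ctor (String), so "head" is it.
    val reader = newInstanceByCtorOrder(classOf[java.io.StringReader], "hoodie")
    println(reader.read().toChar)
  }
}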