This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d979feee3483debe7c8aa0f3d25ebed18e41bcfe
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Mon Apr 18 23:04:04 2022 +0530

    [HUDI-3707] Fix target schema handling in HoodieSparkUtils while creating RDD (#5347)
---
 .../main/scala/org/apache/hudi/HoodieSparkUtils.scala  | 19 +++++++++++++------
 .../utilities/functional/TestHoodieDeltaStreamer.java  |  5 -----
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 57eb32fce3..54bc06bd76 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -26,6 +26,9 @@ import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
@@ -36,12 +39,8 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import java.util.Properties
-
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
-import org.apache.hudi.internal.schema.utils.InternalSchemaUtils
 
+import java.util.Properties
 import scala.collection.JavaConverters._
 
 object HoodieSparkUtils extends SparkAdapterSupport {
@@ -130,7 +129,15 @@ object HoodieSparkUtils extends SparkAdapterSupport {
    */
   def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
                 latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val latestTableSchemaConverted = if (latestTableSchema.isPresent && reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    var latestTableSchemaConverted : Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None
+    }
     createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
   }
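
Note: with this change, both branches resolve to the same value when a latest
table schema is present, so createRdd now honors the supplied schema whether
or not reconcileToLatestSchema is set. A minimal usage sketch, assuming "df"
holds the (possibly transformed) source rows and "providerSchema" comes from a
schema provider; the helper name, struct name, and namespace are illustrative,
not part of the patch:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.HoodieSparkUtils
    import org.apache.hudi.common.util.{Option => HOption}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    // Illustrative helper (not from the patch): converts a DataFrame to an
    // RDD[GenericRecord] using the provider's schema as the target schema.
    def toAvroRdd(df: DataFrame, providerSchema: Schema): RDD[GenericRecord] = {
      // Args: df, structName, recordNamespace, reconcileToLatestSchema,
      // latestTableSchema. With this fix the schema is applied even though
      // reconcileToLatestSchema is false.
      HoodieSparkUtils.createRdd(df, "hoodie_source", "hoodie.source",
        false, HOption.of(providerSchema))
    }
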
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 804676f0ff..0576f6aaee 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1690,7 +1690,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testParquetDFSSource(false, null, true);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
     testParquetDFSSource(false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1701,7 +1700,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testParquetDFSSource(true, null);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
     testParquetDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1712,7 +1710,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testORCDFSSource(false, null);
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testORCDFSSourceWithSchemaProviderAndWithTransformer() throws Exception {
     testORCDFSSource(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
@@ -1807,7 +1804,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testCsvDFSSource(true, '\t', false, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testCsvDFSSourceWithHeaderAndSepWithSchemaProviderAndTransformer() throws Exception {
     // The CSV files have header, the columns are separated by '\t'
@@ -1850,7 +1846,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(e.getMessage().contains("cannot resolve '`begin_lat`' given input columns:"));
   }
 
-  @Disabled("HUDI-3707 To investigate problem with schema provider and transformer")
   @Test
   public void testCsvDFSSourceNoHeaderWithSchemaProviderAndTransformer() throws Exception {
     // The CSV files do not have header, the columns are separated by '\t'
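
The re-enabled tests run DeltaStreamer with a schema provider plus a
Transformer, which rewrites the source rows before createRdd is invoked; that
is exactly the path the fix covers. A hypothetical sketch of such a
transformer, modeled loosely on the TripsWithDistanceTransformer referenced
above; the class name and the "haversine_distance" column are illustrative,
and the interface shown is org.apache.hudi.utilities.transform.Transformer
from hudi-utilities:

    import org.apache.hudi.common.config.TypedProperties
    import org.apache.hudi.utilities.transform.Transformer
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    // Hypothetical transformer: adds a derived column, so the transformed
    // rows no longer match the source schema. Before this fix, createRdd
    // dropped the provider's target schema on this path unless
    // reconcileToLatestSchema was explicitly enabled.
    class DistanceTransformerSketch extends Transformer {
      override def apply(jsc: JavaSparkContext,
                         sparkSession: SparkSession,
                         rowDataset: Dataset[Row],
                         properties: TypedProperties): Dataset[Row] = {
        // Placeholder for a real distance computation.
        rowDataset.withColumn("haversine_distance", lit(0.0))
      }
    }
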