This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 18453a456db0f5be0b935e5a04d8c0cdb3958936 Author: wenningd <[email protected]> AuthorDate: Mon Mar 30 15:52:15 2020 -0700 [HUDI-713] Fix conversion of Spark array of struct type to Avro schema (#1406) Co-authored-by: Wenning Ding <[email protected]> --- .../hudi/common/HoodieTestDataGenerator.java | 32 ++++++++++++++++++---- .../org/apache/hudi/AvroConversionHelper.scala | 8 +++--- .../org/apache/hudi/AvroConversionUtils.scala | 10 ------- .../resources/delta-streamer-config/source.avsc | 27 ++++++++++++++++++ .../sql-transformer.properties | 2 +- .../resources/delta-streamer-config/target.avsc | 26 ++++++++++++++++++ 6 files changed, 84 insertions(+), 21 deletions(-) diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index e0d2a53..99bea63 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; @@ -46,6 +47,7 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -74,17 +76,25 @@ public class HoodieTestDataGenerator { public static final String[] DEFAULT_PARTITION_PATHS = {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH}; public static final int DEFAULT_PARTITION_DEPTH = 3; - public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + 
"\"fields\": [ " + public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"}," + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"}," + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," - + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," - + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [" - + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}}," - + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"; + + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"; + public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}"; + public static final String FARE_NESTED_SCHEMA = "{\"name\": \"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": [" + + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}},"; + public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": \"tip_history\", \"fields\": [" + + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": \"currency\", \"type\": \"string\"}]}}},"; + public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", \"type\": {\"type\": \"map\", \"values\": \"string\"}},"; + + public static final String TRIP_EXAMPLE_SCHEMA = + TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; + public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final 
String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double," - + "struct<amount:double,currency:string>,boolean"; + + "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean"; + public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA); @@ -156,11 +166,21 @@ public class HoodieTestDataGenerator { rec.put("end_lat", RAND.nextDouble()); rec.put("end_lon", RAND.nextDouble()); + rec.put("city_to_state", Collections.singletonMap("LA", "CA")); + GenericRecord fareRecord = new GenericData.Record(AVRO_SCHEMA.getField("fare").schema()); fareRecord.put("amount", RAND.nextDouble() * 100); fareRecord.put("currency", "USD"); rec.put("fare", fareRecord); + GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, AVRO_SCHEMA.getField("tip_history").schema()); + Schema tipSchema = new Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType(); + GenericRecord tipRecord = new GenericData.Record(tipSchema); + tipRecord.put("amount", RAND.nextDouble() * 100); + tipRecord.put("currency", "USD"); + tipHistoryArray.add(tipRecord); + rec.put("tip_history", tipHistoryArray); + if (isDeleteRecord) { rec.put("_hoodie_is_deleted", true); } else { diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index b61bef3..43225bc 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -27,7 +27,6 @@ import org.apache.avro.{LogicalTypes, Schema} import org.apache.avro.Schema.Type._ import org.apache.avro.generic.GenericData.{Fixed, Record} import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord} -import 
org.apache.hudi.AvroConversionUtils.getNewRecordNamespace import org.apache.spark.sql.Row import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.GenericRow @@ -303,7 +302,7 @@ object AvroConversionHelper { avroSchema, elementType, structName, - getNewRecordNamespace(elementType, recordNamespace, structName)) + recordNamespace) (item: Any) => { if (item == null) { null @@ -324,7 +323,7 @@ object AvroConversionHelper { avroSchema, valueType, structName, - getNewRecordNamespace(valueType, recordNamespace, structName)) + recordNamespace) (item: Any) => { if (item == null) { null @@ -338,12 +337,13 @@ object AvroConversionHelper { } case structType: StructType => val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace) + val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName val fieldConverters = structType.fields.map(field => createConverterToAvro( avroSchema, field.dataType, field.name, - getNewRecordNamespace(field.dataType, recordNamespace, structName))) + childNameSpace)) (item: Any) => { if (item == null) { null diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index decab9c..04de1c7 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -71,16 +71,6 @@ object AvroConversionUtils { } } - def getNewRecordNamespace(elementDataType: DataType, - currentRecordNamespace: String, - elementName: String): String = { - - elementDataType match { - case StructType(_) => s"$currentRecordNamespace.$elementName" - case _ => currentRecordNamespace - } - } - def convertStructTypeToAvroSchema(structType: StructType, structName: String, recordNamespace: String): Schema = { diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc index a02d48d..8d01820 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -45,6 +45,13 @@ "type" : "double" }, { + "name" :"city_to_state", + "type" : { + "type" : "map", + "values": "string" + } + }, + { "name" : "fare", "type" : { "type" : "record", @@ -62,6 +69,26 @@ } }, { + "name" : "tip_history", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "tip_history", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + } + }, + { "name" : "_hoodie_is_deleted", "type" : "boolean", "default" : false diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties index e8b2857..569b417 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties @@ -16,4 +16,4 @@ # limitations under the License. 
### include=base.properties -hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a +hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc index b64fe4d..4fbb5c5 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc @@ -44,6 +44,12 @@ "name" : "end_lon", "type" : "double" }, { + "name" :"city_to_state", + "type" : { + "type" : "map", + "values": "string" + } + }, { "name" : "fare", "type" : { "type" : "record", @@ -61,6 +67,26 @@ } }, { + "name" : "tip_history", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "tip_history", + "fields" : [ + { + "name" : "amount", + "type" : "double" + }, + { + "name" : "currency", + "type" : "string" + } + ] + } + } + }, + { "name" : "_hoodie_is_deleted", "type" : "boolean", "default" : false
