This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 18453a456db0f5be0b935e5a04d8c0cdb3958936
Author: wenningd <[email protected]>
AuthorDate: Mon Mar 30 15:52:15 2020 -0700

    [HUDI-713] Fix conversion of Spark array of struct type to Avro schema (#1406)
    
    Co-authored-by: Wenning Ding <[email protected]>
---
 .../hudi/common/HoodieTestDataGenerator.java       | 32 ++++++++++++++++++----
 .../org/apache/hudi/AvroConversionHelper.scala     |  8 +++---
 .../org/apache/hudi/AvroConversionUtils.scala      | 10 -------
 .../resources/delta-streamer-config/source.avsc    | 27 ++++++++++++++++++
 .../sql-transformer.properties                     |  2 +-
 .../resources/delta-streamer-config/target.avsc    | 26 ++++++++++++++++++
 6 files changed, 84 insertions(+), 21 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index e0d2a53..99bea63 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -46,6 +47,7 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -74,17 +76,25 @@ public class HoodieTestDataGenerator {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, 
DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
-  public static final String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + 
"\"name\": \"triprec\"," + "\"fields\": [ "
+  public static final String TRIP_SCHEMA_PREFIX = "{\"type\": \"record\"," + 
"\"name\": \"triprec\"," + "\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": 
\"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": 
\"driver\", \"type\": \"string\"},"
       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": 
\"begin_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": 
\"end_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"fare\",\"type\": {\"type\":\"record\", 
\"name\":\"fare\",\"fields\": ["
-      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", 
\"type\": \"string\"}]}},"
-      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", 
\"default\": false} ]}";
+      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": 
\"end_lon\", \"type\": \"double\"},";
+  public static final String TRIP_SCHEMA_SUFFIX = "{\"name\": 
\"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
+  public static final String FARE_NESTED_SCHEMA = "{\"name\": 
\"fare\",\"type\": {\"type\":\"record\", \"name\":\"fare\",\"fields\": ["
+      + "{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", 
\"type\": \"string\"}]}},";
+  public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", 
\"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": 
\"tip_history\", \"fields\": ["
+      + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": 
\"currency\", \"type\": \"string\"}]}}},";
+  public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", 
\"type\": {\"type\": \"map\", \"values\": \"string\"}},";
+
+  public static final String TRIP_EXAMPLE_SCHEMA =
+      TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + 
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
+
   public static final String NULL_SCHEMA = 
Schema.create(Schema.Type.NULL).toString();
   public static final String TRIP_HIVE_COLUMN_TYPES = 
"double,string,string,string,double,double,double,double,"
-                                                  + 
"struct<amount:double,currency:string>,boolean";
+      + 
"map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
+
   public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
@@ -156,11 +166,21 @@ public class HoodieTestDataGenerator {
     rec.put("end_lat", RAND.nextDouble());
     rec.put("end_lon", RAND.nextDouble());
 
+    rec.put("city_to_state", Collections.singletonMap("LA", "CA"));
+
     GenericRecord fareRecord = new 
GenericData.Record(AVRO_SCHEMA.getField("fare").schema());
     fareRecord.put("amount", RAND.nextDouble() * 100);
     fareRecord.put("currency", "USD");
     rec.put("fare", fareRecord);
 
+    GenericArray<GenericRecord> tipHistoryArray = new GenericData.Array<>(1, 
AVRO_SCHEMA.getField("tip_history").schema());
+    Schema tipSchema = new 
Schema.Parser().parse(AVRO_SCHEMA.getField("tip_history").schema().toString()).getElementType();
+    GenericRecord tipRecord = new GenericData.Record(tipSchema);
+    tipRecord.put("amount", RAND.nextDouble() * 100);
+    tipRecord.put("currency", "USD");
+    tipHistoryArray.add(tipRecord);
+    rec.put("tip_history", tipHistoryArray);
+
     if (isDeleteRecord) {
       rec.put("_hoodie_is_deleted", true);
     } else {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index b61bef3..43225bc 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -27,7 +27,6 @@ import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.generic.GenericData.{Fixed, Record}
 import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
-import org.apache.hudi.AvroConversionUtils.getNewRecordNamespace
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.avro.{IncompatibleSchemaException, 
SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
@@ -303,7 +302,7 @@ object AvroConversionHelper {
           avroSchema,
           elementType,
           structName,
-          getNewRecordNamespace(elementType, recordNamespace, structName))
+          recordNamespace)
         (item: Any) => {
           if (item == null) {
             null
@@ -324,7 +323,7 @@ object AvroConversionHelper {
           avroSchema,
           valueType,
           structName,
-          getNewRecordNamespace(valueType, recordNamespace, structName))
+          recordNamespace)
         (item: Any) => {
           if (item == null) {
             null
@@ -338,12 +337,13 @@ object AvroConversionHelper {
         }
       case structType: StructType =>
         val schema: Schema = SchemaConverters.toAvroType(structType, nullable 
= false, structName, recordNamespace)
+        val childNameSpace = if (recordNamespace != "") 
s"$recordNamespace.$structName" else structName
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
             avroSchema,
             field.dataType,
             field.name,
-            getNewRecordNamespace(field.dataType, recordNamespace, 
structName)))
+            childNameSpace))
         (item: Any) => {
           if (item == null) {
             null
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index decab9c..04de1c7 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -71,16 +71,6 @@ object AvroConversionUtils {
     }
   }
 
-  def getNewRecordNamespace(elementDataType: DataType,
-                            currentRecordNamespace: String,
-                            elementName: String): String = {
-
-    elementDataType match {
-      case StructType(_) => s"$currentRecordNamespace.$elementName"
-      case _ => currentRecordNamespace
-    }
-  }
-
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index a02d48d..8d01820 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -45,6 +45,13 @@
     "type" : "double"
   },
   {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  },
+  {
     "name" : "fare",
     "type" : {
       "type" : "record",
@@ -62,6 +69,26 @@
     }
   },
   {
+    "name" : "tip_history",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "tip_history",
+        "fields" : [
+          {
+            "name" : "amount",
+            "type" : "double"
+          },
+          {
+            "name" : "currency",
+            "type" : "string"
+          }
+        ]
+      }
+    }
+  },
+  {
     "name" : "_hoodie_is_deleted",
     "type" : "boolean",
     "default" : false
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index e8b2857..569b417 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ###
 include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, 
a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, 
a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, 
a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.city_to_state, 
a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS 
haversine_distance FROM <SRC> a
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index b64fe4d..4fbb5c5 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -44,6 +44,12 @@
     "name" : "end_lon",
     "type" : "double"
   }, {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  }, {
     "name" : "fare",
     "type" : {
       "type" : "record",
@@ -61,6 +67,26 @@
     }
   },
   {
+    "name" : "tip_history",
+    "type" : {
+      "type" : "array",
+      "items" : {
+        "type" : "record",
+        "name" : "tip_history",
+        "fields" : [
+          {
+            "name" : "amount",
+            "type" : "double"
+          },
+          {
+            "name" : "currency",
+            "type" : "string"
+          }
+        ]
+      }
+    }
+  },
+  {
      "name" : "_hoodie_is_deleted",
      "type" : "boolean",
      "default" : false

Reply via email to