This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 155a6a3  map and struct type write format (#146)
155a6a3 is described below

commit 155a6a3f94f8b843c85431c3462a69a9452e3f46
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Sun Oct 8 18:30:37 2023 +0800

    map and struct type write format (#146)
---
 .../org/apache/doris/spark/sql/SchemaUtils.scala   | 33 +++++++++++++++++++---
 .../apache/doris/spark/sql/SchemaUtilsTest.scala   | 21 +++++++++-----
 2 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 86a403f..677cc2e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -176,17 +176,42 @@ private[spark] object SchemaUtils {
         val mapData = row.getMap(ordinal)
         val keys = mapData.keyArray()
         val values = mapData.valueArray()
+        // Render the map as an object-style string: {key:value,key:value}.
+        // Keys and values are converted by rowColumnValue and quoted by quoteData.
+        val sb = StringBuilder.newBuilder
+        sb.append("{")
         var i = 0
-        val map = mutable.Map[Any, Any]()
         while (i < keys.numElements()) {
-          map += rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType)
+          sb.append(quoteData(rowColumnValue(keys, i, mt.keyType), mt.keyType))
+            .append(":")
+            .append(quoteData(rowColumnValue(values, i, mt.valueType), mt.valueType))
+            .append(",")
           i += 1
         }
-        map.toMap.asJava
-      case st: StructType => row.getStruct(ordinal, st.length)
+        // Remove the trailing comma in place. dropRight(1) would only return a
+        // truncated copy and leave sb untouched, so the comma would reach the output.
+        if (i > 0) sb.setLength(sb.length - 1)
+        sb.append("}").toString
+      case st: StructType =>
+        // Render the struct as an object-style string: {"field":value,"field":value}.
+        // Field names are always double-quoted; values are quoted by quoteData.
+        val structData = row.getStruct(ordinal, st.length)
+        val sb = StringBuilder.newBuilder
+        sb.append("{")
+        var i = 0
+        while (i < structData.numFields) {
+          val field = st.get(i)
+          sb.append(s""""${field.name}":""")
+            .append(quoteData(rowColumnValue(structData, i, field.dataType), field.dataType))
+            .append(",")
+          i += 1
+        }
+        // Remove the trailing comma in place. dropRight(1) would only return a
+        // truncated copy and leave sb untouched, so the comma would reach the output.
+        if (i > 0) sb.setLength(sb.length - 1)
+        sb.append("}").toString
       case _ => throw new DorisException(s"Unsupported spark type: 
${dataType.typeName}")
     }
 
   }
 
+  /**
+   * Wraps a value in double quotes when its Spark type is string, timestamp or date,
+   * so it can be embedded in the object-style strings built for map and struct columns.
+   * The value is inserted as-is: no escaping of embedded quotes is performed.
+   *
+   * @param value    column value to embed
+   * @param dataType Spark type that decides whether the value is quoted
+   * @return the value surrounded by double quotes for StringType, TimestampType and
+   *         DateType; the value itself, unchanged, for every other type
+   */
+  private def quoteData(value: Any, dataType: DataType): Any =
+    dataType match {
+      case StringType | TimestampType | DateType => "\"" + value + "\""
+      case _ => value
+    }
+
 }
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
index e3868cb..7e6e5f5 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala
@@ -17,11 +17,11 @@
 
 package org.apache.doris.spark.sql
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
 import org.junit.{Assert, Ignore, Test}
 
 import java.sql.{Date, Timestamp}
-import scala.collection.JavaConverters._
 
 @Ignore
 class SchemaUtilsTest {
@@ -31,9 +31,16 @@ class SchemaUtilsTest {
 
     val spark = SparkSession.builder().master("local").getOrCreate()
 
-    val df = spark.createDataFrame(Seq(
-      (1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 
17:00:00"), Array(1, 2, 3), Map[String, String]("a" -> "1"))
-    )).toDF("c1", "c2", "c3", "c4", "c5")
+    val rdd = spark.sparkContext.parallelize(Seq(
+      Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 
17:00:00"), Array(1, 2, 3),
+        Map[String, String]("a" -> "1"), Row("a", 1))
+    ))
+    val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType)
+      .add("c2", DateType)
+      .add("c3", TimestampType)
+      .add("c4", ArrayType.apply(IntegerType))
+      .add("c5", MapType.apply(StringType, StringType))
+      .add("c6", StructType.apply(Seq(StructField("a", StringType), 
StructField("b", IntegerType)))))
 
     val schema = df.schema
 
@@ -44,8 +51,8 @@ class SchemaUtilsTest {
       Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, 
fields(1).dataType))
       Assert.assertEquals("2023-09-08 17:00:00.0", 
SchemaUtils.rowColumnValue(row, 2, fields(2).dataType))
       Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, 
fields(3).dataType))
-      println(SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
-      Assert.assertEquals(Map("a" -> "1").asJava, 
SchemaUtils.rowColumnValue(row, 4, fields(4).dataType))
+      Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, 
fields(4).dataType))
+      Assert.assertEquals("{\"a\":\"a\",\"b\":1}", 
SchemaUtils.rowColumnValue(row, 5, fields(5).dataType))
 
     })
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to