This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 155a6a3 map and struct type write format (#146) 155a6a3 is described below commit 155a6a3f94f8b843c85431c3462a69a9452e3f46 Author: gnehil <adamlee...@gmail.com> AuthorDate: Sun Oct 8 18:30:37 2023 +0800 map and struct type write format (#146) --- .../org/apache/doris/spark/sql/SchemaUtils.scala | 33 +++++++++++++++++++--- .../apache/doris/spark/sql/SchemaUtilsTest.scala | 21 +++++++++----- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 86a403f..677cc2e 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -176,17 +176,42 @@ private[spark] object SchemaUtils { val mapData = row.getMap(ordinal) val keys = mapData.keyArray() val values = mapData.valueArray() + val sb = StringBuilder.newBuilder + sb.append("{") var i = 0 - val map = mutable.Map[Any, Any]() while (i < keys.numElements()) { - map += rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType) + rowColumnValue(keys, i, mt.keyType) -> rowColumnValue(values, i, mt.valueType) + sb.append(quoteData(rowColumnValue(keys, i, mt.keyType), mt.keyType)) + .append(":").append(quoteData(rowColumnValue(values, i, mt.valueType), mt.valueType)) + .append(",") i += 1 } - map.toMap.asJava - case st: StructType => row.getStruct(ordinal, st.length) + if (i > 0) sb.dropRight(1) + sb.append("}").toString + case st: StructType => + val structData = row.getStruct(ordinal, st.length) + val sb = StringBuilder.newBuilder + sb.append("{") + var i = 0 + while (i < structData.numFields) { + val field = st.get(i) + sb.append(s""""${field.name}":""") + .append(quoteData(rowColumnValue(structData, i, field.dataType), field.dataType)) + .append(",") + i += 1 + } + if (i > 0) sb.dropRight(1) + sb.append("}").toString case _ => throw new DorisException(s"Unsupported spark type: ${dataType.typeName}") } } + private def quoteData(value: Any, dataType: DataType): Any = { + dataType match { + case StringType | TimestampType | DateType => s""""$value"""" + case _ => value + } + } + } diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala index e3868cb..7e6e5f5 100644 --- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala +++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SchemaUtilsTest.scala @@ -17,11 +17,11 @@ package org.apache.doris.spark.sql -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SparkSession} import org.junit.{Assert, Ignore, Test} import java.sql.{Date, Timestamp} -import scala.collection.JavaConverters._ @Ignore class SchemaUtilsTest { @@ -31,9 +31,16 @@ class SchemaUtilsTest { val spark = SparkSession.builder().master("local").getOrCreate() - val df = spark.createDataFrame(Seq( - (1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3), Map[String, String]("a" -> "1")) - )).toDF("c1", "c2", "c3", "c4", "c5") + val rdd = spark.sparkContext.parallelize(Seq( + Row(1, Date.valueOf("2023-09-08"), Timestamp.valueOf("2023-09-08 17:00:00"), Array(1, 2, 3), + Map[String, String]("a" -> "1"), Row("a", 1)) + )) + val df = spark.createDataFrame(rdd, new StructType().add("c1", IntegerType) + .add("c2", DateType) + .add("c3", TimestampType) + .add("c4", ArrayType.apply(IntegerType)) + .add("c5", MapType.apply(StringType, StringType)) + .add("c6", StructType.apply(Seq(StructField("a", StringType), StructField("b", IntegerType))))) val schema = df.schema @@ -44,8 +51,8 @@ class SchemaUtilsTest { Assert.assertEquals("2023-09-08", SchemaUtils.rowColumnValue(row, 1, fields(1).dataType)) Assert.assertEquals("2023-09-08 17:00:00.0", SchemaUtils.rowColumnValue(row, 2, fields(2).dataType)) Assert.assertEquals("[1,2,3]", SchemaUtils.rowColumnValue(row, 3, fields(3).dataType)) - println(SchemaUtils.rowColumnValue(row, 4, fields(4).dataType)) - Assert.assertEquals(Map("a" -> "1").asJava, SchemaUtils.rowColumnValue(row, 4, fields(4).dataType)) + Assert.assertEquals("{\"a\":\"1\"}", SchemaUtils.rowColumnValue(row, 4, fields(4).dataType)) + Assert.assertEquals("{\"a\":\"a\",\"b\":1}", SchemaUtils.rowColumnValue(row, 5, fields(5).dataType)) }) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org