This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 61f8307  array type support. (#75)
61f8307 is described below

commit 61f83074d229a786f8d90e94cd525d4cae384591
Author: benjobs <benj...@apache.org>
AuthorDate: Tue Mar 14 09:26:42 2023 +0800

    array type support. (#75)
    
    * scala version bug fixed,array type convert bug fuxed
    ---------
    Co-authored-by: benjobs <benj...@gmail.com>
---
 spark-doris-connector/pom.xml                      |  1 -
 .../apache/doris/spark/serialization/RowBatch.java | 16 +++++++-
 .../org/apache/doris/spark/sql/SchemaUtils.scala   | 43 +++++++++++-----------
 3 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index ecfa032..e89813d 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -64,7 +64,6 @@
     <properties>
         <spark.version>3.1.2</spark.version>
         <spark.minor.version>3.1</spark.minor.version>
-        <scala.version>2.12.8</scala.version>
         <scala.version>2.12</scala.version>
         <libthrift.version>0.13.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index 72c7ac9..e666de5 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -38,6 +38,7 @@ import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.doris.spark.exception.DorisException;
@@ -87,7 +88,7 @@ public class RowBatch {
         this.arrowStreamReader = new ArrowStreamReader(
                 new ByteArrayInputStream(nextResult.getRows()),
                 rootAllocator
-                );
+        );
         try {
             VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
             while (arrowStreamReader.loadNextBatch()) {
@@ -275,6 +276,19 @@ public class RowBatch {
                             addValueToRow(rowIndex, value);
                         }
                         break;
+                    case "ARRAY":
+                        
Preconditions.checkArgument(mt.equals(Types.MinorType.LIST),
+                                typeMismatchMessage(currentType, mt));
+                        ListVector listVector = (ListVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
+                            if (listVector.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue;
+                            }
+                            String value = 
listVector.getObject(rowIndex).toString();
+                            addValueToRow(rowIndex, value);
+                        }
+                        break;
                     default:
                         String errMsg = "Unsupported type " + 
schema.get(col).getType();
                         logger.error(errMsg);
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 15ad7a1..2565aff 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -17,8 +17,7 @@
 
 package org.apache.doris.spark.sql
 
-import scala.collection.JavaConverters._
-
+import scala.collection.JavaConversions._
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
@@ -26,16 +25,14 @@ import org.apache.doris.spark.rest.models.{Field, Schema}
 import org.apache.doris.thrift.TScanColumnDesc
 import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
 import org.apache.spark.sql.types._
-
 import org.slf4j.LoggerFactory
 
-import scala.collection.mutable
-
 private[spark] object SchemaUtils {
   private val logger = 
LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
 
   /**
    * discover Doris table schema from Doris FE.
+   *
    * @param cfg configuration
    * @return Spark Catalyst StructType
    */
@@ -46,6 +43,7 @@ private[spark] object SchemaUtils {
 
   /**
    * discover Doris table schema from Doris FE.
+   *
    * @param cfg configuration
    * @return inner schema struct
    */
@@ -55,35 +53,34 @@ private[spark] object SchemaUtils {
 
   /**
    * convert inner schema struct to Spark Catalyst StructType
+   *
    * @param schema inner schema
    * @return Spark Catalyst StructType
    */
   def convertToStruct(dorisReadFields: String, schema: Schema): StructType = {
-    var fieldList = new Array[String](schema.size())
-    val fieldSet = new mutable.HashSet[String]()
-    var fields = List[StructField]()
-    if (dorisReadFields != null && dorisReadFields.length > 0) {
-      fieldList = dorisReadFields.split(",")
-      for (field <- fieldList) {
-        fieldSet.add(field)
-      }
-      schema.getProperties.asScala.foreach(f =>
-        if (fieldSet.contains(f.getName)) {
-          fields :+= DataTypes.createStructField(f.getName, 
getCatalystType(f.getType, f.getPrecision, f.getScale), true)
-        })
+    val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) 
{
+      dorisReadFields.split(",")
     } else {
-      schema.getProperties.asScala.foreach(f =>
-        fields :+= DataTypes.createStructField(f.getName, 
getCatalystType(f.getType, f.getPrecision, f.getScale), true)
-      )
+      Array.empty[String]
     }
-    DataTypes.createStructType(fields.asJava)
+    val fields = schema.getProperties
+      .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
+      .map(f =>
+        DataTypes.createStructField(
+          f.getName,
+          getCatalystType(f.getType, f.getPrecision, f.getScale),
+          true
+        )
+      )
+    DataTypes.createStructType(fields)
   }
 
   /**
    * translate Doris Type to Spark Catalyst type
+   *
    * @param dorisType Doris type
    * @param precision decimal precision
-   * @param scale decimal scale
+   * @param scale     decimal scale
    * @return Spark Catalyst type
    */
   def getCatalystType(dorisType: String, precision: Int, scale: Int): DataType 
= {
@@ -112,6 +109,7 @@ private[spark] object SchemaUtils {
       case "DECIMAL128I"     => DecimalType(precision, scale)
       case "TIME"            => DataTypes.DoubleType
       case "STRING"          => DataTypes.StringType
+      case "ARRAY"           => DataTypes.StringType
       case "HLL"             =>
         throw new DorisException("Unsupported type " + dorisType)
       case _                 =>
@@ -121,6 +119,7 @@ private[spark] object SchemaUtils {
 
   /**
    * convert Doris return schema to inner schema struct.
+   *
    * @param tscanColumnDescs Doris BE return schema
    * @return inner schema struct
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to