This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 61f8307 array type support. (#75) 61f8307 is described below commit 61f83074d229a786f8d90e94cd525d4cae384591 Author: benjobs <benj...@apache.org> AuthorDate: Tue Mar 14 09:26:42 2023 +0800 array type support. (#75) * scala version bug fixed, array type convert bug fixed --------- Co-authored-by: benjobs <benj...@gmail.com> --- spark-doris-connector/pom.xml | 1 - .../apache/doris/spark/serialization/RowBatch.java | 16 +++++++- .../org/apache/doris/spark/sql/SchemaUtils.scala | 43 +++++++++++----------- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index ecfa032..e89813d 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -64,7 +64,6 @@ <properties> <spark.version>3.1.2</spark.version> <spark.minor.version>3.1</spark.minor.version> - <scala.version>2.12.8</scala.version> <scala.version>2.12</scala.version> <libthrift.version>0.13.0</libthrift.version> <arrow.version>5.0.0</arrow.version> diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 72c7ac9..e666de5 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -38,6 +38,7 @@ import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.doris.spark.exception.DorisException; @@ -87,7 +88,7 @@ public class RowBatch { this.arrowStreamReader = new ArrowStreamReader( new 
ByteArrayInputStream(nextResult.getRows()), rootAllocator - ); + ); try { VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); while (arrowStreamReader.loadNextBatch()) { @@ -275,6 +276,19 @@ public class RowBatch { addValueToRow(rowIndex, value); } break; + case "ARRAY": + Preconditions.checkArgument(mt.equals(Types.MinorType.LIST), + typeMismatchMessage(currentType, mt)); + ListVector listVector = (ListVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (listVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + continue; + } + String value = listVector.getObject(rowIndex).toString(); + addValueToRow(rowIndex, value); + } + break; default: String errMsg = "Unsupported type " + schema.get(col).getType(); logger.error(errMsg); diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 15ad7a1..2565aff 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -17,8 +17,7 @@ package org.apache.doris.spark.sql -import scala.collection.JavaConverters._ - +import scala.collection.JavaConversions._ import org.apache.doris.spark.cfg.Settings import org.apache.doris.spark.exception.DorisException import org.apache.doris.spark.rest.RestService @@ -26,16 +25,14 @@ import org.apache.doris.spark.rest.models.{Field, Schema} import org.apache.doris.thrift.TScanColumnDesc import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD import org.apache.spark.sql.types._ - import org.slf4j.LoggerFactory -import scala.collection.mutable - private[spark] object SchemaUtils { private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$")) /** * discover Doris table schema from Doris FE. 
+ * * @param cfg configuration * @return Spark Catalyst StructType */ @@ -46,6 +43,7 @@ private[spark] object SchemaUtils { /** * discover Doris table schema from Doris FE. + * * @param cfg configuration * @return inner schema struct */ @@ -55,35 +53,34 @@ private[spark] object SchemaUtils { /** * convert inner schema struct to Spark Catalyst StructType + * * @param schema inner schema * @return Spark Catalyst StructType */ def convertToStruct(dorisReadFields: String, schema: Schema): StructType = { - var fieldList = new Array[String](schema.size()) - val fieldSet = new mutable.HashSet[String]() - var fields = List[StructField]() - if (dorisReadFields != null && dorisReadFields.length > 0) { - fieldList = dorisReadFields.split(",") - for (field <- fieldList) { - fieldSet.add(field) - } - schema.getProperties.asScala.foreach(f => - if (fieldSet.contains(f.getName)) { - fields :+= DataTypes.createStructField(f.getName, getCatalystType(f.getType, f.getPrecision, f.getScale), true) - }) + val fieldList = if (dorisReadFields != null && dorisReadFields.length > 0) { + dorisReadFields.split(",") } else { - schema.getProperties.asScala.foreach(f => - fields :+= DataTypes.createStructField(f.getName, getCatalystType(f.getType, f.getPrecision, f.getScale), true) - ) + Array.empty[String] } - DataTypes.createStructType(fields.asJava) + val fields = schema.getProperties + .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty) + .map(f => + DataTypes.createStructField( + f.getName, + getCatalystType(f.getType, f.getPrecision, f.getScale), + true + ) + ) + DataTypes.createStructType(fields) } /** * translate Doris Type to Spark Catalyst type + * * @param dorisType Doris type * @param precision decimal precision - * @param scale decimal scale + * @param scale decimal scale * @return Spark Catalyst type */ def getCatalystType(dorisType: String, precision: Int, scale: Int): DataType = { @@ -112,6 +109,7 @@ private[spark] object SchemaUtils { case "DECIMAL128I" => 
DecimalType(precision, scale) case "TIME" => DataTypes.DoubleType case "STRING" => DataTypes.StringType + case "ARRAY" => DataTypes.StringType case "HLL" => throw new DorisException("Unsupported type " + dorisType) case _ => @@ -121,6 +119,7 @@ private[spark] object SchemaUtils { /** * convert Doris return schema to inner schema struct. + * * @param tscanColumnDescs Doris BE return schema * @return inner schema struct */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org