This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 38c2718 [fix] The result of serialization of decimal type does not meet the expected problem (#155) 38c2718 is described below commit 38c2718f44af9d13e41b59622b5ccac8a03f413e Author: gnehil <adamlee...@gmail.com> AuthorDate: Tue Nov 7 10:54:56 2023 +0800 [fix] The result of serialization of decimal type does not meet the expected problem (#155) --- .../src/main/java/org/apache/doris/spark/util/DataUtil.java | 10 +++++----- .../main/scala/org/apache/doris/spark/sql/SchemaUtils.scala | 2 +- .../main/scala/org/apache/doris/spark/writer/DorisWriter.scala | 4 ++++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java index 3f53d45..530657e 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java @@ -17,10 +17,11 @@ package org.apache.doris.spark.util; -import org.apache.doris.spark.sql.SchemaUtils; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.module.scala.DefaultScalaModule; +import org.apache.doris.spark.sql.SchemaUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -31,7 +32,7 @@ import java.util.Map; public class DataUtil { - private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final ObjectMapper MAPPER = JsonMapper.builder().addModule(new DefaultScalaModule()).build(); public static final String NULL_VALUE = "\\N"; @@ -67,8 +68,7 @@ public class DataUtil { return builder.toString().getBytes(StandardCharsets.UTF_8); } - public static 
byte[] rowToJsonBytes(InternalRow row, StructType schema) - throws JsonProcessingException { + public static byte[] rowToJsonBytes(InternalRow row, StructType schema) throws JsonProcessingException { StructField[] fields = schema.fields(); Map<String, Object> rowMap = new HashMap<>(row.numFields()); for (int i = 0; i < fields.length; i++) { diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 982e580..1f0e942 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -168,7 +168,7 @@ private[spark] object SchemaUtils { new Timestamp(row.getLong(ordinal) / 1000).toString case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString case BinaryType => row.getBinary(ordinal) - case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale) + case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal case at: ArrayType => val arrayData = row.getArray(ordinal) if (arrayData == null) DataUtil.NULL_VALUE diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala index f90bcc6..55f4d73 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -79,6 +79,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable { * @param dataFrame source dataframe */ def writeStream(dataFrame: DataFrame): Unit = { + if (enable2PC) { + val errMsg = "two phrase commit is not supported in stream mode, please set doris.sink.enable-2pc to false." 
+      throw new UnsupportedOperationException(errMsg)
+    }
     doWrite(dataFrame, dorisStreamLoader.loadStream)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org