This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 38c2718  [fix] The serialization result of the decimal type does not 
meet expectations (#155)
38c2718 is described below

commit 38c2718f44af9d13e41b59622b5ccac8a03f413e
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Tue Nov 7 10:54:56 2023 +0800

    [fix] The serialization result of the decimal type does not meet 
expectations (#155)
---
 .../src/main/java/org/apache/doris/spark/util/DataUtil.java    | 10 +++++-----
 .../main/scala/org/apache/doris/spark/sql/SchemaUtils.scala    |  2 +-
 .../main/scala/org/apache/doris/spark/writer/DorisWriter.scala |  4 ++++
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
index 3f53d45..530657e 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/DataUtil.java
@@ -17,10 +17,11 @@
 
 package org.apache.doris.spark.util;
 
-import org.apache.doris.spark.sql.SchemaUtils;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.module.scala.DefaultScalaModule;
+import org.apache.doris.spark.sql.SchemaUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -31,7 +32,7 @@ import java.util.Map;
 
 public class DataUtil {
 
-    private static final ObjectMapper MAPPER = new ObjectMapper();
+    private static final ObjectMapper MAPPER = 
JsonMapper.builder().addModule(new DefaultScalaModule()).build();
 
     public static final String NULL_VALUE = "\\N";
 
@@ -67,8 +68,7 @@ public class DataUtil {
         return builder.toString().getBytes(StandardCharsets.UTF_8);
     }
 
-    public static byte[] rowToJsonBytes(InternalRow row, StructType schema)
-            throws JsonProcessingException {
+    public static byte[] rowToJsonBytes(InternalRow row, StructType schema) 
throws JsonProcessingException {
         StructField[] fields = schema.fields();
         Map<String, Object> rowMap = new HashMap<>(row.numFields());
         for (int i = 0; i < fields.length; i++) {
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 982e580..1f0e942 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -168,7 +168,7 @@ private[spark] object SchemaUtils {
           new Timestamp(row.getLong(ordinal) / 1000).toString
         case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString
         case BinaryType => row.getBinary(ordinal)
-        case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale)
+        case dt: DecimalType => row.getDecimal(ordinal, dt.precision, 
dt.scale).toJavaBigDecimal
         case at: ArrayType =>
           val arrayData = row.getArray(ordinal)
           if (arrayData == null) DataUtil.NULL_VALUE
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index f90bcc6..55f4d73 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -79,6 +79,10 @@ class DorisWriter(settings: SparkSettings) extends 
Serializable {
    * @param dataFrame source dataframe
    */
   def writeStream(dataFrame: DataFrame): Unit = {
+    if (enable2PC) {
+      val errMsg = "two phase commit is not supported in stream mode, please 
set doris.sink.enable-2pc to false."
+      throw new UnsupportedOperationException(errMsg)
+    }
     doWrite(dataFrame, dorisStreamLoader.loadStream)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to