This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ed1134  [fix](connector) fix read date type err (#265)
8ed1134 is described below

commit 8ed1134d0dacf655cb488f408494cfbb29376842
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Mon Feb 17 15:33:07 2025 +0800

    [fix](connector) fix read date type err (#265)
---
 .../spark/client/entity/DorisReaderPartition.java  |  19 +++-
 .../spark/client/read/AbstractThriftReader.java    |   7 +-
 .../spark/client/read/DorisFlightSqlReader.java    |   4 +-
 .../client/read/ReaderPartitionGenerator.java      |  14 +--
 .../apache/doris/spark/client/read/RowBatch.java   |  20 +++-
 .../apache/doris/spark/rdd/AbstractDorisRDD.scala  |   4 +-
 .../apache/doris/spark/util/RowConvertors.scala    |   5 +-
 .../doris/spark/client/read/RowBatchTest.java      | 104 ++++++++++++++++++---
 .../doris/spark/util/RowConvertorsTest.scala       |  22 +++--
 .../spark-doris-connector-spark-2/pom.xml          |   4 -
 .../doris/spark/read/AbstractDorisScan.scala       |  12 ++-
 .../doris/spark/read/DorisPartitionReader.scala    |  14 +--
 12 files changed, 176 insertions(+), 53 deletions(-)
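
Context on the user-visible behavior this commit fixes: with spark.sql.datetime.java8API.enabled set to true, Spark expects DATE values as java.time.LocalDate, while the connector previously always produced java.sql.Date, which breaks such reads. A minimal sketch of a read that exercises the fixed path (connection details and the table name below are placeholders, not taken from this commit):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("doris-date-read")
      // With this flag on, Spark represents DateType values as java.time.LocalDate.
      .config("spark.sql.datetime.java8API.enabled", "true")
      .getOrCreate()

    val df = spark.read.format("doris")
      .option("doris.table.identifier", "example_db.example_tbl") // placeholder
      .option("doris.fenodes", "127.0.0.1:8030")                  // placeholder
      .option("user", "root")                                     // placeholder
      .option("password", "")                                     // placeholder
      .load()

    // Assuming column 0 is a DATE column: it now collects as
    // java.time.LocalDate instead of java.sql.Date when the flag is on.
    df.collect().foreach(row => println(row.get(0).getClass))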

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
index aa75319..608f019 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
@@ -34,8 +34,10 @@ public class DorisReaderPartition implements Serializable {
     private final String[] filters;
     private final Integer limit;
     private final DorisConfig config;
+    private final Boolean datetimeJava8ApiEnabled;
 
-    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan, String[] readColumns, String[] filters, DorisConfig config) {
+    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan,
+                                String[] readColumns, String[] filters, DorisConfig config, Boolean datetimeJava8ApiEnabled) {
         this.database = database;
         this.table = table;
         this.backend = backend;
@@ -45,9 +47,11 @@ public class DorisReaderPartition implements Serializable {
         this.filters = filters;
         this.limit = -1;
         this.config = config;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
     }
 
-    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan, String[] readColumns, String[] filters, Integer limit, DorisConfig config) {
+    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan,
+                                String[] readColumns, String[] filters, Integer limit, DorisConfig config, Boolean datetimeJava8ApiEnabled) {
         this.database = database;
         this.table = table;
         this.backend = backend;
@@ -57,6 +61,7 @@ public class DorisReaderPartition implements Serializable {
         this.filters = filters;
         this.limit = limit;
         this.config = config;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
     }
 
     // Getters and Setters
@@ -96,6 +101,10 @@ public class DorisReaderPartition implements Serializable {
         return limit;
     }
 
+    public Boolean getDateTimeJava8APIEnabled() {
+        return datetimeJava8ApiEnabled;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) return false;
@@ -108,11 +117,13 @@ public class DorisReaderPartition implements Serializable {
                 && Objects.deepEquals(readColumns, that.readColumns)
                 && Objects.deepEquals(filters, that.filters)
                 && Objects.equals(limit, that.limit)
-                && Objects.equals(config, that.config);
+                && Objects.equals(config, that.config)
+                && Objects.equals(datetimeJava8ApiEnabled, that.datetimeJava8ApiEnabled);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(database, table, backend, Arrays.hashCode(tablets), opaquedQueryPlan, Arrays.hashCode(readColumns), Arrays.hashCode(filters), limit, config);
+        return Objects.hash(database, table, backend, Arrays.hashCode(tablets), opaquedQueryPlan,
+                Arrays.hashCode(readColumns), Arrays.hashCode(filters), limit, config, datetimeJava8ApiEnabled);
     }
 }
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index c533de8..7fdb1cf 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -73,6 +73,8 @@ public abstract class AbstractThriftReader extends DorisReader {
 
     private int readCount = 0;
 
+    private final Boolean datetimeJava8ApiEnabled;
+
     protected AbstractThriftReader(DorisReaderPartition partition) throws Exception {
         super(partition);
         this.frontend = new DorisFrontendClient(config);
@@ -108,6 +110,7 @@ public abstract class AbstractThriftReader extends DorisReader {
             this.rowBatchQueue = null;
             this.asyncThread = null;
         }
+        this.datetimeJava8ApiEnabled = false;
     }
 
     private void runAsync() throws DorisException, InterruptedException {
@@ -124,7 +127,7 @@ public abstract class AbstractThriftReader extends DorisReader {
             });
             endOfStream.set(nextResult.isEos());
             if (!endOfStream.get()) {
-                rowBatch = new RowBatch(nextResult, dorisSchema);
+                rowBatch = new RowBatch(nextResult, dorisSchema, datetimeJava8ApiEnabled);
                 offset += rowBatch.getReadRowCount();
                 rowBatch.close();
                 rowBatchQueue.put(rowBatch);
@@ -178,7 +181,7 @@ public abstract class AbstractThriftReader extends DorisReader {
                 });
                 endOfStream.set(nextResult.isEos());
                 if (!endOfStream.get()) {
-                    rowBatch = new RowBatch(nextResult, dorisSchema);
+                    rowBatch = new RowBatch(nextResult, dorisSchema, datetimeJava8ApiEnabled);
                 }
             }
             hasNext = !endOfStream.get();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index faa462b..4623d65 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -59,6 +59,7 @@ public class DorisFlightSqlReader extends DorisReader {
     private final Schema schema;
     private AdbcConnection connection;
     private final ArrowReader arrowReader;
+    private final Boolean datetimeJava8ApiEnabled;
 
     public DorisFlightSqlReader(DorisReaderPartition partition) throws Exception {
         super(partition);
@@ -74,6 +75,7 @@ public class DorisFlightSqlReader extends DorisReader {
         }
         this.schema = processDorisSchema(partition);
         this.arrowReader = executeQuery();
+        this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
     }
 
     @Override
@@ -85,7 +87,7 @@ public class DorisFlightSqlReader extends DorisReader {
                 throw new DorisException(e);
             }
             if (!endOfStream.get()) {
-                rowBatch = new RowBatch(arrowReader, schema);
+                rowBatch = new RowBatch(arrowReader, schema, datetimeJava8ApiEnabled);
             }
         }
         return !endOfStream.get();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
index 4aa660a..002b58b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
@@ -45,7 +45,7 @@ public class ReaderPartitionGenerator {
     /*
      * for spark 2
      */
-    public static DorisReaderPartition[] generatePartitions(DorisConfig config) throws Exception {
+    public static DorisReaderPartition[] generatePartitions(DorisConfig config, Boolean datetimeJava8ApiEnabled) throws Exception {
         String[] originReadCols;
        if (config.contains(DorisOptions.DORIS_READ_FIELDS) && !config.getValue(DorisOptions.DORIS_READ_FIELDS).equals("*")) {
            originReadCols = Arrays.stream(config.getValue(DorisOptions.DORIS_READ_FIELDS).split(","))
@@ -55,14 +55,15 @@ public class ReaderPartitionGenerator {
         }
         String[] filters = config.contains(DorisOptions.DORIS_FILTER_QUERY) ?
                config.getValue(DorisOptions.DORIS_FILTER_QUERY).split("\\.") : new String[0];
-        return generatePartitions(config, originReadCols, filters, -1);
+        return generatePartitions(config, originReadCols, filters, -1, datetimeJava8ApiEnabled);
     }
 
     /*
      * for spark 3
      */
     public static DorisReaderPartition[] generatePartitions(DorisConfig config,
-                                                            String[] fields, String[] filters, Integer limit) throws Exception {
+                                                            String[] fields, String[] filters, Integer limit,
+                                                            Boolean datetimeJava8ApiEnabled) throws Exception {
         DorisFrontendClient frontend = new DorisFrontendClient(config);
        String fullTableName = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
         String[] tableParts = fullTableName.split("\\.");
@@ -81,7 +82,7 @@ public class ReaderPartitionGenerator {
         Map<String, List<Long>> beToTablets = mappingBeToTablets(queryPlan);
         int maxTabletSize = config.getValue(DorisOptions.DORIS_TABLET_SIZE);
        return distributeTabletsToPartitions(db, table, beToTablets, queryPlan.getOpaqued_query_plan(), maxTabletSize,
-                finalReadColumns, filters, config, limit);
+                finalReadColumns, filters, config, limit, datetimeJava8ApiEnabled);
     }
 
     @VisibleForTesting
@@ -112,7 +113,8 @@ public class ReaderPartitionGenerator {
                                                                        Map<String, List<Long>> beToTablets,
                                                                        String opaquedQueryPlan, int maxTabletSize,
                                                                        String[] readColumns, String[] predicates,
-                                                                        DorisConfig config, Integer limit) {
+                                                                        DorisConfig config, Integer limit,
+                                                                        Boolean datetimeJava8ApiEnabled) {
         List<DorisReaderPartition> partitions = new ArrayList<>();
         beToTablets.forEach((backendStr, tabletIds) -> {
            List<Long> distinctTablets = new ArrayList<>(new HashSet<>(tabletIds));
@@ -121,7 +123,7 @@ public class ReaderPartitionGenerator {
                Long[] tablets = distinctTablets.subList(offset, Math.min(offset + maxTabletSize, distinctTablets.size())).toArray(new Long[0]);
                 offset += maxTabletSize;
                partitions.add(new DorisReaderPartition(database, table, new Backend(backendStr), tablets,
-                        opaquedQueryPlan, readColumns, predicates, limit, config));
+                        opaquedQueryPlan, readColumns, predicates, limit, config, datetimeJava8ApiEnabled));
             }
         });
         return partitions.toArray(new DorisReaderPartition[0]);
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 840825c..6b06b37 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -103,11 +103,14 @@ public class RowBatch implements Serializable {
     private int readRowCount = 0;
     private List<FieldVector> fieldVectors;
 
-    public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
+    private final Boolean datetimeJava8ApiEnabled;
+
+    public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean datetimeJava8ApiEnabled) throws DorisException {
 
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
        this.arrowReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
         this.schema = schema;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
 
         try {
             VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -123,10 +126,11 @@ public class RowBatch implements Serializable {
 
     }
 
-    public RowBatch(ArrowReader reader, Schema schema) throws DorisException {
+    public RowBatch(ArrowReader reader, Schema schema, Boolean datetimeJava8ApiEnabled) throws DorisException {
 
         this.arrowReader = reader;
         this.schema = schema;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
 
         try {
             VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -368,7 +372,11 @@ public class RowBatch implements Serializable {
                                 }
                                String stringValue = new String(date.get(rowIndex));
                                LocalDate localDate = LocalDate.parse(stringValue);
-                                addValueToRow(rowIndex, Date.valueOf(localDate));
+                                if (datetimeJava8ApiEnabled) {
+                                    addValueToRow(rowIndex, localDate);
+                                } else {
+                                    addValueToRow(rowIndex, Date.valueOf(localDate));
+                                }
                             }
                         } else {
                            DateDayVector date = (DateDayVector) curFieldVector;
@@ -378,7 +386,11 @@ public class RowBatch implements Serializable {
                                     continue;
                                 }
                                LocalDate localDate = LocalDate.ofEpochDay(date.get(rowIndex));
-                                addValueToRow(rowIndex, Date.valueOf(localDate));
+                                if (datetimeJava8ApiEnabled) {
+                                    addValueToRow(rowIndex, localDate);
+                                } else {
+                                    addValueToRow(rowIndex, Date.valueOf(localDate));
+                                }
                             }
                         }
                         break;
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
index bb9008a..50f6474 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
@@ -51,7 +51,9 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag](
    */
  @transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false)
 
-  @transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg)
+  @transient private[spark] lazy val dateTimeJava8ApiEnabled = sc.getConf.get("spark.sql.datetime.java8API.enabled", "false").toBoolean
+
+  @transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg, dateTimeJava8ApiEnabled)
 }
 
private[spark] class DorisPartition(rddId: Int, idx: Int, val dorisPartition: DorisReaderPartition)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index aefb013..dd8a7d5 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.sql.{Date, Timestamp}
+import java.time.LocalDate
 import scala.collection.mutable
 
 object RowConvertors {
@@ -105,10 +107,11 @@ object RowConvertors {
     }
   }
 
-  def convertValue(v: Any, dataType: DataType): Any = {
+  def convertValue(v: Any, dataType: DataType, datetimeJava8ApiEnabled: Boolean): Any = {
     dataType match {
       case StringType => UTF8String.fromString(v.asInstanceOf[String])
      case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(v.asInstanceOf[String]))
+      case DateType if datetimeJava8ApiEnabled => v.asInstanceOf[LocalDate].toEpochDay.toInt
       case DateType => DateTimeUtils.fromJavaDate(v.asInstanceOf[Date])
       case _: MapType =>
         val map = v.asInstanceOf[Map[String, String]]
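
For reference on the new DateType branch above: Spark's InternalRow encodes a DateType value as the day count since 1970-01-01 (an Int), so a java.time.LocalDate maps straight through toEpochDay, while the legacy branch converts a java.sql.Date via DateTimeUtils.fromJavaDate. A standalone sketch of the equivalence (plain java.time, not connector code):

    import java.sql.Date
    import java.time.LocalDate

    // Both representations of the same calendar day reduce to one epoch-day Int.
    val viaJava8: Int  = LocalDate.of(2025, 2, 1).toEpochDay.toInt
    val viaLegacy: Int = Date.valueOf("2025-02-01").toLocalDate.toEpochDay.toInt
    assert(viaJava8 == viaLegacy)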
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 134595f..7b0cca7 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -56,6 +56,8 @@ import org.apache.doris.sdk.thrift.TStatusCode;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.Schema;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.internal.SQLConf$;
 import org.apache.spark.sql.types.Decimal;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import org.junit.Assert;
@@ -73,6 +75,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -261,7 +264,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         List<Object> expectedRow1 = Arrays.asList(
                 Boolean.TRUE,
@@ -375,7 +378,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -439,7 +442,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -527,7 +530,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -602,7 +605,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -683,7 +686,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
         Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k1", "0")).asScala(),
                 rowBatch.next().get(0));
@@ -749,7 +752,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
         Assert.assertEquals("{\"a\":\"a1\",\"b\":1}", rowBatch.next().get(0));
 
@@ -827,7 +830,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -902,7 +905,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -995,7 +998,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
         assertEquals("0.0.0.0", actualRow0.get(0));
@@ -1104,7 +1107,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
         assertEquals("::", actualRow0.get(0));
@@ -1163,4 +1166,83 @@ public class RowBatchTest {
         rowBatch.next();
     }
 
+    @Test
+    public void testDatetimeJava8API() throws DorisException, IOException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k0", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Date(DateUnit.DAY)), null));
+
+        VectorSchemaRoot root = VectorSchemaRoot.create(
+                new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+                new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+                root,
+                new DictionaryProvider.MapDictionaryProvider(),
+                outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(1);
+
+        FieldVector vector = root.getVector("k0");
+        VarCharVector dateVector = (VarCharVector) vector;
+        dateVector.setInitialCapacity(1);
+        dateVector.allocateNew();
+        dateVector.setIndexDefined(0);
+        dateVector.setValueLengthSafe(0, 20);
+        dateVector.setSafe(0, "2025-01-01".getBytes());
+        vector.setValueCount(1);
+
+        LocalDate localDate = LocalDate.of(2025, 2, 1);
+        long date = localDate.toEpochDay();
+
+        vector = root.getVector("k1");
+        DateDayVector date2Vector = (DateDayVector) vector;
+        date2Vector.setInitialCapacity(1);
+        date2Vector.allocateNew();
+        date2Vector.setIndexDefined(0);
+        date2Vector.setSafe(0, (int) date);
+        vector.setValueCount(1);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+
+        String schemaStr = "{\"properties\":[" +
+                "{\"type\":\"DATE\",\"name\":\"k0\",\"comment\":\"\"}, " +
+                "{\"type\":\"DATEV2\",\"name\":\"k1\",\"comment\":\"\"}" +
+                "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch1 = new RowBatch(scanBatchResult, schema, false);
+
+        Assert.assertTrue(rowBatch1.hasNext());
+        List<Object> actualRow0 = rowBatch1.next();
+        Assert.assertEquals(Date.valueOf("2025-01-01"), actualRow0.get(0));
+        Assert.assertEquals(Date.valueOf("2025-02-01"), actualRow0.get(1));
+
+        Assert.assertFalse(rowBatch1.hasNext());
+
+        RowBatch rowBatch2 = new RowBatch(scanBatchResult, schema, true);
+
+        Assert.assertTrue(rowBatch2.hasNext());
+        List<Object> actualRow01 = rowBatch2.next();
+        Assert.assertEquals(LocalDate.of(2025,1,1), actualRow01.get(0));
+        Assert.assertEquals(localDate, actualRow01.get(1));
+
+        Assert.assertFalse(rowBatch2.hasNext());
+
+    }
+
 }
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index eaa0ad0..ce36c06 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -25,6 +25,7 @@ import org.junit.Assert
 import org.junit.jupiter.api.Test
 
 import java.sql.{Date, Timestamp}
+import java.time.LocalDate
 
 class RowConvertorsTest {
 
@@ -103,16 +104,17 @@ class RowConvertorsTest {
 
   @Test def convertValue(): Unit = {
 
-    Assert.assertTrue(RowConvertors.convertValue(1, DataTypes.IntegerType).isInstanceOf[Int])
-    Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat, DataTypes.FloatType).isInstanceOf[Float])
-    Assert.assertTrue(RowConvertors.convertValue(4.5, DataTypes.DoubleType).isInstanceOf[Double])
-    Assert.assertTrue(RowConvertors.convertValue(6.toShort, DataTypes.ShortType).isInstanceOf[Short])
-    Assert.assertTrue(RowConvertors.convertValue(7L, DataTypes.LongType).isInstanceOf[Long])
-    Assert.assertTrue(RowConvertors.convertValue(Decimal(BigDecimal(8910.11), 20, 4), DecimalType(20, 4)).isInstanceOf[Decimal])
-    Assert.assertTrue(RowConvertors.convertValue("2024-01-01", DataTypes.DateType).isInstanceOf[Int])
-    Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56", DataTypes.TimestampType).isInstanceOf[Long])
-    Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" -> "1"), MapType(DataTypes.StringType, DataTypes.StringType)).isInstanceOf[MapData])
-    Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType).isInstanceOf[UTF8String])
+    Assert.assertTrue(RowConvertors.convertValue(1, DataTypes.IntegerType, false).isInstanceOf[Int])
+    Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat, DataTypes.FloatType, false).isInstanceOf[Float])
+    Assert.assertTrue(RowConvertors.convertValue(4.5, DataTypes.DoubleType, false).isInstanceOf[Double])
+    Assert.assertTrue(RowConvertors.convertValue(6.toShort, DataTypes.ShortType, false).isInstanceOf[Short])
+    Assert.assertTrue(RowConvertors.convertValue(7L, DataTypes.LongType, false).isInstanceOf[Long])
+    Assert.assertTrue(RowConvertors.convertValue(Decimal(BigDecimal(8910.11), 20, 4), DecimalType(20, 4), false).isInstanceOf[Decimal])
+    Assert.assertTrue(RowConvertors.convertValue(Date.valueOf("2024-01-01"), DataTypes.DateType, false).isInstanceOf[Int])
+    Assert.assertTrue(RowConvertors.convertValue(LocalDate.now(), DataTypes.DateType, true).isInstanceOf[Int])
+    Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56", DataTypes.TimestampType, false).isInstanceOf[Long])
+    Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" -> "1"), MapType(DataTypes.StringType, DataTypes.StringType), false).isInstanceOf[MapData])
+    Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType, false).isInstanceOf[UTF8String])
 
   }
 
diff --git a/spark-doris-connector/spark-doris-connector-spark-2/pom.xml b/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
index 254db30..b402084 100644
--- a/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
+++ b/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
@@ -35,10 +35,6 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <spark.version>2.4.8</spark.version>
-        <spark.major.version>2.4</spark.major.version>
-        <scala.version>2.11.12</scala.version>
-        <scala.major.version>2.11</scala.major.version>
     </properties>
 
     <dependencies>
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
index 34bff24..4afcc01 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
@@ -22,6 +22,7 @@ import org.apache.doris.spark.client.read.ReaderPartitionGenerator
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 import scala.language.implicitConversions
@@ -35,7 +36,9 @@ abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extend
   override def toBatch: Batch = this
 
   override def planInputPartitions(): Array[InputPartition] = {
-    ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters(), getLimit).map(toInputPartition)
+    ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters(), getLimit,
+      SQLConf.get.datetimeJava8ApiEnabled)
+      .map(toInputPartition)
   }
 
 
@@ -44,7 +47,8 @@ abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extend
   }
 
   private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition =
-    DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns, rp.getFilters, rp.getLimit)
+    DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan,
+      rp.getReadColumns, rp.getFilters, rp.getLimit, rp.getDateTimeJava8APIEnabled)
 
   protected def compiledFilters(): Array[String]
 
@@ -52,4 +56,6 @@ abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extend
 
 }
 
-case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String], limit: Int = -1) extends InputPartition
+case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long],
+                               opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String],
+                               limit: Int = -1, datetimeJava8ApiEnabled: Boolean) extends InputPartition
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
index 42be1c5..0061fff 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.{DorisFlightSqlReader, DorisReader, Do
 import org.apache.doris.spark.config.DorisConfig
 import org.apache.doris.spark.util.RowConvertors
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.types.StructType
 
@@ -34,7 +34,7 @@ class DorisPartitionReader(inputPartition: InputPartition, schema: StructType, m
   private implicit def toReaderPartition(inputPart: DorisInputPartition): DorisReaderPartition = {
     val tablets = inputPart.tablets.map(java.lang.Long.valueOf)
     new DorisReaderPartition(inputPart.database, inputPart.table, inputPart.backend, tablets,
-      inputPart.opaquedQueryPlan, inputPart.readCols, inputPart.predicates, inputPart.limit, config)
+      inputPart.opaquedQueryPlan, inputPart.readCols, inputPart.predicates, inputPart.limit, config, inputPart.datetimeJava8ApiEnabled)
   }
 
   private lazy val reader: DorisReader = {
@@ -45,19 +45,21 @@ class DorisPartitionReader(inputPartition: InputPartition, schema: StructType, m
     }
   }
 
+  private val datetimeJava8ApiEnabled: Boolean = inputPartition.asInstanceOf[DorisInputPartition].datetimeJava8ApiEnabled
+
   override def next(): Boolean = reader.hasNext
 
   override def get(): InternalRow = {
     val values = reader.next().asInstanceOf[Array[Any]]
+    val row = new GenericInternalRow(schema.length)
     if (values.nonEmpty) {
-      val row = new SpecificInternalRow(schema.fields.map(_.dataType))
       values.zipWithIndex.foreach {
         case (value, index) =>
           if (value == null) row.setNullAt(index)
-          else row.update(index, RowConvertors.convertValue(value, schema.fields(index).dataType))
+          else row.update(index, RowConvertors.convertValue(value, schema.fields(index).dataType, datetimeJava8ApiEnabled))
       }
-      row
-    } else null.asInstanceOf[InternalRow]
+    }
+    row
   }
 
   override def close(): Unit = {

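A quick way to sanity-check both paths against a live table (hypothetical spark-shell session; `opts` stands for the usual connector options, as in the sketch near the top of this mail, and is a placeholder):

    // Placeholder connector options (fenodes, table identifier, credentials).
    val opts: Map[String, String] = Map(/* ... */)

    // Flag off (the default): a DATE column (assumed at index 0) collects as java.sql.Date.
    spark.conf.set("spark.sql.datetime.java8API.enabled", "false")
    assert(spark.read.format("doris").options(opts).load()
      .first().get(0).isInstanceOf[java.sql.Date])

    // Flag on: the same column collects as java.time.LocalDate.
    spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
    assert(spark.read.format("doris").options(opts).load()
      .first().get(0).isInstanceOf[java.time.LocalDate])

Per the diff, the DataFrame (Spark 3) path picks the flag up from SQLConf when partitions are planned, so toggling it between reads should take effect, while the RDD (Spark 2) path reads it once from the SparkConf with a default of "false".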

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
