This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
     new 8ed1134  [fix](connector) fix read date type err (#265)
8ed1134 is described below

commit 8ed1134d0dacf655cb488f408494cfbb29376842
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Mon Feb 17 15:33:07 2025 +0800

    [fix](connector) fix read date type err (#265)
---
 .../spark/client/entity/DorisReaderPartition.java  |  19 +++-
 .../spark/client/read/AbstractThriftReader.java    |   7 +-
 .../spark/client/read/DorisFlightSqlReader.java    |   4 +-
 .../client/read/ReaderPartitionGenerator.java      |  14 +--
 .../apache/doris/spark/client/read/RowBatch.java   |  20 +++-
 .../apache/doris/spark/rdd/AbstractDorisRDD.scala  |   4 +-
 .../apache/doris/spark/util/RowConvertors.scala    |   5 +-
 .../doris/spark/client/read/RowBatchTest.java      | 104 ++++++++++++++++++---
 .../doris/spark/util/RowConvertorsTest.scala       |  22 +++--
 .../spark-doris-connector-spark-2/pom.xml          |   4 -
 .../doris/spark/read/AbstractDorisScan.scala       |  12 ++-
 .../doris/spark/read/DorisPartitionReader.scala    |  14 +--
 12 files changed, 176 insertions(+), 53 deletions(-)
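Context for the fix: when spark.sql.datetime.java8API.enabled is true, Spark's external representation of DATE columns is java.time.LocalDate rather than java.sql.Date, but the connector always produced java.sql.Date, so reads of DATE/DATEV2 columns failed. The patch below threads the session flag from scan planning down to the Arrow row decoding. A minimal Scala repro sketch, assuming a reachable Doris cluster (the table identifier, FE address, and credentials are placeholders, not part of this commit):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // with this flag on, Spark expects java.time.LocalDate for DATE columns
      .config("spark.sql.datetime.java8API.enabled", "true")
      .getOrCreate()

    val df = spark.read
      .format("doris")
      .option("doris.table.identifier", "example_db.example_tbl") // placeholder
      .option("doris.fenodes", "127.0.0.1:8030")                  // placeholder
      .option("user", "root")                                     // placeholder
      .option("password", "")
      .load()

    // Before this fix the collect failed while decoding DATE/DATEV2 columns
    // under the Java 8 API; after it, rows carry java.time.LocalDate values.
    df.collect()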
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
index aa75319..608f019 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
@@ -34,8 +34,10 @@ public class DorisReaderPartition implements Serializable {
     private final String[] filters;
     private final Integer limit;
     private final DorisConfig config;
+    private final Boolean datetimeJava8ApiEnabled;
 
-    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan, String[] readColumns, String[] filters, DorisConfig config) {
+    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan,
+                                String[] readColumns, String[] filters, DorisConfig config, Boolean datetimeJava8ApiEnabled) {
         this.database = database;
         this.table = table;
         this.backend = backend;
@@ -45,9 +47,11 @@ public class DorisReaderPartition implements Serializable {
         this.filters = filters;
         this.limit = -1;
         this.config = config;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
     }
 
-    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan, String[] readColumns, String[] filters, Integer limit, DorisConfig config) {
+    public DorisReaderPartition(String database, String table, Backend backend, Long[] tablets, String opaquedQueryPlan,
+                                String[] readColumns, String[] filters, Integer limit, DorisConfig config, Boolean datetimeJava8ApiEnabled) {
         this.database = database;
         this.table = table;
         this.backend = backend;
@@ -57,6 +61,7 @@ public class DorisReaderPartition implements Serializable {
         this.filters = filters;
         this.limit = limit;
         this.config = config;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
     }
 
     // Getters and Setters
@@ -96,6 +101,10 @@ public class DorisReaderPartition implements Serializable {
         return limit;
     }
 
+    public Boolean getDateTimeJava8APIEnabled() {
+        return datetimeJava8ApiEnabled;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) return false;
@@ -108,11 +117,13 @@ public class DorisReaderPartition implements Serializable {
                 && Objects.deepEquals(readColumns, that.readColumns)
                 && Objects.deepEquals(filters, that.filters)
                 && Objects.equals(limit, that.limit)
-                && Objects.equals(config, that.config);
+                && Objects.equals(config, that.config)
+                && Objects.equals(datetimeJava8ApiEnabled, that.datetimeJava8ApiEnabled);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(database, table, backend, Arrays.hashCode(tablets), opaquedQueryPlan, Arrays.hashCode(readColumns), Arrays.hashCode(filters), limit, config);
+        return Objects.hash(database, table, backend, Arrays.hashCode(tablets), opaquedQueryPlan,
+                Arrays.hashCode(readColumns), Arrays.hashCode(filters), limit, config, datetimeJava8ApiEnabled);
     }
 }
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index c533de8..7fdb1cf 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -73,6 +73,8 @@ public abstract class AbstractThriftReader extends DorisReader {
 
     private int readCount = 0;
 
+    private final Boolean datetimeJava8ApiEnabled;
+
     protected AbstractThriftReader(DorisReaderPartition partition) throws Exception {
         super(partition);
         this.frontend = new DorisFrontendClient(config);
@@ -108,6 +110,7 @@ public abstract class AbstractThriftReader extends DorisReader {
             this.rowBatchQueue = null;
             this.asyncThread = null;
         }
+        this.datetimeJava8ApiEnabled = false;
     }
 
     private void runAsync() throws DorisException, InterruptedException {
@@ -124,7 +127,7 @@
             });
             endOfStream.set(nextResult.isEos());
             if (!endOfStream.get()) {
-                rowBatch = new RowBatch(nextResult, dorisSchema);
+                rowBatch = new RowBatch(nextResult, dorisSchema, datetimeJava8ApiEnabled);
                 offset += rowBatch.getReadRowCount();
                 rowBatch.close();
                 rowBatchQueue.put(rowBatch);
@@ -178,7 +181,7 @@
                 });
                 endOfStream.set(nextResult.isEos());
                 if (!endOfStream.get()) {
-                    rowBatch = new RowBatch(nextResult, dorisSchema);
+                    rowBatch = new RowBatch(nextResult, dorisSchema, datetimeJava8ApiEnabled);
                 }
             }
             hasNext = !endOfStream.get();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index faa462b..4623d65 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -59,6 +59,7 @@ public class DorisFlightSqlReader extends DorisReader {
     private final Schema schema;
     private AdbcConnection connection;
     private final ArrowReader arrowReader;
+    private final Boolean datetimeJava8ApiEnabled;
 
     public DorisFlightSqlReader(DorisReaderPartition partition) throws Exception {
         super(partition);
@@ -74,6 +75,7 @@ public class DorisFlightSqlReader extends DorisReader {
         }
         this.schema = processDorisSchema(partition);
         this.arrowReader = executeQuery();
+        this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
     }
 
     @Override
@@ -85,7 +87,7 @@ public class DorisFlightSqlReader extends DorisReader {
                 throw new DorisException(e);
             }
             if (!endOfStream.get()) {
-                rowBatch = new RowBatch(arrowReader, schema);
+                rowBatch = new RowBatch(arrowReader, schema, datetimeJava8ApiEnabled);
             }
         }
         return !endOfStream.get();
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
index 4aa660a..002b58b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
@@ -45,7 +45,7 @@ public class ReaderPartitionGenerator {
     /*
      * for spark 2
      */
-    public static DorisReaderPartition[] generatePartitions(DorisConfig config) throws Exception {
+    public static DorisReaderPartition[] generatePartitions(DorisConfig config, Boolean datetimeJava8ApiEnabled) throws Exception {
         String[] originReadCols;
         if (config.contains(DorisOptions.DORIS_READ_FIELDS) && !config.getValue(DorisOptions.DORIS_READ_FIELDS).equals("*")) {
             originReadCols = Arrays.stream(config.getValue(DorisOptions.DORIS_READ_FIELDS).split(","))
@@ -55,14 +55,15 @@ public class ReaderPartitionGenerator {
         }
         String[] filters = config.contains(DorisOptions.DORIS_FILTER_QUERY) ?
                 config.getValue(DorisOptions.DORIS_FILTER_QUERY).split("\\.") : new String[0];
-        return generatePartitions(config, originReadCols, filters, -1);
+        return generatePartitions(config, originReadCols, filters, -1, datetimeJava8ApiEnabled);
     }
 
     /*
      * for spark 3
      */
     public static DorisReaderPartition[] generatePartitions(DorisConfig config,
-            String[] fields, String[] filters, Integer limit) throws Exception {
+            String[] fields, String[] filters, Integer limit,
+            Boolean datetimeJava8ApiEnabled) throws Exception {
         DorisFrontendClient frontend = new DorisFrontendClient(config);
         String fullTableName = config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
         String[] tableParts = fullTableName.split("\\.");
@@ -81,7 +82,7 @@ public class ReaderPartitionGenerator {
         Map<String, List<Long>> beToTablets = mappingBeToTablets(queryPlan);
         int maxTabletSize = config.getValue(DorisOptions.DORIS_TABLET_SIZE);
         return distributeTabletsToPartitions(db, table, beToTablets, queryPlan.getOpaqued_query_plan(), maxTabletSize,
-                finalReadColumns, filters, config, limit);
+                finalReadColumns, filters, config, limit, datetimeJava8ApiEnabled);
     }
 
     @VisibleForTesting
@@ -112,7 +113,8 @@
                                                                  Map<String, List<Long>> beToTablets,
                                                                  String opaquedQueryPlan, int maxTabletSize,
                                                                  String[] readColumns, String[] predicates,
-                                                                 DorisConfig config, Integer limit) {
+                                                                 DorisConfig config, Integer limit,
+                                                                 Boolean datetimeJava8ApiEnabled) {
         List<DorisReaderPartition> partitions = new ArrayList<>();
         beToTablets.forEach((backendStr, tabletIds) -> {
             List<Long> distinctTablets = new ArrayList<>(new HashSet<>(tabletIds));
@@ -121,7 +123,7 @@
                 Long[] tablets = distinctTablets.subList(offset, Math.min(offset + maxTabletSize, distinctTablets.size())).toArray(new Long[0]);
                 offset += maxTabletSize;
                 partitions.add(new DorisReaderPartition(database, table, new Backend(backendStr), tablets,
-                        opaquedQueryPlan, readColumns, predicates, limit, config));
+                        opaquedQueryPlan, readColumns, predicates, limit, config, datetimeJava8ApiEnabled));
             }
         });
         return partitions.toArray(new DorisReaderPartition[0]);
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 840825c..6b06b37 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -103,11 +103,14 @@ public class RowBatch implements Serializable {
     private int readRowCount = 0;
     private List<FieldVector> fieldVectors;
 
-    public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
+    private final Boolean datetimeJava8ApiEnabled;
+
+    public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean datetimeJava8ApiEnabled) throws DorisException {
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
         this.arrowReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()),
                 rootAllocator);
         this.schema = schema;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
 
         try {
             VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -123,10 +126,11 @@
     }
 
-    public RowBatch(ArrowReader reader, Schema schema) throws DorisException {
+    public RowBatch(ArrowReader reader, Schema schema, Boolean datetimeJava8ApiEnabled) throws DorisException {
         this.arrowReader = reader;
         this.schema = schema;
+        this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
 
         try {
             VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -368,7 +372,11 @@
                         }
                         String stringValue = new String(date.get(rowIndex));
                         LocalDate localDate = LocalDate.parse(stringValue);
-                        addValueToRow(rowIndex, Date.valueOf(localDate));
+                        if (datetimeJava8ApiEnabled) {
+                            addValueToRow(rowIndex, localDate);
+                        } else {
+                            addValueToRow(rowIndex, Date.valueOf(localDate));
+                        }
                     }
                 } else {
                     DateDayVector date = (DateDayVector) curFieldVector;
@@ -378,7 +386,11 @@
                             continue;
                         }
                         LocalDate localDate = LocalDate.ofEpochDay(date.get(rowIndex));
-                        addValueToRow(rowIndex, Date.valueOf(localDate));
+                        if (datetimeJava8ApiEnabled) {
+                            addValueToRow(rowIndex, localDate);
+                        } else {
+                            addValueToRow(rowIndex, Date.valueOf(localDate));
+                        }
                     }
                 }
                 break;
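The RowBatch change above is the core of the fix: both the string-encoded DATE path and the DateDayVector (DATEV2) path now branch on the flag. The rule in isolation, as a Scala sketch (convertDate is an illustrative name, not a connector API):

    import java.sql.Date
    import java.time.LocalDate

    // Decode an Arrow epoch-day offset into the object Spark expects for DateType.
    def convertDate(epochDay: Long, datetimeJava8ApiEnabled: Boolean): Any = {
      val localDate = LocalDate.ofEpochDay(epochDay)
      if (datetimeJava8ApiEnabled) localDate // java.time.LocalDate when the Java 8 API is on
      else Date.valueOf(localDate)           // legacy java.sql.Date otherwise
    }

    // convertDate(LocalDate.of(2025, 2, 1).toEpochDay, true)  == LocalDate.of(2025, 2, 1)
    // convertDate(LocalDate.of(2025, 2, 1).toEpochDay, false) == Date.valueOf("2025-02-01")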
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
index bb9008a..50f6474 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
@@ -51,7 +51,9 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag](
    */
   @transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false)
 
-  @transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg)
+  @transient private[spark] lazy val dateTimeJava8ApiEnabled = sc.getConf.get("spark.sql.datetime.java8API.enabled", "false").toBoolean
+
+  @transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg, dateTimeJava8ApiEnabled)
 }
 
 private[spark] class DorisPartition(rddId: Int, idx: Int, val dorisPartition: DorisReaderPartition)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index aefb013..dd8a7d5 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.sql.{Date, Timestamp}
+import java.time.LocalDate
 import scala.collection.mutable
 
 object RowConvertors {
@@ -105,10 +107,11 @@ object RowConvertors {
     }
   }
 
-  def convertValue(v: Any, dataType: DataType): Any = {
+  def convertValue(v: Any, dataType: DataType, datetimeJava8ApiEnabled: Boolean): Any = {
     dataType match {
       case StringType => UTF8String.fromString(v.asInstanceOf[String])
       case TimestampType => DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(v.asInstanceOf[String]))
+      case DateType if datetimeJava8ApiEnabled => v.asInstanceOf[LocalDate].toEpochDay.toInt
       case DateType => DateTimeUtils.fromJavaDate(v.asInstanceOf[Date])
       case _: MapType =>
         val map = v.asInstanceOf[Map[String, String]]
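On the write-into-Catalyst side, convertValue needs the same flag because the external value it receives for DateType may now be a LocalDate, while Catalyst stores DateType internally as an Int of days since 1970-01-01. The DateType branch in isolation, as a Scala sketch (toCatalystDate is an illustrative name, not the connector's API):

    import java.sql.Date
    import java.time.LocalDate
    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Both external date representations must reduce to the same internal Int.
    def toCatalystDate(v: Any, datetimeJava8ApiEnabled: Boolean): Int =
      if (datetimeJava8ApiEnabled) v.asInstanceOf[LocalDate].toEpochDay.toInt
      else DateTimeUtils.fromJavaDate(v.asInstanceOf[Date])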
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 134595f..7b0cca7 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -56,6 +56,8 @@ import org.apache.doris.sdk.thrift.TStatusCode;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.Schema;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.internal.SQLConf$;
 import org.apache.spark.sql.types.Decimal;
 import static org.hamcrest.core.StringStartsWith.startsWith;
 import org.junit.Assert;
@@ -73,6 +75,7 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.Arrays;
@@ -261,7 +264,7 @@ public class RowBatchTest {
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         List<Object> expectedRow1 = Arrays.asList(
                 Boolean.TRUE,
@@ -375,7 +378,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -439,7 +442,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -527,7 +530,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -602,7 +605,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -683,7 +686,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k1", "0")).asScala(), rowBatch.next().get(0));
@@ -749,7 +752,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         Assert.assertEquals("{\"a\":\"a1\",\"b\":1}", rowBatch.next().get(0));
@@ -827,7 +830,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -902,7 +905,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
 
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
@@ -995,7 +998,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
         assertEquals("0.0.0.0", actualRow0.get(0));
@@ -1104,7 +1107,7 @@
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
 
-        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
         assertEquals("::", actualRow0.get(0));
@@ -1163,4 +1166,83 @@
         rowBatch.next();
     }
 
+    @Test
+    public void testDatetimeJava8API() throws DorisException, IOException {
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(new Field("k0", FieldType.nullable(new ArrowType.Utf8()), null)); + childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Date(DateUnit.DAY)), null)); + + VectorSchemaRoot root = VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( + root, + new DictionaryProvider.MapDictionaryProvider(), + outputStream); + + arrowStreamWriter.start(); + root.setRowCount(1); + + FieldVector vector = root.getVector("k0"); + VarCharVector dateVector = (VarCharVector) vector; + dateVector.setInitialCapacity(1); + dateVector.allocateNew(); + dateVector.setIndexDefined(0); + dateVector.setValueLengthSafe(0, 20); + dateVector.setSafe(0, "2025-01-01".getBytes()); + vector.setValueCount(1); + + LocalDate localDate = LocalDate.of(2025, 2, 1); + long date = localDate.toEpochDay(); + + vector = root.getVector("k1"); + DateDayVector date2Vector = (DateDayVector) vector; + date2Vector.setInitialCapacity(1); + date2Vector.allocateNew(); + date2Vector.setIndexDefined(0); + date2Vector.setSafe(0, (int) date); + vector.setValueCount(1); + + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + + String schemaStr = "{\"properties\":[" + + "{\"type\":\"DATE\",\"name\":\"k0\",\"comment\":\"\"}, " + + "{\"type\":\"DATEV2\",\"name\":\"k1\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch1 = new RowBatch(scanBatchResult, schema, false); + + Assert.assertTrue(rowBatch1.hasNext()); + List<Object> actualRow0 = rowBatch1.next(); + Assert.assertEquals(Date.valueOf("2025-01-01"), actualRow0.get(0)); + Assert.assertEquals(Date.valueOf("2025-02-01"), actualRow0.get(1)); + + Assert.assertFalse(rowBatch1.hasNext()); + + RowBatch rowBatch2 = new RowBatch(scanBatchResult, schema, true); + + Assert.assertTrue(rowBatch2.hasNext()); + List<Object> actualRow01 = rowBatch2.next(); + Assert.assertEquals(LocalDate.of(2025,1,1), actualRow01.get(0)); + Assert.assertEquals(localDate, actualRow01.get(1)); + + Assert.assertFalse(rowBatch2.hasNext()); + + } + } \ No newline at end of file diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala index eaa0ad0..ce36c06 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala +++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala @@ -25,6 +25,7 @@ import org.junit.Assert import org.junit.jupiter.api.Test import java.sql.{Date, Timestamp} +import java.time.LocalDate class RowConvertorsTest { @@ -103,16 +104,17 @@ class RowConvertorsTest { @Test def convertValue(): Unit = { - Assert.assertTrue(RowConvertors.convertValue(1, DataTypes.IntegerType).isInstanceOf[Int]) - Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat, 
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index eaa0ad0..ce36c06 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -25,6 +25,7 @@ import org.junit.Assert
 import org.junit.jupiter.api.Test
 
 import java.sql.{Date, Timestamp}
+import java.time.LocalDate
 
 class RowConvertorsTest {
 
@@ -103,16 +104,17 @@ class RowConvertorsTest {
 
   @Test
   def convertValue(): Unit = {
-    Assert.assertTrue(RowConvertors.convertValue(1, DataTypes.IntegerType).isInstanceOf[Int])
-    Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat, DataTypes.FloatType).isInstanceOf[Float])
-    Assert.assertTrue(RowConvertors.convertValue(4.5, DataTypes.DoubleType).isInstanceOf[Double])
-    Assert.assertTrue(RowConvertors.convertValue(6.toShort, DataTypes.ShortType).isInstanceOf[Short])
-    Assert.assertTrue(RowConvertors.convertValue(7L, DataTypes.LongType).isInstanceOf[Long])
-    Assert.assertTrue(RowConvertors.convertValue(Decimal(BigDecimal(8910.11), 20, 4), DecimalType(20, 4)).isInstanceOf[Decimal])
-    Assert.assertTrue(RowConvertors.convertValue("2024-01-01", DataTypes.DateType).isInstanceOf[Int])
-    Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56", DataTypes.TimestampType).isInstanceOf[Long])
-    Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" -> "1"), MapType(DataTypes.StringType, DataTypes.StringType)).isInstanceOf[MapData])
-    Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType).isInstanceOf[UTF8String])
+    Assert.assertTrue(RowConvertors.convertValue(1, DataTypes.IntegerType, false).isInstanceOf[Int])
+    Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat, DataTypes.FloatType, false).isInstanceOf[Float])
+    Assert.assertTrue(RowConvertors.convertValue(4.5, DataTypes.DoubleType, false).isInstanceOf[Double])
+    Assert.assertTrue(RowConvertors.convertValue(6.toShort, DataTypes.ShortType, false).isInstanceOf[Short])
+    Assert.assertTrue(RowConvertors.convertValue(7L, DataTypes.LongType, false).isInstanceOf[Long])
+    Assert.assertTrue(RowConvertors.convertValue(Decimal(BigDecimal(8910.11), 20, 4), DecimalType(20, 4), false).isInstanceOf[Decimal])
+    Assert.assertTrue(RowConvertors.convertValue(Date.valueOf("2024-01-01"), DataTypes.DateType, false).isInstanceOf[Int])
+    Assert.assertTrue(RowConvertors.convertValue(LocalDate.now(), DataTypes.DateType, true).isInstanceOf[Int])
+    Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56", DataTypes.TimestampType, false).isInstanceOf[Long])
+    Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" -> "1"), MapType(DataTypes.StringType, DataTypes.StringType), false).isInstanceOf[MapData])
+    Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType, false).isInstanceOf[UTF8String])
   }
 
diff --git a/spark-doris-connector/spark-doris-connector-spark-2/pom.xml b/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
index 254db30..b402084 100644
--- a/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
+++ b/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
@@ -35,10 +35,6 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <spark.version>2.4.8</spark.version>
-        <spark.major.version>2.4</spark.major.version>
-        <scala.version>2.11.12</scala.version>
-        <scala.major.version>2.11</scala.major.version>
     </properties>
 
     <dependencies>
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
index 34bff24..4afcc01 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
@@ -22,6 +22,7 @@ import org.apache.doris.spark.client.read.ReaderPartitionGenerator
 import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 import scala.language.implicitConversions
@@ -35,7 +36,9 @@ abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extend
   override def toBatch: Batch = this
 
   override def planInputPartitions(): Array[InputPartition] = {
-    ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters(), getLimit).map(toInputPartition)
+    ReaderPartitionGenerator.generatePartitions(config, schema.names, compiledFilters(), getLimit,
+      SQLConf.get.datetimeJava8ApiEnabled)
+      .map(toInputPartition)
   }
 
@@ -44,7 +47,8 @@ abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extend
   }
 
   private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition =
-    DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns, rp.getFilters, rp.getLimit)
+    DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend, rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan,
+      rp.getReadColumns, rp.getFilters, rp.getLimit, rp.getDateTimeJava8APIEnabled)
 
   protected def compiledFilters(): Array[String]
 
@@ -52,4 +56,6 @@ abstract class AbstractDorisScan(config: DorisConfig, schema: StructType) extend
 
 }
 
-case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String], limit: Int = -1) extends InputPartition
+case class DorisInputPartition(database: String, table: String, backend: Backend, tablets: Array[Long],
+                               opaquedQueryPlan: String, readCols: Array[String], predicates: Array[String],
+                               limit: Int = -1, datetimeJava8ApiEnabled: Boolean) extends InputPartition
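Note the pattern in AbstractDorisScan: SQLConf.get.datetimeJava8ApiEnabled is read once on the driver during planInputPartitions() and serialized into each DorisInputPartition, because the session conf is not reliably visible on executors. A reduced Scala sketch of that capture-at-planning idiom (PartitionSettings is a hypothetical stand-in for DorisInputPartition):

    import org.apache.spark.sql.internal.SQLConf

    // Serializable carrier shipped to executors along with each partition.
    case class PartitionSettings(datetimeJava8ApiEnabled: Boolean)

    // Driver side, at planning time: snapshot the session conf once.
    def planSettings(): PartitionSettings =
      PartitionSettings(SQLConf.get.datetimeJava8ApiEnabled)

The Spark 2 RDD path reads the same flag from the SparkConf instead (see AbstractDorisRDD above), since there is no DataSource V2 planning step there.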
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
index 42be1c5..0061fff 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
@@ -22,7 +22,7 @@ import org.apache.doris.spark.client.read.{DorisFlightSqlReader, DorisReader, Do
 import org.apache.doris.spark.config.DorisConfig
 import org.apache.doris.spark.util.RowConvertors
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
 import org.apache.spark.sql.types.StructType
 
@@ -34,7 +34,7 @@ class DorisPartitionReader(inputPartition: InputPartition, schema: StructType, m
   private implicit def toReaderPartition(inputPart: DorisInputPartition): DorisReaderPartition = {
     val tablets = inputPart.tablets.map(java.lang.Long.valueOf)
     new DorisReaderPartition(inputPart.database, inputPart.table, inputPart.backend, tablets,
-      inputPart.opaquedQueryPlan, inputPart.readCols, inputPart.predicates, inputPart.limit, config)
+      inputPart.opaquedQueryPlan, inputPart.readCols, inputPart.predicates, inputPart.limit, config, inputPart.datetimeJava8ApiEnabled)
   }
 
   private lazy val reader: DorisReader = {
@@ -45,19 +45,21 @@ class DorisPartitionReader(inputPartition: InputPartition, schema: StructType, m
     }
   }
 
+  private val datetimeJava8ApiEnabled: Boolean = inputPartition.asInstanceOf[DorisInputPartition].datetimeJava8ApiEnabled
+
   override def next(): Boolean = reader.hasNext
 
   override def get(): InternalRow = {
     val values = reader.next().asInstanceOf[Array[Any]]
+    val row = new GenericInternalRow(schema.length)
     if (values.nonEmpty) {
-      val row = new SpecificInternalRow(schema.fields.map(_.dataType))
       values.zipWithIndex.foreach { case (value, index) =>
         if (value == null) row.setNullAt(index)
-        else row.update(index, RowConvertors.convertValue(value, schema.fields(index).dataType))
+        else row.update(index, RowConvertors.convertValue(value, schema.fields(index).dataType, datetimeJava8ApiEnabled))
       }
-      row
-    } else null.asInstanceOf[InternalRow]
+    }
+    row
   }
 
   override def close(): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org