This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 7b8298a8a4 [Fix][Connector-v2] Add DateMilliConvertor to Convert DateMilliVector into Default Timezone (#8736) 7b8298a8a4 is described below commit 7b8298a8a4c34191844331abd34c9e8b30d5f886 Author: xiaochen <598457...@qq.com> AuthorDate: Tue Feb 18 18:31:42 2025 +0800 [Fix][Connector-v2] Add DateMilliConvertor to Convert DateMilliVector into Default Timezone (#8736) --- .../source/arrow/converter/DateMilliConvertor.java | 44 ++++++++++++++++++++++ .../arrow/reader/ArrowToSeatunnelRowReader.java | 2 +- ...atunnel.common.source.arrow.converter.Converter | 1 + .../arrow/ArrowToSeatunnelRowReaderTest.java | 19 +++------- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DateMilliConvertor.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DateMilliConvertor.java new file mode 100644 index 0000000000..c184d6271e --- /dev/null +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/converter/DateMilliConvertor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter; + +import org.apache.seatunnel.shade.org.apache.arrow.vector.DateMilliVector; +import org.apache.seatunnel.shade.org.apache.arrow.vector.types.Types; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; + +public class DateMilliConvertor implements Converter<DateMilliVector> { + @Override + public Object convert(int rowIndex, DateMilliVector fieldVector) { + if (fieldVector == null || fieldVector.isNull(rowIndex)) { + return null; + } + LocalDateTime localDateTime = fieldVector.getObject(rowIndex); + return localDateTime + .atZone(ZoneOffset.UTC) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime(); + } + + @Override + public boolean support(Types.MinorType type) { + return Types.MinorType.DATEMILLI == type; + } +} diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java index 6bd23bad0b..5dbd56a369 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/reader/ArrowToSeatunnelRowReader.java @@ -135,7 +135,7 @@ public class ArrowToSeatunnelRowReader implements AutoCloseable { Integer fieldIndex = fieldIndexMap.get(name); Types.MinorType minorType = fieldVector.getMinorType(); for (int i = 0; i < seatunnelRowBatch.size(); i++) { - // arrow field not in the Seatunnel Sechma field, skip it + // arrow field not in the Seatunnel Schema field, skip it if (fieldIndex != null) { SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[fieldIndex]; Object fieldValue = diff --git a/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter b/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter index dfdefc680e..aadc8ea5f5 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter +++ b/seatunnel-connectors-v2/connector-common/src/main/resources/META-INF/services/org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.Converter @@ -22,3 +22,4 @@ org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.StructCo org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.TimeStampMicroConverter org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.TimeStampMilliConverter org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.TimeStampNanoConverter +org.apache.seatunnel.connectors.seatunnel.common.source.arrow.converter.DateMilliConvertor diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java index 8c85a4f107..e84bee4939 100644 --- a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java +++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/common/source/arrow/ArrowToSeatunnelRowReaderTest.java @@ -69,7 +69,7 @@ import java.nio.channels.Channels; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.ZoneId; -import java.time.temporal.ChronoUnit; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -84,13 +84,9 @@ public class ArrowToSeatunnelRowReaderTest { private static RootAllocator rootAllocator; private static final List<SeaTunnelDataTypeHolder> seaTunnelDataTypeHolder = new ArrayList<>(); - /** - * LocalDateTime.now() is timestamped with a precision of nanoseconds on linux and milliseconds - * on windows The test case uses TimeStampMicroVector to test the timestamp, thus truncating the - * timestamp accuracy to ChronoUnit.MILLIS - */ private static final LocalDateTime localDateTime = - LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS); + LocalDateTime.parse( + "2025-02-15 02:21:23", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); private static final List<String> stringData = new ArrayList<>(); private static final List<Byte> byteData = new ArrayList<>(); @@ -172,13 +168,8 @@ public class ArrowToSeatunnelRowReaderTest { } // allocate storage vectors.forEach(FieldVector::allocateNew); - // setVectorVaule - long epochMilli = - localDateTime - .truncatedTo(ChronoUnit.MILLIS) - .atZone(zoneId) - .toInstant() - .toEpochMilli(); + long epochMilli = localDateTime.atZone(zoneId).toInstant().toEpochMilli(); + byte byteStart = 'a'; // setVectorValue