This is an automated email from the ASF dual-hosted git repository. lidongdai pushed a commit to branch st-fix-iceberg-timezone in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 1292d14728fbb67a2af9a0102309e7a0eb90be0d Author: dailidong <dailidon...@gmail.com> AuthorDate: Fri Dec 6 22:47:04 2024 +0800 ST-2399 fix sink to iceberg timezone --- .../seatunnel/iceberg/data/RowConverter.java | 4 +- .../seatunnel/iceberg/data/RowConverterTest.java | 210 +++++++++++++++++++++ 2 files changed, 213 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java index 11a2079521..88c3e417e8 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverter.java @@ -51,6 +51,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -429,7 +430,8 @@ public class RowConverter { } else if (value instanceof OffsetDateTime) { return (OffsetDateTime) value; } else if (value instanceof LocalDateTime) { - return ((LocalDateTime) value).atOffset(ZoneOffset.UTC); + // Convert to OffsetDateTime using the system(jvm) default timezone + return ((LocalDateTime) value).atZone(ZoneId.systemDefault()).toOffsetDateTime(); } else if (value instanceof Date) { return DateTimeUtil.timestamptzFromMicros(((Date) value).getTime() * 1000); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverterTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverterTest.java new file mode 100644 index 0000000000..f802f1f8c3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/RowConverterTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iceberg.data; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.types.Types; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneId; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +public class RowConverterTest { + + @Mock private Table table; + + @Mock private SinkConfig config; + + private RowConverter converter; + private Schema schema; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + + // Create a schema with various field types + schema = + new Schema( + Types.NestedField.required(1, "int_field", Types.IntegerType.get()), + Types.NestedField.required(2, "long_field", Types.LongType.get()), + Types.NestedField.required(3, "float_field", Types.FloatType.get()), + Types.NestedField.required(4, "double_field", Types.DoubleType.get()), + Types.NestedField.required(5, "decimal_field", Types.DecimalType.of(10, 2)), + Types.NestedField.required(6, "boolean_field", Types.BooleanType.get()), + Types.NestedField.required(7, "string_field", Types.StringType.get()), + Types.NestedField.required(8, "uuid_field", Types.UUIDType.get()), + Types.NestedField.required(9, "binary_field", Types.BinaryType.get()), + Types.NestedField.required(10, "date_field", Types.DateType.get()), + Types.NestedField.required(11, "time_field", Types.TimeType.get()), + Types.NestedField.required( + 12, "timestamp_field", Types.TimestampType.withoutZone())); + + when(table.schema()).thenReturn(schema); + when(config.isCaseSensitive()).thenReturn(true); + when(config.isTableSchemaEvolutionEnabled()).thenReturn(false); + + converter = new RowConverter(table, config); + } + + @Test + public void testConvertBasicTypes() { + // Create test data + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] { + "int_field", + "long_field", + "float_field", + "double_field", + "decimal_field", + "boolean_field", + "string_field", + "uuid_field", + "binary_field", + "date_field", + "time_field", + "timestamp_field" + }, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.DOUBLE_TYPE, + new DecimalType(10, 2), + BasicType.BOOLEAN_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + PrimitiveByteArrayType.INSTANCE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_TIME_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE + }); + + UUID testUuid = UUID.randomUUID(); + LocalDateTime now = LocalDateTime.now(); + LocalDate today = LocalDate.now(); + LocalTime time = LocalTime.now(); + byte[] binaryData = "test binary".getBytes(); + + Object[] fields = + new Object[] { + 42, // int + 123456789L, // long + 3.14f, // float + 2.71828, // double + new BigDecimal("123.45"), // decimal + true, // boolean + "test string", // string + testUuid.toString(), // UUID as string + binaryData, // binary + today, // date + time, // time + now // timestamp + }; + + SeaTunnelRow row = new SeaTunnelRow(fields); + + // Convert and verify + org.apache.iceberg.data.Record result = converter.convert(row, rowType); + + assertNotNull(result); + assertEquals(42, result.getField("int_field")); + assertEquals(123456789L, result.getField("long_field")); + assertEquals(3.14f, result.getField("float_field")); + assertEquals(2.71828, result.getField("double_field")); + assertEquals(new BigDecimal("123.45"), result.getField("decimal_field")); + assertEquals(true, result.getField("boolean_field")); + assertEquals("test string", result.getField("string_field")); + assertEquals(testUuid, result.getField("uuid_field")); + assertNotNull(result.getField("binary_field")); + assertEquals(today, result.getField("date_field")); + assertEquals(time, result.getField("time_field")); + assertEquals(now, result.getField("timestamp_field")); + } + + @Test + public void testConvertOffsetDateTime() { + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"timestamp_field"}, + new SeaTunnelDataType[] {LocalTimeType.LOCAL_DATE_TIME_TYPE}); + + ZoneId zoneId = ZoneId.of("UTC"); + LocalDateTime now = LocalDateTime.now(zoneId); + OffsetDateTime offsetNow = now.atZone(zoneId).toOffsetDateTime(); + + SeaTunnelRow row = new SeaTunnelRow(new Object[] {offsetNow}); + + org.apache.iceberg.data.Record result = converter.convert(row, rowType); + assertNotNull(result); + assertEquals( + offsetNow.toLocalDateTime(), ((LocalDateTime) result.getField("timestamp_field"))); + } + + @Test + public void testInvalidTypeConversion() { + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"int_field"}, new SeaTunnelDataType[] {BasicType.INT_TYPE}); + + SeaTunnelRow row = new SeaTunnelRow(new Object[] {"not an integer"}); + + assertThrows(IllegalArgumentException.class, () -> converter.convert(row, rowType)); + } + + @Test + public void testNullValues() { + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"int_field", "string_field"}, + new SeaTunnelDataType[] {BasicType.INT_TYPE, BasicType.STRING_TYPE}); + + SeaTunnelRow row = new SeaTunnelRow(new Object[] {null, null}); + + org.apache.iceberg.data.Record result = converter.convert(row, rowType); + assertNotNull(result); + assertEquals(null, result.getField("int_field")); + assertEquals(null, result.getField("string_field")); + } +}