This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3697916  [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
3697916 is described below

commit 3697916d8fc52f147c5a0667e2bbe0d0c6912417
Author: TianqiWan <[email protected]>
AuthorDate: Thu Mar 31 10:57:55 2022 +0800

    [INLONG-3486][Sort] Data with Timestamp/Date type are written wrongly when using parquet format
---
 .../hive/formats/parquet/ParquetRowWriter.java     | 41 +++++++++++++++++++---
 .../formats/parquet/ParquetSchemaConverter.java    |  3 +-
 .../formats/parquet/ParquetBulkWriterTest.java     |  6 ++--
 3 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
index b36ada8..08c4da2 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetRowWriter.java
@@ -18,10 +18,13 @@
 
 package org.apache.inlong.sort.flink.hive.formats.parquet;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Date;
+import java.sql.Time;
 import java.sql.Timestamp;
-import java.util.Date;
 import java.util.Map;
-
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
@@ -35,6 +38,10 @@ import org.apache.parquet.schema.Type;
 /** Writes a record to the Parquet API with the expected schema in order to be written to a file. */
 public class ParquetRowWriter {
 
+    public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
+    public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
+    public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+
     static final String ARRAY_FIELD_NAME = "list";
 
     static final String MAP_ENTITY_FIELD_NAME = "key_value";
@@ -106,10 +113,12 @@ public class ParquetRowWriter {
                 case DOUBLE:
                     return new DoubleWriter();
                 case DATE:
+                    return (row, ordinal) -> recordConsumer.addInteger(
+                            (int) ((Date) row.getField(ordinal)).toLocalDate().toEpochDay());
                 case TIME_WITHOUT_TIME_ZONE:
-                    return (row, ordinal) -> recordConsumer.addInteger((int) ((Date) row.getField(ordinal)).getTime());
+                    return (row, ordinal) -> recordConsumer.addInteger((int) ((Time) row.getField(ordinal)).getTime());
                 case TIMESTAMP_WITHOUT_TIME_ZONE:
-                    return (row, ordinal) -> recordConsumer.addLong(((Timestamp) row.getField(ordinal)).getTime());
+                    return new TimestampWriter();
                 default:
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
@@ -203,6 +212,30 @@ public class ParquetRowWriter {
         }
     }
 
+    private class TimestampWriter implements FieldWriter {
+
+        @Override
+        public void write(Row row, int ordinal) {
+            recordConsumer.addBinary(timestampToInt96((Timestamp) row.getField(ordinal)));
+        }
+    }
+
+    public static Binary timestampToInt96(Timestamp timestamp) {
+        int julianDay;
+        long nanosOfDay;
+
+        long mills = timestamp.getTime();
+        julianDay = (int) ((mills / MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS);
+        nanosOfDay = ((mills % MILLIS_IN_DAY) / 1000) * NANOS_PER_SECOND + timestamp.getNanos();
+
+        ByteBuffer buf = ByteBuffer.allocate(12);
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+        buf.putLong(nanosOfDay);
+        buf.putInt(julianDay);
+        buf.flip();
+        return Binary.fromConstantByteBuffer(buf);
+    }
+
     private class ArrayWriter implements FieldWriter {
 
         private final LogicalType elementTypeFlink;
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
index 4b7eaf5..54866fc 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetSchemaConverter.java
@@ -601,8 +601,7 @@ public class ParquetSchemaConverter {
                         .as(OriginalType.TIME_MILLIS)
                         .named(name);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
-                        .as(OriginalType.TIMESTAMP_MILLIS)
+                return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
                         .named(name);
             case ARRAY:
                 return Types.list(repetition)
diff --git a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
index 8f82cb3..311e7f8 100644
--- a/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
+++ b/inlong-sort/sort-connectors/src/test/java/org/apache/inlong/sort/flink/hive/formats/parquet/ParquetBulkWriterTest.java
@@ -94,7 +94,7 @@ public class ParquetBulkWriterTest {
                 (float) 1.1,
                 1.11,
                 new BigDecimal("123456789123456789"),
-                new Date(0),
+                new Date(24 * 60 * 60 * 1000),
                 new Time(0),
                 new Timestamp(0),
                 testArray,
@@ -119,9 +119,9 @@ public class ParquetBulkWriterTest {
         assertEquals(1.1, line.getFloat(6, 0), 0.01);
         assertEquals(1.11, line.getDouble(7, 0), 0.001);
         assertEquals("123456789123456789", line.getString(8, 0));
-        assertEquals(0, line.getInteger(9, 0));
+        assertEquals(1, line.getInteger(9, 0));
         assertEquals(0, line.getInteger(10, 0));
-        assertEquals(0, line.getLong(11, 0));
+        assertEquals(ParquetRowWriter.timestampToInt96(new Timestamp(0)), line.getInt96(11, 0));
 
         Group f13 = line.getGroup("f13", 0);
         assertEquals(1, f13.getGroup(ARRAY_FIELD_NAME, 0).getInteger(0, 0));

Reply via email to